Update for g_type_class_add_private() deprecation in recent GLib
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 struct _GstRTSPStreamPrivate
63 {
64   GMutex lock;
65   guint idx;
66   /* Only one pad is ever set */
67   GstPad *srcpad, *sinkpad;
68   GstElement *payloader;
69   guint buffer_size;
70   GstBin *joined_bin;
71
72   /* TRUE if this stream is running on
73    * the client side of an RTSP link (for RECORD) */
74   gboolean client_side;
75   gchar *control;
76
77   /* TRUE if stream is complete. This means that the receiver and the sender
78    * parts are present in the stream. */
79   gboolean is_complete;
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102   GSocket *socket_v4[2];
103   GSocket *socket_v6[2];
104
105   /* for UDP multicast */
106   GstElement *mcast_udpsrc_v4[2];
107   GstElement *mcast_udpsrc_v6[2];
108   GstElement *mcast_udpqueue[2];
109   GstElement *mcast_udpsink[2];
110   GSocket *mcast_socket_v4[2];
111   GSocket *mcast_socket_v6[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   GstElement *rtxreceive;
125   guint rtx_pt;
126   GstClockTime rtx_time;
127
128   /* Forward Error Correction with RFC 5109 */
129   GstElement *ulpfec_decoder;
130   GstElement *ulpfec_encoder;
131   guint ulpfec_pt;
132   gboolean ulpfec_enabled;
133   guint ulpfec_percentage;
134
135   /* pool used to manage unicast and multicast addresses */
136   GstRTSPAddressPool *pool;
137
138   /* unicast server addr/port */
139   GstRTSPAddress *server_addr_v4;
140   GstRTSPAddress *server_addr_v6;
141
142   /* multicast addresses */
143   GstRTSPAddress *mcast_addr_v4;
144   GstRTSPAddress *mcast_addr_v6;
145
146   gchar *multicast_iface;
147
148   /* the caps of the stream */
149   gulong caps_sig;
150   GstCaps *caps;
151
152   /* transports we stream to */
153   guint n_active;
154   GList *transports;
155   guint transports_cookie;
156   GList *tr_cache_rtp;
157   GList *tr_cache_rtcp;
158   guint tr_cache_cookie_rtp;
159   guint tr_cache_cookie_rtcp;
160
161   gint dscp_qos;
162
163   /* stream blocking */
164   gulong blocked_id[2];
165   gboolean blocking;
166
167   /* current stream postion */
168   GstClockTime position;
169
170   /* pt->caps map for RECORD streams */
171   GHashTable *ptmap;
172
173   GstRTSPPublishClockMode publish_clock_mode;
174 };
175
176 #define DEFAULT_CONTROL         NULL
177 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
178 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
179                                         GST_RTSP_LOWER_TRANS_TCP
180
181 enum
182 {
183   PROP_0,
184   PROP_CONTROL,
185   PROP_PROFILES,
186   PROP_PROTOCOLS,
187   PROP_LAST
188 };
189
190 enum
191 {
192   SIGNAL_NEW_RTP_ENCODER,
193   SIGNAL_NEW_RTCP_ENCODER,
194   SIGNAL_LAST
195 };
196
197 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
198 #define GST_CAT_DEFAULT rtsp_stream_debug
199
200 static GQuark ssrc_stream_map_key;
201
202 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
203     GValue * value, GParamSpec * pspec);
204 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
205     const GValue * value, GParamSpec * pspec);
206
207 static void gst_rtsp_stream_finalize (GObject * obj);
208
209 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
210
211 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
212
213 static void
214 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
215 {
216   GObjectClass *gobject_class;
217
218   gobject_class = G_OBJECT_CLASS (klass);
219
220   gobject_class->get_property = gst_rtsp_stream_get_property;
221   gobject_class->set_property = gst_rtsp_stream_set_property;
222   gobject_class->finalize = gst_rtsp_stream_finalize;
223
224   g_object_class_install_property (gobject_class, PROP_CONTROL,
225       g_param_spec_string ("control", "Control",
226           "The control string for this stream", DEFAULT_CONTROL,
227           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228
229   g_object_class_install_property (gobject_class, PROP_PROFILES,
230       g_param_spec_flags ("profiles", "Profiles",
231           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
232           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
233
234   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
235       g_param_spec_flags ("protocols", "Protocols",
236           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
237           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
238
239   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
240       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
242       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
243
244   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
245       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
246       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
247       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
248
249   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
250
251   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
252 }
253
254 static void
255 gst_rtsp_stream_init (GstRTSPStream * stream)
256 {
257   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
258
259   GST_DEBUG ("new stream %p", stream);
260
261   stream->priv = priv;
262
263   priv->dscp_qos = -1;
264   priv->control = g_strdup (DEFAULT_CONTROL);
265   priv->profiles = DEFAULT_PROFILES;
266   priv->protocols = DEFAULT_PROTOCOLS;
267   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
268
269   g_mutex_init (&priv->lock);
270
271   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
272       NULL, (GDestroyNotify) gst_caps_unref);
273   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
274       (GDestroyNotify) gst_caps_unref);
275 }
276
277 static void
278 gst_rtsp_stream_finalize (GObject * obj)
279 {
280   GstRTSPStream *stream;
281   GstRTSPStreamPrivate *priv;
282   guint i;
283
284   stream = GST_RTSP_STREAM (obj);
285   priv = stream->priv;
286
287   GST_DEBUG ("finalize stream %p", stream);
288
289   /* we really need to be unjoined now */
290   g_return_if_fail (priv->joined_bin == NULL);
291
292   if (priv->mcast_addr_v4)
293     gst_rtsp_address_free (priv->mcast_addr_v4);
294   if (priv->mcast_addr_v6)
295     gst_rtsp_address_free (priv->mcast_addr_v6);
296   if (priv->server_addr_v4)
297     gst_rtsp_address_free (priv->server_addr_v4);
298   if (priv->server_addr_v6)
299     gst_rtsp_address_free (priv->server_addr_v6);
300   if (priv->pool)
301     g_object_unref (priv->pool);
302   if (priv->rtxsend)
303     g_object_unref (priv->rtxsend);
304   if (priv->rtxreceive)
305     g_object_unref (priv->rtxreceive);
306   if (priv->ulpfec_encoder)
307     gst_object_unref (priv->ulpfec_encoder);
308   if (priv->ulpfec_decoder)
309     gst_object_unref (priv->ulpfec_decoder);
310
311   for (i = 0; i < 2; i++) {
312     if (priv->socket_v4[i])
313       g_object_unref (priv->socket_v4[i]);
314     if (priv->socket_v6[i])
315       g_object_unref (priv->socket_v6[i]);
316     if (priv->mcast_socket_v4[i])
317       g_object_unref (priv->mcast_socket_v4[i]);
318     if (priv->mcast_socket_v6[i])
319       g_object_unref (priv->mcast_socket_v6[i]);
320   }
321
322   g_free (priv->multicast_iface);
323
324   gst_object_unref (priv->payloader);
325   if (priv->srcpad)
326     gst_object_unref (priv->srcpad);
327   if (priv->sinkpad)
328     gst_object_unref (priv->sinkpad);
329   g_free (priv->control);
330   g_mutex_clear (&priv->lock);
331
332   g_hash_table_unref (priv->keys);
333   g_hash_table_destroy (priv->ptmap);
334
335   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
336 }
337
338 static void
339 gst_rtsp_stream_get_property (GObject * object, guint propid,
340     GValue * value, GParamSpec * pspec)
341 {
342   GstRTSPStream *stream = GST_RTSP_STREAM (object);
343
344   switch (propid) {
345     case PROP_CONTROL:
346       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
347       break;
348     case PROP_PROFILES:
349       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
350       break;
351     case PROP_PROTOCOLS:
352       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
353       break;
354     default:
355       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
356   }
357 }
358
359 static void
360 gst_rtsp_stream_set_property (GObject * object, guint propid,
361     const GValue * value, GParamSpec * pspec)
362 {
363   GstRTSPStream *stream = GST_RTSP_STREAM (object);
364
365   switch (propid) {
366     case PROP_CONTROL:
367       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
368       break;
369     case PROP_PROFILES:
370       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
371       break;
372     case PROP_PROTOCOLS:
373       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
374       break;
375     default:
376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
377   }
378 }
379
380 /**
381  * gst_rtsp_stream_new:
382  * @idx: an index
383  * @pad: a #GstPad
384  * @payloader: a #GstElement
385  *
386  * Create a new media stream with index @idx that handles RTP data on
387  * @pad and has a payloader element @payloader if @pad is a source pad
388  * or a depayloader element @payloader if @pad is a sink pad.
389  *
390  * Returns: (transfer full): a new #GstRTSPStream
391  */
392 GstRTSPStream *
393 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
394 {
395   GstRTSPStreamPrivate *priv;
396   GstRTSPStream *stream;
397
398   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
399   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
400
401   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
402   priv = stream->priv;
403   priv->idx = idx;
404   priv->payloader = gst_object_ref (payloader);
405   if (GST_PAD_IS_SRC (pad))
406     priv->srcpad = gst_object_ref (pad);
407   else
408     priv->sinkpad = gst_object_ref (pad);
409
410   return stream;
411 }
412
413 /**
414  * gst_rtsp_stream_get_index:
415  * @stream: a #GstRTSPStream
416  *
417  * Get the stream index.
418  *
419  * Return: the stream index.
420  */
421 guint
422 gst_rtsp_stream_get_index (GstRTSPStream * stream)
423 {
424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
425
426   return stream->priv->idx;
427 }
428
429 /**
430  * gst_rtsp_stream_get_pt:
431  * @stream: a #GstRTSPStream
432  *
433  * Get the stream payload type.
434  *
435  * Return: the stream payload type.
436  */
437 guint
438 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
439 {
440   GstRTSPStreamPrivate *priv;
441   guint pt;
442
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
444
445   priv = stream->priv;
446
447   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
448
449   return pt;
450 }
451
452 /**
453  * gst_rtsp_stream_get_srcpad:
454  * @stream: a #GstRTSPStream
455  *
456  * Get the srcpad associated with @stream.
457  *
458  * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
459  */
460 GstPad *
461 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
462 {
463   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
464
465   if (!stream->priv->srcpad)
466     return NULL;
467
468   return gst_object_ref (stream->priv->srcpad);
469 }
470
471 /**
472  * gst_rtsp_stream_get_sinkpad:
473  * @stream: a #GstRTSPStream
474  *
475  * Get the sinkpad associated with @stream.
476  *
477  * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
478  */
479 GstPad *
480 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
481 {
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   if (!stream->priv->sinkpad)
485     return NULL;
486
487   return gst_object_ref (stream->priv->sinkpad);
488 }
489
490 /**
491  * gst_rtsp_stream_get_control:
492  * @stream: a #GstRTSPStream
493  *
494  * Get the control string to identify this stream.
495  *
496  * Returns: (transfer full) (nullable): the control string. g_free() after usage.
497  */
498 gchar *
499 gst_rtsp_stream_get_control (GstRTSPStream * stream)
500 {
501   GstRTSPStreamPrivate *priv;
502   gchar *result;
503
504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
505
506   priv = stream->priv;
507
508   g_mutex_lock (&priv->lock);
509   if ((result = g_strdup (priv->control)) == NULL)
510     result = g_strdup_printf ("stream=%u", priv->idx);
511   g_mutex_unlock (&priv->lock);
512
513   return result;
514 }
515
516 /**
517  * gst_rtsp_stream_set_control:
518  * @stream: a #GstRTSPStream
519  * @control: (nullable): a control string
520  *
521  * Set the control string in @stream.
522  */
523 void
524 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
525 {
526   GstRTSPStreamPrivate *priv;
527
528   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
529
530   priv = stream->priv;
531
532   g_mutex_lock (&priv->lock);
533   g_free (priv->control);
534   priv->control = g_strdup (control);
535   g_mutex_unlock (&priv->lock);
536 }
537
538 /**
539  * gst_rtsp_stream_has_control:
540  * @stream: a #GstRTSPStream
541  * @control: (nullable): a control string
542  *
543  * Check if @stream has the control string @control.
544  *
545  * Returns: %TRUE is @stream has @control as the control string
546  */
547 gboolean
548 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
549 {
550   GstRTSPStreamPrivate *priv;
551   gboolean res;
552
553   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
554
555   priv = stream->priv;
556
557   g_mutex_lock (&priv->lock);
558   if (priv->control)
559     res = (g_strcmp0 (priv->control, control) == 0);
560   else {
561     guint streamid;
562
563     if (sscanf (control, "stream=%u", &streamid) > 0)
564       res = (streamid == priv->idx);
565     else
566       res = FALSE;
567   }
568   g_mutex_unlock (&priv->lock);
569
570   return res;
571 }
572
573 /**
574  * gst_rtsp_stream_set_mtu:
575  * @stream: a #GstRTSPStream
576  * @mtu: a new MTU
577  *
578  * Configure the mtu in the payloader of @stream to @mtu.
579  */
580 void
581 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
582 {
583   GstRTSPStreamPrivate *priv;
584
585   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
586
587   priv = stream->priv;
588
589   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
590
591   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
592 }
593
594 /**
595  * gst_rtsp_stream_get_mtu:
596  * @stream: a #GstRTSPStream
597  *
598  * Get the configured MTU in the payloader of @stream.
599  *
600  * Returns: the MTU of the payloader.
601  */
602 guint
603 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
604 {
605   GstRTSPStreamPrivate *priv;
606   guint mtu;
607
608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
609
610   priv = stream->priv;
611
612   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
613
614   return mtu;
615 }
616
617 /* Update the dscp qos property on the udp sinks */
618 static void
619 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
620 {
621   GstRTSPStreamPrivate *priv;
622
623   priv = stream->priv;
624
625   if (*udpsink) {
626     g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
627   }
628 }
629
630 /**
631  * gst_rtsp_stream_set_dscp_qos:
632  * @stream: a #GstRTSPStream
633  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
634  *
635  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
636  */
637 void
638 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
639 {
640   GstRTSPStreamPrivate *priv;
641
642   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
643
644   priv = stream->priv;
645
646   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
647
648   if (dscp_qos < -1 || dscp_qos > 63) {
649     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
650     return;
651   }
652
653   priv->dscp_qos = dscp_qos;
654
655   update_dscp_qos (stream, priv->udpsink);
656 }
657
658 /**
659  * gst_rtsp_stream_get_dscp_qos:
660  * @stream: a #GstRTSPStream
661  *
662  * Get the configured DSCP QoS in of the outgoing sockets.
663  *
664  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
665  */
666 gint
667 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
668 {
669   GstRTSPStreamPrivate *priv;
670
671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
672
673   priv = stream->priv;
674
675   return priv->dscp_qos;
676 }
677
678 /**
679  * gst_rtsp_stream_is_transport_supported:
680  * @stream: a #GstRTSPStream
681  * @transport: (transfer none): a #GstRTSPTransport
682  *
683  * Check if @transport can be handled by stream
684  *
685  * Returns: %TRUE if @transport can be handled by @stream.
686  */
687 gboolean
688 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
689     GstRTSPTransport * transport)
690 {
691   GstRTSPStreamPrivate *priv;
692
693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
694   g_return_val_if_fail (transport != NULL, FALSE);
695
696   priv = stream->priv;
697
698   g_mutex_lock (&priv->lock);
699   if (transport->trans != GST_RTSP_TRANS_RTP)
700     goto unsupported_transmode;
701
702   if (!(transport->profile & priv->profiles))
703     goto unsupported_profile;
704
705   if (!(transport->lower_transport & priv->protocols))
706     goto unsupported_ltrans;
707
708   g_mutex_unlock (&priv->lock);
709
710   return TRUE;
711
712   /* ERRORS */
713 unsupported_transmode:
714   {
715     GST_DEBUG ("unsupported transport mode %d", transport->trans);
716     g_mutex_unlock (&priv->lock);
717     return FALSE;
718   }
719 unsupported_profile:
720   {
721     GST_DEBUG ("unsupported profile %d", transport->profile);
722     g_mutex_unlock (&priv->lock);
723     return FALSE;
724   }
725 unsupported_ltrans:
726   {
727     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
728     g_mutex_unlock (&priv->lock);
729     return FALSE;
730   }
731 }
732
733 /**
734  * gst_rtsp_stream_set_profiles:
735  * @stream: a #GstRTSPStream
736  * @profiles: the new profiles
737  *
738  * Configure the allowed profiles for @stream.
739  */
740 void
741 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
742 {
743   GstRTSPStreamPrivate *priv;
744
745   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
746
747   priv = stream->priv;
748
749   g_mutex_lock (&priv->lock);
750   priv->profiles = profiles;
751   g_mutex_unlock (&priv->lock);
752 }
753
754 /**
755  * gst_rtsp_stream_get_profiles:
756  * @stream: a #GstRTSPStream
757  *
758  * Get the allowed profiles of @stream.
759  *
760  * Returns: a #GstRTSPProfile
761  */
762 GstRTSPProfile
763 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
764 {
765   GstRTSPStreamPrivate *priv;
766   GstRTSPProfile res;
767
768   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
769
770   priv = stream->priv;
771
772   g_mutex_lock (&priv->lock);
773   res = priv->profiles;
774   g_mutex_unlock (&priv->lock);
775
776   return res;
777 }
778
779 /**
780  * gst_rtsp_stream_set_protocols:
781  * @stream: a #GstRTSPStream
782  * @protocols: the new flags
783  *
784  * Configure the allowed lower transport for @stream.
785  */
786 void
787 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
788     GstRTSPLowerTrans protocols)
789 {
790   GstRTSPStreamPrivate *priv;
791
792   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
793
794   priv = stream->priv;
795
796   g_mutex_lock (&priv->lock);
797   priv->protocols = protocols;
798   g_mutex_unlock (&priv->lock);
799 }
800
801 /**
802  * gst_rtsp_stream_get_protocols:
803  * @stream: a #GstRTSPStream
804  *
805  * Get the allowed protocols of @stream.
806  *
807  * Returns: a #GstRTSPLowerTrans
808  */
809 GstRTSPLowerTrans
810 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
811 {
812   GstRTSPStreamPrivate *priv;
813   GstRTSPLowerTrans res;
814
815   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
816       GST_RTSP_LOWER_TRANS_UNKNOWN);
817
818   priv = stream->priv;
819
820   g_mutex_lock (&priv->lock);
821   res = priv->protocols;
822   g_mutex_unlock (&priv->lock);
823
824   return res;
825 }
826
827 /**
828  * gst_rtsp_stream_set_address_pool:
829  * @stream: a #GstRTSPStream
830  * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
831  *
832  * configure @pool to be used as the address pool of @stream.
833  */
834 void
835 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
836     GstRTSPAddressPool * pool)
837 {
838   GstRTSPStreamPrivate *priv;
839   GstRTSPAddressPool *old;
840
841   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
842
843   priv = stream->priv;
844
845   GST_LOG_OBJECT (stream, "set address pool %p", pool);
846
847   g_mutex_lock (&priv->lock);
848   if ((old = priv->pool) != pool)
849     priv->pool = pool ? g_object_ref (pool) : NULL;
850   else
851     old = NULL;
852   g_mutex_unlock (&priv->lock);
853
854   if (old)
855     g_object_unref (old);
856 }
857
858 /**
859  * gst_rtsp_stream_get_address_pool:
860  * @stream: a #GstRTSPStream
861  *
862  * Get the #GstRTSPAddressPool used as the address pool of @stream.
863  *
864  * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
865  * g_object_unref() after usage.
866  */
867 GstRTSPAddressPool *
868 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
869 {
870   GstRTSPStreamPrivate *priv;
871   GstRTSPAddressPool *result;
872
873   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
874
875   priv = stream->priv;
876
877   g_mutex_lock (&priv->lock);
878   if ((result = priv->pool))
879     g_object_ref (result);
880   g_mutex_unlock (&priv->lock);
881
882   return result;
883 }
884
885 /**
886  * gst_rtsp_stream_set_multicast_iface:
887  * @stream: a #GstRTSPStream
888  * @multicast_iface: (transfer none) (nullable): a multicast interface name
889  *
890  * configure @multicast_iface to be used for @stream.
891  */
892 void
893 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
894     const gchar * multicast_iface)
895 {
896   GstRTSPStreamPrivate *priv;
897   gchar *old;
898
899   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
900
901   priv = stream->priv;
902
903   GST_LOG_OBJECT (stream, "set multicast iface %s",
904       GST_STR_NULL (multicast_iface));
905
906   g_mutex_lock (&priv->lock);
907   if ((old = priv->multicast_iface) != multicast_iface)
908     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
909   else
910     old = NULL;
911   g_mutex_unlock (&priv->lock);
912
913   if (old)
914     g_free (old);
915 }
916
917 /**
918  * gst_rtsp_stream_get_multicast_iface:
919  * @stream: a #GstRTSPStream
920  *
921  * Get the multicast interface used for @stream.
922  *
923  * Returns: (transfer full) (nullable): the multicast interface for @stream.
924  * g_free() after usage.
925  */
926 gchar *
927 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
928 {
929   GstRTSPStreamPrivate *priv;
930   gchar *result;
931
932   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
933
934   priv = stream->priv;
935
936   g_mutex_lock (&priv->lock);
937   if ((result = priv->multicast_iface))
938     result = g_strdup (result);
939   g_mutex_unlock (&priv->lock);
940
941   return result;
942 }
943
944 /**
945  * gst_rtsp_stream_get_multicast_address:
946  * @stream: a #GstRTSPStream
947  * @family: the #GSocketFamily
948  *
949  * Get the multicast address of @stream for @family. The original
950  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
951  * won't release the address from the pool.
952  *
953  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
954  * or %NULL when no address could be allocated. gst_rtsp_address_free()
955  * after usage.
956  */
957 GstRTSPAddress *
958 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
959     GSocketFamily family)
960 {
961   GstRTSPStreamPrivate *priv;
962   GstRTSPAddress *result;
963   GstRTSPAddress **addrp;
964   GstRTSPAddressFlags flags;
965
966   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
967
968   priv = stream->priv;
969
970   g_mutex_lock (&stream->priv->lock);
971
972   if (family == G_SOCKET_FAMILY_IPV6) {
973     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
974     addrp = &priv->mcast_addr_v6;
975   } else {
976     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
977     addrp = &priv->mcast_addr_v4;
978   }
979
980   if (*addrp == NULL) {
981     if (priv->pool == NULL)
982       goto no_pool;
983
984     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
985
986     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
987     if (*addrp == NULL)
988       goto no_address;
989
990     /* FIXME: Also reserve the same port with unicast ANY address, since that's
991      * where we are going to bind our socket. Probably loop until we find a port
992      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
993      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
994      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
995   }
996   result = gst_rtsp_address_copy (*addrp);
997
998   g_mutex_unlock (&stream->priv->lock);
999
1000   return result;
1001
1002   /* ERRORS */
1003 no_pool:
1004   {
1005     GST_ERROR_OBJECT (stream, "no address pool specified");
1006     g_mutex_unlock (&stream->priv->lock);
1007     return NULL;
1008   }
1009 no_address:
1010   {
1011     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1012     g_mutex_unlock (&stream->priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /**
1018  * gst_rtsp_stream_reserve_address:
1019  * @stream: a #GstRTSPStream
1020  * @address: an address
1021  * @port: a port
1022  * @n_ports: n_ports
1023  * @ttl: a TTL
1024  *
1025  * Reserve @address and @port as the address and port of @stream. The original
1026  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1027  * won't release the address from the pool.
1028  *
1029  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1030  * the address could be reserved. gst_rtsp_address_free() after usage.
1031  */
1032 GstRTSPAddress *
1033 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1034     const gchar * address, guint port, guint n_ports, guint ttl)
1035 {
1036   GstRTSPStreamPrivate *priv;
1037   GstRTSPAddress *result;
1038   GInetAddress *addr;
1039   GSocketFamily family;
1040   GstRTSPAddress **addrp;
1041
1042   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1043   g_return_val_if_fail (address != NULL, NULL);
1044   g_return_val_if_fail (port > 0, NULL);
1045   g_return_val_if_fail (n_ports > 0, NULL);
1046   g_return_val_if_fail (ttl > 0, NULL);
1047
1048   priv = stream->priv;
1049
1050   addr = g_inet_address_new_from_string (address);
1051   if (!addr) {
1052     GST_ERROR ("failed to get inet addr from %s", address);
1053     family = G_SOCKET_FAMILY_IPV4;
1054   } else {
1055     family = g_inet_address_get_family (addr);
1056     g_object_unref (addr);
1057   }
1058
1059   if (family == G_SOCKET_FAMILY_IPV6)
1060     addrp = &priv->mcast_addr_v6;
1061   else
1062     addrp = &priv->mcast_addr_v4;
1063
1064   g_mutex_lock (&priv->lock);
1065   if (*addrp == NULL) {
1066     GstRTSPAddressPoolResult res;
1067
1068     if (priv->pool == NULL)
1069       goto no_pool;
1070
1071     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1072         port, n_ports, ttl, addrp);
1073     if (res != GST_RTSP_ADDRESS_POOL_OK)
1074       goto no_address;
1075
1076     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1077      * where we are going to bind our socket. */
1078   } else {
1079     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1080         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1081         (*addrp)->ttl != ttl)
1082       goto different_address;
1083   }
1084   result = gst_rtsp_address_copy (*addrp);
1085   g_mutex_unlock (&priv->lock);
1086
1087   return result;
1088
1089   /* ERRORS */
1090 no_pool:
1091   {
1092     GST_ERROR_OBJECT (stream, "no address pool specified");
1093     g_mutex_unlock (&priv->lock);
1094     return NULL;
1095   }
1096 no_address:
1097   {
1098     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1099         address);
1100     g_mutex_unlock (&priv->lock);
1101     return NULL;
1102   }
1103 different_address:
1104   {
1105     GST_ERROR_OBJECT (stream,
1106         "address %s is not the same as %s that was already reserved",
1107         address, (*addrp)->address);
1108     g_mutex_unlock (&priv->lock);
1109     return NULL;
1110   }
1111 }
1112
1113 /* must be called with lock */
1114 static void
1115 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1116     GSocketFamily family)
1117 {
1118   const gchar *multisink_socket;
1119
1120   if (family == G_SOCKET_FAMILY_IPV6)
1121     multisink_socket = "socket-v6";
1122   else
1123     multisink_socket = "socket";
1124
1125   g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1126 }
1127
1128 /* must be called with lock */
1129 static void
1130 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1131     GSocketFamily family, const gchar * multicast_iface,
1132     const gchar * addr_str, gint port, gint mcast_ttl)
1133 {
1134   set_socket_for_udpsink (udpsink, socket, family);
1135
1136   if (multicast_iface) {
1137     GST_INFO ("setting multicast-iface %s", multicast_iface);
1138     g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1139   }
1140
1141   if (mcast_ttl > 0) {
1142     GST_INFO ("setting ttl-mc %d", mcast_ttl);
1143     g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1144   }
1145
1146   g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL);
1147 }
1148
1149
1150 /* must be called with lock */
1151 static void
1152 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1153     GSocketFamily family)
1154 {
1155   set_socket_for_udpsink (udpsink, socket, family);
1156 }
1157
1158 static guint16
1159 get_port_from_socket (GSocket * socket)
1160 {
1161   guint16 port;
1162   GSocketAddress *sockaddr;
1163   GError *err;
1164
1165   GST_DEBUG ("socket: %p", socket);
1166   sockaddr = g_socket_get_local_address (socket, &err);
1167   if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1168     g_clear_object (&sockaddr);
1169     GST_ERROR ("failed to get sockaddr: %s", err->message);
1170     g_error_free (err);
1171     return 0;
1172   }
1173
1174   port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1175   g_object_unref (sockaddr);
1176
1177   return port;
1178 }
1179
1180
1181 static gboolean
1182 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1183     GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1184     gboolean is_rtp, gint mcast_ttl)
1185 {
1186   GstRTSPStreamPrivate *priv = stream->priv;
1187
1188   *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1189
1190   if (!*udpsink)
1191     goto no_udp_protocol;
1192
1193   /* configure sinks */
1194
1195   g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1196
1197   g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1198
1199   if (is_rtp)
1200     g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1201   else
1202     g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1203
1204   /* Needs to be async for RECORD streams, otherwise we will never go to
1205    * PLAYING because the sinks will wait for data while the udpsrc can't
1206    * provide data with timestamps in PAUSED. */
1207   if (!is_rtp || priv->sinkpad)
1208     g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1209
1210   if (multicast) {
1211     /* join multicast group when adding clients, so we'll start receiving from it.
1212      * We cannot rely on the udpsrc to join the group since its socket is always a
1213      * local unicast one. */
1214     g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1215
1216     g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1217   }
1218
1219   /* update the dscp qos field in the sinks */
1220   update_dscp_qos (stream, udpsink);
1221
1222   if (priv->server_addr_v4) {
1223     GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1224     set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1225   }
1226
1227   if (priv->server_addr_v6) {
1228     GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1229     set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1230   }
1231
1232   if (multicast) {
1233     gint port;
1234     if (priv->mcast_addr_v4) {
1235       GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1236       port = get_port_from_socket (socket_v4);
1237       if (!port)
1238         goto get_port_failed;
1239       set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1240           G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1241           priv->mcast_addr_v4->address, port, mcast_ttl);
1242     }
1243
1244     if (priv->mcast_addr_v6) {
1245       GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1246       port = get_port_from_socket (socket_v6);
1247       if (!port)
1248         goto get_port_failed;
1249       set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1250           G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1251           priv->mcast_addr_v6->address, port, mcast_ttl);
1252     }
1253
1254   }
1255
1256   return TRUE;
1257
1258   /* ERRORS */
1259 no_udp_protocol:
1260   {
1261     GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1262     return FALSE;
1263   }
1264 get_port_failed:
1265   {
1266     GST_ERROR_OBJECT (stream, "failed to get udp port");
1267     return FALSE;
1268   }
1269 }
1270
1271 /* must be called with lock */
1272 static gboolean
1273 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1274 {
1275   GstStateChangeReturn ret;
1276
1277   g_assert (socket != NULL);
1278
1279   *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1280   if (*udpsrc == NULL)
1281     goto error;
1282
1283   g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1284
1285   /* The udpsrc cannot do the join because its socket is always a local unicast
1286    * one. The udpsink sharing the same socket will do it for us. */
1287   g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1288
1289   g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1290
1291   g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1292
1293   ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1294   if (ret == GST_STATE_CHANGE_FAILURE)
1295     goto error;
1296
1297   return TRUE;
1298
1299   /* ERRORS */
1300 error:
1301   {
1302     if (*udpsrc) {
1303       gst_element_set_state (*udpsrc, GST_STATE_NULL);
1304       g_clear_object (udpsrc);
1305     }
1306     return FALSE;
1307   }
1308 }
1309
1310 static gboolean
1311 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1312     GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1313     gboolean multicast, GstRTSPTransport * ct)
1314 {
1315   GstRTSPStreamPrivate *priv = stream->priv;
1316   GSocket *rtp_socket = NULL;
1317   GSocket *rtcp_socket;
1318   gint tmp_rtp, tmp_rtcp;
1319   guint count;
1320   GList *rejected_addresses = NULL;
1321   GstRTSPAddress *addr = NULL;
1322   GInetAddress *inetaddr = NULL;
1323   GSocketAddress *rtp_sockaddr = NULL;
1324   GSocketAddress *rtcp_sockaddr = NULL;
1325   GstRTSPAddressPool *pool;
1326
1327   pool = priv->pool;
1328   count = 0;
1329
1330   /* Start with random port */
1331   tmp_rtp = 0;
1332
1333   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1334       G_SOCKET_PROTOCOL_UDP, NULL);
1335   if (!rtcp_socket)
1336     goto no_udp_protocol;
1337   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1338
1339   /* try to allocate 2 UDP ports, the RTP port should be an even
1340    * number and the RTCP port should be the next (uneven) port */
1341 again:
1342
1343   if (rtp_socket == NULL) {
1344     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1345         G_SOCKET_PROTOCOL_UDP, NULL);
1346     if (!rtp_socket)
1347       goto no_udp_protocol;
1348     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1349   }
1350
1351   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1352     GstRTSPAddressFlags flags;
1353
1354     if (addr)
1355       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1356
1357     if (!pool)
1358       goto no_pool;
1359
1360     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1361     if (multicast)
1362       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1363     else
1364       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1365
1366     if (family == G_SOCKET_FAMILY_IPV6)
1367       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1368     else
1369       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1370
1371     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1372
1373     if (addr == NULL)
1374       goto no_address;
1375
1376     tmp_rtp = addr->port;
1377
1378     g_clear_object (&inetaddr);
1379     /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1380      * socket control message set in udpsrc? */
1381     if (multicast)
1382       inetaddr = g_inet_address_new_any (family);
1383     else
1384       inetaddr = g_inet_address_new_from_string (addr->address);
1385   } else {
1386     if (tmp_rtp != 0) {
1387       tmp_rtp += 2;
1388       if (++count > 20)
1389         goto no_ports;
1390     }
1391
1392     if (inetaddr == NULL)
1393       inetaddr = g_inet_address_new_any (family);
1394   }
1395
1396   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1397   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1398     GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1399     g_object_unref (rtp_sockaddr);
1400     goto again;
1401   }
1402   g_object_unref (rtp_sockaddr);
1403
1404   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1405   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1406     g_clear_object (&rtp_sockaddr);
1407     goto socket_error;
1408   }
1409
1410   tmp_rtp =
1411       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1412   g_object_unref (rtp_sockaddr);
1413
1414   /* check if port is even */
1415   if ((tmp_rtp & 1) != 0) {
1416     /* port not even, close and allocate another */
1417     tmp_rtp++;
1418     g_clear_object (&rtp_socket);
1419     goto again;
1420   }
1421
1422   /* set port */
1423   tmp_rtcp = tmp_rtp + 1;
1424
1425   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1426   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1427     GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1428     g_object_unref (rtcp_sockaddr);
1429     g_clear_object (&rtp_socket);
1430     goto again;
1431   }
1432   g_object_unref (rtcp_sockaddr);
1433
1434   if (!addr) {
1435     addr = g_slice_new0 (GstRTSPAddress);
1436     addr->address = g_inet_address_to_string (inetaddr);
1437     addr->port = tmp_rtp;
1438     addr->n_ports = 2;
1439   }
1440
1441   g_clear_object (&inetaddr);
1442
1443   socket_out[0] = rtp_socket;
1444   socket_out[1] = rtcp_socket;
1445   *server_addr_out = addr;
1446
1447   GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1448       addr->address, tmp_rtp, tmp_rtcp);
1449
1450   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1451
1452   return TRUE;
1453
1454   /* ERRORS */
1455 no_udp_protocol:
1456   {
1457     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1458     goto cleanup;
1459   }
1460 no_pool:
1461   {
1462     GST_ERROR_OBJECT (stream,
1463         "failed to allocate UDP ports: no address pool specified");
1464     goto cleanup;
1465   }
1466 no_address:
1467   {
1468     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1469     goto cleanup;
1470   }
1471 no_ports:
1472   {
1473     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports");
1474     goto cleanup;
1475   }
1476 socket_error:
1477   {
1478     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error");
1479     goto cleanup;
1480   }
1481 cleanup:
1482   {
1483     if (inetaddr)
1484       g_object_unref (inetaddr);
1485     g_list_free_full (rejected_addresses,
1486         (GDestroyNotify) gst_rtsp_address_free);
1487     if (addr)
1488       gst_rtsp_address_free (addr);
1489     if (rtp_socket)
1490       g_object_unref (rtp_socket);
1491     if (rtcp_socket)
1492       g_object_unref (rtcp_socket);
1493     return FALSE;
1494   }
1495 }
1496
1497 /**
1498  * gst_rtsp_stream_allocate_udp_sockets:
1499  * @stream: a #GstRTSPStream
1500  * @family: protocol family
1501  * @transport: transport method
1502  * @use_client_settings: Whether to use client settings or not
1503  *
1504  * Allocates RTP and RTCP ports.
1505  *
1506  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1507  */
1508 gboolean
1509 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1510     GSocketFamily family, GstRTSPTransport * ct,
1511     gboolean use_transport_settings)
1512 {
1513   GstRTSPStreamPrivate *priv;
1514   gboolean ret = FALSE;
1515   GstRTSPLowerTrans transport;
1516   gboolean allocated = FALSE;
1517
1518   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1519   g_return_val_if_fail (ct != NULL, FALSE);
1520   priv = stream->priv;
1521
1522   transport = ct->lower_transport;
1523
1524   g_mutex_lock (&priv->lock);
1525
1526   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1527     if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1528       allocated = TRUE;
1529     else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1530       allocated = TRUE;
1531   } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1532     if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1533       allocated = TRUE;
1534     else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1535       allocated = TRUE;
1536   }
1537
1538   if (allocated) {
1539     GST_DEBUG_OBJECT (stream, "Allocated already");
1540     g_mutex_unlock (&priv->lock);
1541     return TRUE;
1542   }
1543
1544   if (family == G_SOCKET_FAMILY_IPV4) {
1545     /* IPv4 */
1546     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1547       /* UDP unicast */
1548       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1549       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1550           priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
1551     } else {
1552       /* multicast */
1553       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1554       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1555           priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
1556     }
1557   } else {
1558     /* IPv6 */
1559     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1560       /* unicast */
1561       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1562       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1563           priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
1564
1565     } else {
1566       /* multicast */
1567       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1568       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1569           priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
1570     }
1571   }
1572   g_mutex_unlock (&priv->lock);
1573
1574   return ret;
1575 }
1576
1577 /**
1578  * gst_rtsp_stream_set_client_side:
1579  * @stream: a #GstRTSPStream
1580  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1581  * an RTSP connection.
1582  *
1583  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1584  * streams to an RTSP server via RECORD. This has the practical effect
1585  * of changing which UDP port numbers are used when setting up the local
1586  * side of the stream sending to be either the 'server' or 'client' pair
1587  * of a configured UDP transport.
1588  */
1589 void
1590 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1591 {
1592   GstRTSPStreamPrivate *priv;
1593
1594   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1595   priv = stream->priv;
1596   g_mutex_lock (&priv->lock);
1597   priv->client_side = client_side;
1598   g_mutex_unlock (&priv->lock);
1599 }
1600
1601 /**
1602  * gst_rtsp_stream_is_client_side:
1603  * @stream: a #GstRTSPStream
1604  *
1605  * See gst_rtsp_stream_set_client_side()
1606  *
1607  * Returns: TRUE if this #GstRTSPStream is client-side.
1608  */
1609 gboolean
1610 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1611 {
1612   GstRTSPStreamPrivate *priv;
1613   gboolean ret;
1614
1615   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1616
1617   priv = stream->priv;
1618   g_mutex_lock (&priv->lock);
1619   ret = priv->client_side;
1620   g_mutex_unlock (&priv->lock);
1621
1622   return ret;
1623 }
1624
1625 /**
1626  * gst_rtsp_stream_get_server_port:
1627  * @stream: a #GstRTSPStream
1628  * @server_port: (out): result server port
1629  * @family: the port family to get
1630  *
1631  * Fill @server_port with the port pair used by the server. This function can
1632  * only be called when @stream has been joined.
1633  */
1634 void
1635 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1636     GstRTSPRange * server_port, GSocketFamily family)
1637 {
1638   GstRTSPStreamPrivate *priv;
1639
1640   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1641   priv = stream->priv;
1642   g_return_if_fail (priv->joined_bin != NULL);
1643
1644   if (server_port) {
1645     server_port->min = 0;
1646     server_port->max = 0;
1647   }
1648
1649   g_mutex_lock (&priv->lock);
1650   if (family == G_SOCKET_FAMILY_IPV4) {
1651     if (server_port && priv->server_addr_v4) {
1652       server_port->min = priv->server_addr_v4->port;
1653       server_port->max =
1654           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1655     }
1656   } else {
1657     if (server_port && priv->server_addr_v6) {
1658       server_port->min = priv->server_addr_v6->port;
1659       server_port->max =
1660           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1661     }
1662   }
1663   g_mutex_unlock (&priv->lock);
1664 }
1665
1666 /**
1667  * gst_rtsp_stream_get_rtpsession:
1668  * @stream: a #GstRTSPStream
1669  *
1670  * Get the RTP session of this stream.
1671  *
1672  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1673  */
1674 GObject *
1675 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1676 {
1677   GstRTSPStreamPrivate *priv;
1678   GObject *session;
1679
1680   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1681
1682   priv = stream->priv;
1683
1684   g_mutex_lock (&priv->lock);
1685   if ((session = priv->session))
1686     g_object_ref (session);
1687   g_mutex_unlock (&priv->lock);
1688
1689   return session;
1690 }
1691
1692 /**
1693  * gst_rtsp_stream_get_srtp_encoder:
1694  * @stream: a #GstRTSPStream
1695  *
1696  * Get the SRTP encoder for this stream.
1697  *
1698  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1699  */
1700 GstElement *
1701 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1702 {
1703   GstRTSPStreamPrivate *priv;
1704   GstElement *encoder;
1705
1706   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1707
1708   priv = stream->priv;
1709
1710   g_mutex_lock (&priv->lock);
1711   if ((encoder = priv->srtpenc))
1712     g_object_ref (encoder);
1713   g_mutex_unlock (&priv->lock);
1714
1715   return encoder;
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_get_ssrc:
1720  * @stream: a #GstRTSPStream
1721  * @ssrc: (out): result ssrc
1722  *
1723  * Get the SSRC used by the RTP session of this stream. This function can only
1724  * be called when @stream has been joined.
1725  */
1726 void
1727 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1728 {
1729   GstRTSPStreamPrivate *priv;
1730
1731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1732   priv = stream->priv;
1733   g_return_if_fail (priv->joined_bin != NULL);
1734
1735   g_mutex_lock (&priv->lock);
1736   if (ssrc && priv->session)
1737     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1738   g_mutex_unlock (&priv->lock);
1739 }
1740
1741 /**
1742  * gst_rtsp_stream_set_retransmission_time:
1743  * @stream: a #GstRTSPStream
1744  * @time: a #GstClockTime
1745  *
1746  * Set the amount of time to store retransmission packets.
1747  */
1748 void
1749 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1750     GstClockTime time)
1751 {
1752   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1753
1754   g_mutex_lock (&stream->priv->lock);
1755   stream->priv->rtx_time = time;
1756   if (stream->priv->rtxsend)
1757     g_object_set (stream->priv->rtxsend, "max-size-time",
1758         GST_TIME_AS_MSECONDS (time), NULL);
1759   g_mutex_unlock (&stream->priv->lock);
1760 }
1761
1762 /**
1763  * gst_rtsp_stream_get_retransmission_time:
1764  * @stream: a #GstRTSPStream
1765  *
1766  * Get the amount of time to store retransmission data.
1767  *
1768  * Returns: the amount of time to store retransmission data.
1769  */
1770 GstClockTime
1771 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1772 {
1773   GstClockTime ret;
1774
1775   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1776
1777   g_mutex_lock (&stream->priv->lock);
1778   ret = stream->priv->rtx_time;
1779   g_mutex_unlock (&stream->priv->lock);
1780
1781   return ret;
1782 }
1783
1784 /**
1785  * gst_rtsp_stream_set_retransmission_pt:
1786  * @stream: a #GstRTSPStream
1787  * @rtx_pt: a #guint
1788  *
1789  * Set the payload type (pt) for retransmission of this stream.
1790  */
1791 void
1792 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1793 {
1794   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1795
1796   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1797
1798   g_mutex_lock (&stream->priv->lock);
1799   stream->priv->rtx_pt = rtx_pt;
1800   if (stream->priv->rtxsend) {
1801     guint pt = gst_rtsp_stream_get_pt (stream);
1802     gchar *pt_s = g_strdup_printf ("%d", pt);
1803     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1804         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1805     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1806     g_free (pt_s);
1807     gst_structure_free (rtx_pt_map);
1808   }
1809   g_mutex_unlock (&stream->priv->lock);
1810 }
1811
1812 /**
1813  * gst_rtsp_stream_get_retransmission_pt:
1814  * @stream: a #GstRTSPStream
1815  *
1816  * Get the payload-type used for retransmission of this stream
1817  *
1818  * Returns: The retransmission PT.
1819  */
1820 guint
1821 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1822 {
1823   guint rtx_pt;
1824
1825   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1826
1827   g_mutex_lock (&stream->priv->lock);
1828   rtx_pt = stream->priv->rtx_pt;
1829   g_mutex_unlock (&stream->priv->lock);
1830
1831   return rtx_pt;
1832 }
1833
1834 /**
1835  * gst_rtsp_stream_set_buffer_size:
1836  * @stream: a #GstRTSPStream
1837  * @size: the buffer size
1838  *
1839  * Set the size of the UDP transmission buffer (in bytes)
1840  * Needs to be set before the stream is joined to a bin.
1841  *
1842  * Since: 1.6
1843  */
1844 void
1845 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1846 {
1847   g_mutex_lock (&stream->priv->lock);
1848   stream->priv->buffer_size = size;
1849   g_mutex_unlock (&stream->priv->lock);
1850 }
1851
1852 /**
1853  * gst_rtsp_stream_get_buffer_size:
1854  * @stream: a #GstRTSPStream
1855  *
1856  * Get the size of the UDP transmission buffer (in bytes)
1857  *
1858  * Returns: the size of the UDP TX buffer
1859  *
1860  * Since: 1.6
1861  */
1862 guint
1863 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1864 {
1865   guint buffer_size;
1866
1867   g_mutex_lock (&stream->priv->lock);
1868   buffer_size = stream->priv->buffer_size;
1869   g_mutex_unlock (&stream->priv->lock);
1870
1871   return buffer_size;
1872 }
1873
1874 /* executed from streaming thread */
1875 static void
1876 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1877 {
1878   GstRTSPStreamPrivate *priv = stream->priv;
1879   GstCaps *newcaps, *oldcaps;
1880
1881   newcaps = gst_pad_get_current_caps (pad);
1882
1883   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1884       newcaps);
1885
1886   g_mutex_lock (&priv->lock);
1887   oldcaps = priv->caps;
1888   priv->caps = newcaps;
1889   g_mutex_unlock (&priv->lock);
1890
1891   if (oldcaps)
1892     gst_caps_unref (oldcaps);
1893 }
1894
1895 static void
1896 dump_structure (const GstStructure * s)
1897 {
1898   gchar *sstr;
1899
1900   sstr = gst_structure_to_string (s);
1901   GST_INFO ("structure: %s", sstr);
1902   g_free (sstr);
1903 }
1904
1905 static GstRTSPStreamTransport *
1906 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1907 {
1908   GstRTSPStreamPrivate *priv = stream->priv;
1909   GList *walk;
1910   GstRTSPStreamTransport *result = NULL;
1911   const gchar *tmp;
1912   gchar *dest;
1913   guint port;
1914
1915   if (rtcp_from == NULL)
1916     return NULL;
1917
1918   tmp = g_strrstr (rtcp_from, ":");
1919   if (tmp == NULL)
1920     return NULL;
1921
1922   port = atoi (tmp + 1);
1923   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1924
1925   g_mutex_lock (&priv->lock);
1926   GST_INFO ("finding %s:%d in %d transports", dest, port,
1927       g_list_length (priv->transports));
1928
1929   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1930     GstRTSPStreamTransport *trans = walk->data;
1931     const GstRTSPTransport *tr;
1932     gint min, max;
1933
1934     tr = gst_rtsp_stream_transport_get_transport (trans);
1935
1936     if (priv->client_side) {
1937       /* In client side mode the 'destination' is the RTSP server, so send
1938        * to those ports */
1939       min = tr->server_port.min;
1940       max = tr->server_port.max;
1941     } else {
1942       min = tr->client_port.min;
1943       max = tr->client_port.max;
1944     }
1945
1946     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1947         (min == port || max == port)) {
1948       result = trans;
1949       break;
1950     }
1951   }
1952   if (result)
1953     g_object_ref (result);
1954   g_mutex_unlock (&priv->lock);
1955
1956   g_free (dest);
1957
1958   return result;
1959 }
1960
1961 static GstRTSPStreamTransport *
1962 check_transport (GObject * source, GstRTSPStream * stream)
1963 {
1964   GstStructure *stats;
1965   GstRTSPStreamTransport *trans;
1966
1967   /* see if we have a stream to match with the origin of the RTCP packet */
1968   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1969   if (trans == NULL) {
1970     g_object_get (source, "stats", &stats, NULL);
1971     if (stats) {
1972       const gchar *rtcp_from;
1973
1974       dump_structure (stats);
1975
1976       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1977       if ((trans = find_transport (stream, rtcp_from))) {
1978         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1979             source);
1980         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1981             g_object_unref);
1982       }
1983       gst_structure_free (stats);
1984     }
1985   }
1986   return trans;
1987 }
1988
1989
1990 static void
1991 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1992 {
1993   GstRTSPStreamTransport *trans;
1994
1995   GST_INFO ("%p: new source %p", stream, source);
1996
1997   trans = check_transport (source, stream);
1998
1999   if (trans)
2000     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2001 }
2002
2003 static void
2004 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2005 {
2006   GST_INFO ("%p: new SDES %p", stream, source);
2007 }
2008
2009 static void
2010 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2011 {
2012   GstRTSPStreamTransport *trans;
2013
2014   trans = check_transport (source, stream);
2015
2016   if (trans) {
2017     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2018     gst_rtsp_stream_transport_keep_alive (trans);
2019   }
2020 #ifdef DUMP_STATS
2021   {
2022     GstStructure *stats;
2023     g_object_get (source, "stats", &stats, NULL);
2024     if (stats) {
2025       dump_structure (stats);
2026       gst_structure_free (stats);
2027     }
2028   }
2029 #endif
2030 }
2031
2032 static void
2033 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2034 {
2035   GST_INFO ("%p: source %p bye", stream, source);
2036 }
2037
2038 static void
2039 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2040 {
2041   GstRTSPStreamTransport *trans;
2042
2043   GST_INFO ("%p: source %p bye timeout", stream, source);
2044
2045   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2046     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2047     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2048   }
2049 }
2050
2051 static void
2052 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2053 {
2054   GstRTSPStreamTransport *trans;
2055
2056   GST_INFO ("%p: source %p timeout", stream, source);
2057
2058   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2059     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2060     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2061   }
2062 }
2063
2064 static void
2065 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2066 {
2067   GST_INFO ("%p: new sender source %p", stream, source);
2068 #ifndef DUMP_STATS
2069   {
2070     GstStructure *stats;
2071     g_object_get (source, "stats", &stats, NULL);
2072     if (stats) {
2073       dump_structure (stats);
2074       gst_structure_free (stats);
2075     }
2076   }
2077 #endif
2078 }
2079
2080 static void
2081 on_sender_ssrc_active (GObject * session, GObject * source,
2082     GstRTSPStream * stream)
2083 {
2084 #ifndef DUMP_STATS
2085   {
2086     GstStructure *stats;
2087     g_object_get (source, "stats", &stats, NULL);
2088     if (stats) {
2089       dump_structure (stats);
2090       gst_structure_free (stats);
2091     }
2092   }
2093 #endif
2094 }
2095
2096 static void
2097 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2098 {
2099   if (is_rtp) {
2100     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2101     g_list_free (priv->tr_cache_rtp);
2102     priv->tr_cache_rtp = NULL;
2103   } else {
2104     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2105     g_list_free (priv->tr_cache_rtcp);
2106     priv->tr_cache_rtcp = NULL;
2107   }
2108 }
2109
2110 static GstFlowReturn
2111 handle_new_sample (GstAppSink * sink, gpointer user_data)
2112 {
2113   GstRTSPStreamPrivate *priv;
2114   GList *walk;
2115   GstSample *sample;
2116   GstBuffer *buffer;
2117   GstRTSPStream *stream;
2118   gboolean is_rtp;
2119
2120   sample = gst_app_sink_pull_sample (sink);
2121   if (!sample)
2122     return GST_FLOW_OK;
2123
2124   stream = (GstRTSPStream *) user_data;
2125   priv = stream->priv;
2126   buffer = gst_sample_get_buffer (sample);
2127
2128   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2129
2130   g_mutex_lock (&priv->lock);
2131   if (is_rtp) {
2132     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2133       clear_tr_cache (priv, is_rtp);
2134       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2135         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2136         priv->tr_cache_rtp =
2137             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2138       }
2139       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2140     }
2141   } else {
2142     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2143       clear_tr_cache (priv, is_rtp);
2144       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2145         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2146         priv->tr_cache_rtcp =
2147             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2148       }
2149       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2150     }
2151   }
2152   g_mutex_unlock (&priv->lock);
2153
2154   if (is_rtp) {
2155     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2156       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2157       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2158     }
2159   } else {
2160     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2161       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2162       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2163     }
2164   }
2165   gst_sample_unref (sample);
2166
2167   return GST_FLOW_OK;
2168 }
2169
2170 static GstAppSinkCallbacks sink_cb = {
2171   NULL,                         /* not interested in EOS */
2172   NULL,                         /* not interested in preroll samples */
2173   handle_new_sample,
2174 };
2175
2176 static GstElement *
2177 get_rtp_encoder (GstRTSPStream * stream, guint session)
2178 {
2179   GstRTSPStreamPrivate *priv = stream->priv;
2180
2181   if (priv->srtpenc == NULL) {
2182     gchar *name;
2183
2184     name = g_strdup_printf ("srtpenc_%u", session);
2185     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2186     g_free (name);
2187
2188     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2189   }
2190   return gst_object_ref (priv->srtpenc);
2191 }
2192
2193 static GstElement *
2194 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2195 {
2196   GstRTSPStreamPrivate *priv = stream->priv;
2197   GstElement *oldenc, *enc;
2198   GstPad *pad;
2199   gchar *name;
2200
2201   if (priv->idx != session)
2202     return NULL;
2203
2204   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2205
2206   oldenc = priv->srtpenc;
2207   enc = get_rtp_encoder (stream, session);
2208   name = g_strdup_printf ("rtp_sink_%d", session);
2209   pad = gst_element_get_request_pad (enc, name);
2210   g_free (name);
2211   gst_object_unref (pad);
2212
2213   if (oldenc == NULL)
2214     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2215         enc);
2216
2217   return enc;
2218 }
2219
2220 static GstElement *
2221 request_rtcp_encoder (GstElement * rtpbin, guint session,
2222     GstRTSPStream * stream)
2223 {
2224   GstRTSPStreamPrivate *priv = stream->priv;
2225   GstElement *oldenc, *enc;
2226   GstPad *pad;
2227   gchar *name;
2228
2229   if (priv->idx != session)
2230     return NULL;
2231
2232   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2233
2234   oldenc = priv->srtpenc;
2235   enc = get_rtp_encoder (stream, session);
2236   name = g_strdup_printf ("rtcp_sink_%d", session);
2237   pad = gst_element_get_request_pad (enc, name);
2238   g_free (name);
2239   gst_object_unref (pad);
2240
2241   if (oldenc == NULL)
2242     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2243         enc);
2244
2245   return enc;
2246 }
2247
2248 static GstCaps *
2249 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2250 {
2251   GstRTSPStreamPrivate *priv = stream->priv;
2252   GstCaps *caps;
2253
2254   GST_DEBUG ("request key %08x", ssrc);
2255
2256   g_mutex_lock (&priv->lock);
2257   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2258     gst_caps_ref (caps);
2259   g_mutex_unlock (&priv->lock);
2260
2261   return caps;
2262 }
2263
2264 static GstElement *
2265 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2266     GstRTSPStream * stream)
2267 {
2268   GstRTSPStreamPrivate *priv = stream->priv;
2269
2270   if (priv->idx != session)
2271     return NULL;
2272
2273   if (priv->srtpdec == NULL) {
2274     gchar *name;
2275
2276     name = g_strdup_printf ("srtpdec_%u", session);
2277     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2278     g_free (name);
2279
2280     g_signal_connect (priv->srtpdec, "request-key",
2281         (GCallback) request_key, stream);
2282   }
2283   return gst_object_ref (priv->srtpdec);
2284 }
2285
2286 /**
2287  * gst_rtsp_stream_request_aux_sender:
2288  * @stream: a #GstRTSPStream
2289  * @sessid: the session id
2290  *
2291  * Creating a rtxsend bin
2292  *
2293  * Returns: (transfer full) (nullable): a #GstElement.
2294  *
2295  * Since: 1.6
2296  */
2297 GstElement *
2298 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2299 {
2300   GstElement *bin;
2301   GstPad *pad;
2302   GstStructure *pt_map;
2303   gchar *name;
2304   guint pt, rtx_pt;
2305   gchar *pt_s;
2306
2307   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2308
2309   pt = gst_rtsp_stream_get_pt (stream);
2310   pt_s = g_strdup_printf ("%u", pt);
2311   rtx_pt = stream->priv->rtx_pt;
2312
2313   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2314
2315   bin = gst_bin_new (NULL);
2316   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2317   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2318       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2319   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2320       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2321   g_free (pt_s);
2322   gst_structure_free (pt_map);
2323   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2324
2325   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2326   name = g_strdup_printf ("src_%u", sessid);
2327   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2328   g_free (name);
2329   gst_object_unref (pad);
2330
2331   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2332   name = g_strdup_printf ("sink_%u", sessid);
2333   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2334   g_free (name);
2335   gst_object_unref (pad);
2336
2337   return bin;
2338 }
2339
2340 static void
2341 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2342 {
2343   guint pt = GPOINTER_TO_INT (key);
2344   const GstStructure *s = gst_caps_get_structure (caps, 0);
2345   const gchar *apt;
2346
2347   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2348       (apt = gst_structure_get_string (s, "apt"))) {
2349     gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2350   }
2351 }
2352
2353 /* Call with priv->lock taken */
2354 static void
2355 update_rtx_receive_pt_map (GstRTSPStream * stream)
2356 {
2357   GstStructure *pt_map;
2358
2359   if (!stream->priv->rtxreceive)
2360     goto done;
2361
2362   pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2363   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2364   g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2365   gst_structure_free (pt_map);
2366
2367 done:
2368   return;
2369 }
2370
2371 static void
2372 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2373 {
2374   guint pt = GPOINTER_TO_INT (key);
2375   const GstStructure *s = gst_caps_get_structure (caps, 0);
2376
2377   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2378     g_object_set (ulpfec_decoder, "pt", pt, NULL);
2379 }
2380
2381 static void
2382 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2383 {
2384   if (!stream->priv->ulpfec_decoder)
2385     goto done;
2386
2387   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2388       stream->priv->ulpfec_decoder);
2389
2390 done:
2391   return;
2392 }
2393
2394 /**
2395  * gst_rtsp_stream_request_aux_receiver:
2396  * @stream: a #GstRTSPStream
2397  * @sessid: the session id
2398  *
2399  * Creating a rtxreceive bin
2400  *
2401  * Returns: (transfer full) (nullable): a #GstElement.
2402  *
2403  * Since: 1.16
2404  */
2405 GstElement *
2406 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2407 {
2408   GstElement *bin;
2409   GstPad *pad;
2410   gchar *name;
2411
2412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2413
2414   bin = gst_bin_new (NULL);
2415   stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2416   update_rtx_receive_pt_map (stream);
2417   update_ulpfec_decoder_pt (stream);
2418   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2419
2420   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2421   name = g_strdup_printf ("src_%u", sessid);
2422   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2423   g_free (name);
2424   gst_object_unref (pad);
2425
2426   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2427   name = g_strdup_printf ("sink_%u", sessid);
2428   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2429   g_free (name);
2430   gst_object_unref (pad);
2431
2432   return bin;
2433 }
2434
2435 /**
2436  * gst_rtsp_stream_set_pt_map:
2437  * @stream: a #GstRTSPStream
2438  * @pt: the pt
2439  * @caps: a #GstCaps
2440  *
2441  * Configure a pt map between @pt and @caps.
2442  */
2443 void
2444 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2445 {
2446   GstRTSPStreamPrivate *priv = stream->priv;
2447
2448   if (!GST_IS_CAPS (caps))
2449     return;
2450
2451   g_mutex_lock (&priv->lock);
2452   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2453   update_rtx_receive_pt_map (stream);
2454   g_mutex_unlock (&priv->lock);
2455 }
2456
2457 /**
2458  * gst_rtsp_stream_set_publish_clock_mode:
2459  * @stream: a #GstRTSPStream
2460  * @mode: the clock publish mode
2461  *
2462  * Sets if and how the stream clock should be published according to RFC7273.
2463  *
2464  * Since: 1.8
2465  */
2466 void
2467 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2468     GstRTSPPublishClockMode mode)
2469 {
2470   GstRTSPStreamPrivate *priv;
2471
2472   priv = stream->priv;
2473   g_mutex_lock (&priv->lock);
2474   priv->publish_clock_mode = mode;
2475   g_mutex_unlock (&priv->lock);
2476 }
2477
2478 /**
2479  * gst_rtsp_stream_get_publish_clock_mode:
2480  * @stream: a #GstRTSPStream
2481  *
2482  * Gets if and how the stream clock should be published according to RFC7273.
2483  *
2484  * Returns: The GstRTSPPublishClockMode
2485  *
2486  * Since: 1.8
2487  */
2488 GstRTSPPublishClockMode
2489 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2490 {
2491   GstRTSPStreamPrivate *priv;
2492   GstRTSPPublishClockMode ret;
2493
2494   priv = stream->priv;
2495   g_mutex_lock (&priv->lock);
2496   ret = priv->publish_clock_mode;
2497   g_mutex_unlock (&priv->lock);
2498
2499   return ret;
2500 }
2501
2502 static GstCaps *
2503 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2504     GstRTSPStream * stream)
2505 {
2506   GstRTSPStreamPrivate *priv = stream->priv;
2507   GstCaps *caps = NULL;
2508
2509   g_mutex_lock (&priv->lock);
2510
2511   if (priv->idx == session) {
2512     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2513     if (caps) {
2514       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2515       gst_caps_ref (caps);
2516     } else {
2517       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2518     }
2519   }
2520
2521   g_mutex_unlock (&priv->lock);
2522
2523   return caps;
2524 }
2525
2526 static void
2527 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2528 {
2529   GstRTSPStreamPrivate *priv = stream->priv;
2530   gchar *name;
2531   GstPadLinkReturn ret;
2532   guint sessid;
2533
2534   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2535       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2536
2537   name = gst_pad_get_name (pad);
2538   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2539     g_free (name);
2540     return;
2541   }
2542   g_free (name);
2543
2544   if (priv->idx != sessid)
2545     return;
2546
2547   if (gst_pad_is_linked (priv->sinkpad)) {
2548     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2549         GST_DEBUG_PAD_NAME (priv->sinkpad));
2550     return;
2551   }
2552
2553   /* link the RTP pad to the session manager, it should not really fail unless
2554    * this is not really an RTP pad */
2555   ret = gst_pad_link (pad, priv->sinkpad);
2556   if (ret != GST_PAD_LINK_OK)
2557     goto link_failed;
2558   priv->recv_rtp_src = gst_object_ref (pad);
2559
2560   return;
2561
2562 /* ERRORS */
2563 link_failed:
2564   {
2565     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2566         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2567   }
2568 }
2569
2570 static void
2571 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2572     GstRTSPStream * stream)
2573 {
2574   /* TODO: What to do here other than this? */
2575   GST_DEBUG ("Stream %p: Got EOS", stream);
2576   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2577 }
2578
2579 typedef struct _ProbeData ProbeData;
2580
2581 struct _ProbeData
2582 {
2583   GstRTSPStream *stream;
2584   /* existing sink, already linked to tee */
2585   GstElement *sink1;
2586   /* new sink, about to be linked */
2587   GstElement *sink2;
2588   /* new queue element, that will be linked to tee and sink1 */
2589   GstElement **queue1;
2590   /* new queue element, that will be linked to tee and sink2 */
2591   GstElement **queue2;
2592   GstPad *sink_pad;
2593   GstPad *tee_pad;
2594   guint index;
2595 };
2596
2597 static void
2598 free_cb_data (gpointer user_data)
2599 {
2600   ProbeData *data = user_data;
2601
2602   gst_object_unref (data->stream);
2603   gst_object_unref (data->sink1);
2604   gst_object_unref (data->sink2);
2605   gst_object_unref (data->sink_pad);
2606   gst_object_unref (data->tee_pad);
2607   g_free (data);
2608 }
2609
2610
2611 static void
2612 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2613     GstElement * tee, GstElement * sink, GstElement ** queue)
2614 {
2615   GstRTSPStreamPrivate *priv = stream->priv;
2616   GstPad *tee_pad;
2617   GstPad *queue_pad;
2618   GstPad *sink_pad;
2619
2620   /* create queue for the new stream */
2621   *queue = gst_element_factory_make ("queue", NULL);
2622   g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2623       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2624   gst_bin_add (priv->joined_bin, *queue);
2625
2626   /* link tee to queue */
2627   tee_pad = gst_element_get_request_pad (tee, "src_%u");
2628   queue_pad = gst_element_get_static_pad (*queue, "sink");
2629   gst_pad_link (tee_pad, queue_pad);
2630   gst_object_unref (queue_pad);
2631   gst_object_unref (tee_pad);
2632
2633   /* link queue to sink */
2634   queue_pad = gst_element_get_static_pad (*queue, "src");
2635   sink_pad = gst_element_get_static_pad (sink, "sink");
2636   gst_pad_link (queue_pad, sink_pad);
2637   gst_object_unref (queue_pad);
2638   gst_object_unref (sink_pad);
2639
2640   gst_element_sync_state_with_parent (sink);
2641   gst_element_sync_state_with_parent (*queue);
2642 }
2643
2644 static GstPadProbeReturn
2645 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2646     GstPadProbeInfo * info, gpointer user_data)
2647 {
2648   GstRTSPStreamPrivate *priv;
2649   ProbeData *data = user_data;
2650   GstRTSPStream *stream;
2651   GstElement **queue1;
2652   GstElement **queue2;
2653   GstPad *sink_pad;
2654   GstPad *tee_pad;
2655   GstPad *queue_pad;
2656   guint index;
2657
2658   stream = data->stream;
2659   priv = stream->priv;
2660   queue1 = data->queue1;
2661   queue2 = data->queue2;
2662   sink_pad = data->sink_pad;
2663   tee_pad = data->tee_pad;
2664   index = data->index;
2665
2666   /* unlink tee and the existing sink:
2667    *   .-----.    .---------.
2668    *   | tee |    |  sink1  |
2669    * sink   src->sink       |
2670    *   '-----'    '---------'
2671    */
2672   g_assert (gst_pad_unlink (tee_pad, sink_pad));
2673
2674   /* add queue to the already existing stream */
2675   *queue1 = gst_element_factory_make ("queue", NULL);
2676   g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
2677       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2678   gst_bin_add (priv->joined_bin, *queue1);
2679
2680   /* link tee, queue and sink:
2681    *   .-----.    .---------.    .---------.
2682    *   | tee |    |  queue1 |    | sink1   |
2683    * sink   src->sink      src->sink       |
2684    *   '-----'    '---------'    '---------'
2685    */
2686   queue_pad = gst_element_get_static_pad (*queue1, "sink");
2687   gst_pad_link (tee_pad, queue_pad);
2688   gst_object_unref (queue_pad);
2689   queue_pad = gst_element_get_static_pad (*queue1, "src");
2690   gst_pad_link (queue_pad, sink_pad);
2691   gst_object_unref (queue_pad);
2692
2693   gst_element_sync_state_with_parent (*queue1);
2694
2695   /* create queue and link it to tee and the new sink */
2696   create_and_plug_queue_to_unlinked_stream (stream,
2697       priv->tee[index], data->sink2, queue2);
2698
2699   /* the final stream:
2700    *
2701    *    .-----.    .---------.    .---------.
2702    *    | tee |    |  queue1 |    | sink1   |
2703    *  sink   src->sink      src->sink       |
2704    *    |     |    '---------'    '---------'
2705    *    |     |    .---------.    .---------.
2706    *    |     |    |  queue2 |    | sink2   |
2707    *    |    src->sink      src->sink       |
2708    *    '-----'    '---------'    '---------'
2709    */
2710
2711   return GST_PAD_PROBE_REMOVE;
2712 }
2713
2714 static void
2715 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
2716     GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
2717     GstElement ** queue2)
2718 {
2719   ProbeData *data;
2720
2721   data = g_new0 (ProbeData, 1);
2722   data->stream = gst_object_ref (stream);
2723   data->sink1 = gst_object_ref (sink1);
2724   data->sink2 = gst_object_ref (sink2);
2725   data->queue1 = queue1;
2726   data->queue2 = queue2;
2727   data->index = index;
2728
2729   data->sink_pad = gst_element_get_static_pad (sink1, "sink");
2730   g_assert (data->sink_pad);
2731   data->tee_pad = gst_pad_get_peer (data->sink_pad);
2732   g_assert (data->tee_pad);
2733
2734   gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
2735       create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
2736 }
2737
2738 static void
2739 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
2740     GstElement ** queue_to_plug, guint index, gboolean is_mcast)
2741 {
2742   GstRTSPStreamPrivate *priv = stream->priv;
2743   GstElement *existing_sink;
2744
2745   if (is_mcast)
2746     existing_sink = priv->udpsink[index];
2747   else
2748     existing_sink = priv->mcast_udpsink[index];
2749
2750   GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
2751
2752   /* add sink to the bin */
2753   gst_bin_add (priv->joined_bin, sink_to_plug);
2754
2755   if (priv->appsink[index] && existing_sink) {
2756
2757     /* queues are already added for the existing stream, add one for
2758        the newly added udp stream */
2759     create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
2760         sink_to_plug, queue_to_plug);
2761
2762   } else if (priv->appsink[index] || existing_sink) {
2763     GstElement **queue;
2764     GstElement *element;
2765
2766     /* add queue to the already existing stream plus the newly created udp
2767        stream */
2768     if (priv->appsink[index]) {
2769       element = priv->appsink[index];
2770       queue = &priv->appqueue[index];
2771     } else {
2772       element = existing_sink;
2773       if (is_mcast)
2774         queue = &priv->udpqueue[index];
2775       else
2776         queue = &priv->mcast_udpqueue[index];
2777     }
2778
2779     create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
2780         index, queue, queue_to_plug);
2781
2782   } else {
2783     GstPad *tee_pad;
2784     GstPad *sink_pad;
2785
2786     GST_DEBUG_OBJECT (stream, "creating first stream");
2787
2788     /* no need to add queues */
2789     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2790     sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
2791     gst_pad_link (tee_pad, sink_pad);
2792     gst_object_unref (tee_pad);
2793     gst_object_unref (sink_pad);
2794   }
2795
2796   gst_element_sync_state_with_parent (sink_to_plug);
2797 }
2798
2799 static void
2800 plug_tcp_sink (GstRTSPStream * stream, guint index)
2801 {
2802   GstRTSPStreamPrivate *priv = stream->priv;
2803
2804   GST_DEBUG_OBJECT (stream, "plug tcp sink");
2805
2806   /* add sink to the bin */
2807   gst_bin_add (priv->joined_bin, priv->appsink[index]);
2808
2809   if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
2810
2811     /* queues are already added for the existing stream, add one for
2812        the newly added tcp stream */
2813     create_and_plug_queue_to_unlinked_stream (stream,
2814         priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
2815
2816   } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
2817     GstElement **queue;
2818     GstElement *element;
2819
2820     /* add queue to the already existing stream plus the newly created tcp
2821        stream */
2822     if (priv->mcast_udpsink[index]) {
2823       element = priv->mcast_udpsink[index];
2824       queue = &priv->mcast_udpqueue[index];
2825     } else {
2826       element = priv->udpsink[index];
2827       queue = &priv->udpqueue[index];
2828     }
2829
2830     create_and_plug_queue_to_linked_stream (stream, element,
2831         priv->appsink[index], index, queue, &priv->appqueue[index]);
2832
2833   } else {
2834     GstPad *tee_pad;
2835     GstPad *sink_pad;
2836
2837     /* no need to add queues */
2838     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2839     sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
2840     gst_pad_link (tee_pad, sink_pad);
2841     gst_object_unref (tee_pad);
2842     gst_object_unref (sink_pad);
2843   }
2844
2845   gst_element_sync_state_with_parent (priv->appsink[index]);
2846 }
2847
2848 static void
2849 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
2850     guint index)
2851 {
2852   GstRTSPStreamPrivate *priv;
2853   gboolean is_tcp, is_udp, is_mcast;
2854   priv = stream->priv;
2855
2856   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2857   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2858   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2859
2860   if (is_udp)
2861     plug_udp_sink (stream, priv->udpsink[index],
2862         &priv->udpqueue[index], index, FALSE);
2863
2864   else if (is_mcast)
2865     plug_udp_sink (stream, priv->mcast_udpsink[index],
2866         &priv->mcast_udpqueue[index], index, TRUE);
2867
2868   else if (is_tcp)
2869     plug_tcp_sink (stream, index);
2870 }
2871
2872 /* must be called with lock */
2873 static gboolean
2874 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
2875 {
2876   GstRTSPStreamPrivate *priv;
2877   GstPad *pad;
2878   GstBin *bin;
2879   gboolean is_tcp, is_udp, is_mcast;
2880   gint mcast_ttl = 0;
2881   gint i;
2882
2883   GST_DEBUG_OBJECT (stream, "create sender part");
2884   priv = stream->priv;
2885   bin = priv->joined_bin;
2886
2887   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2888   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2889   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2890
2891   if (is_mcast)
2892     mcast_ttl = transport->ttl;
2893
2894   GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
2895       is_udp, is_mcast, mcast_ttl);
2896
2897   if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
2898     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
2899     return FALSE;
2900   }
2901
2902   if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
2903     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
2904     return FALSE;
2905   }
2906
2907   for (i = 0; i < 2; i++) {
2908     gboolean link_tee = FALSE;
2909     /* For the sender we create this bit of pipeline for both
2910      * RTP and RTCP.
2911      * Initially there will be only one active transport for
2912      * the stream, so the pipeline will look like this:
2913      *
2914      * .--------.      .-----.    .---------.
2915      * | rtpbin |      | tee |    |  sink   |
2916      * |       send->sink   src->sink       |
2917      * '--------'      '-----'    '---------'
2918      *
2919      * For each new transport, the already existing branch will
2920      * be reconfigured by adding a queue element:
2921      *
2922      * .--------.      .-----.    .---------.    .---------.
2923      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2924      * |       send->sink   src->sink      src->sink       |
2925      * '--------'      |     |    '---------'    '---------'
2926      *                 |     |    .---------.    .---------.
2927      *                 |     |    |  queue  |    | udpsink |
2928      *                 |    src->sink      src->sink       |
2929      *                 |     |    '---------'    '---------'
2930      *                 |     |    .---------.    .---------.
2931      *                 |     |    |  queue  |    | appsink |
2932      *                 |    src->sink      src->sink       |
2933      *                 '-----'    '---------'    '---------'
2934      */
2935
2936     /* Only link the RTP send src if we're going to send RTP, link
2937      * the RTCP send src always */
2938     if (!priv->srcpad && i == 0)
2939       continue;
2940
2941     if (!priv->tee[i]) {
2942       /* make tee for RTP/RTCP */
2943       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2944       gst_bin_add (bin, priv->tee[i]);
2945       link_tee = TRUE;
2946     }
2947
2948     if (is_udp && !priv->udpsink[i]) {
2949       /* we create only one pair of udpsinks for IPv4 and IPv6 */
2950       create_and_configure_udpsink (stream, &priv->udpsink[i],
2951           priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
2952       plug_sink (stream, transport, i);
2953     } else if (is_mcast && !priv->mcast_udpsink[i]) {
2954       /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
2955       create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
2956           priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
2957           mcast_ttl);
2958       plug_sink (stream, transport, i);
2959     } else if (is_tcp && !priv->appsink[i]) {
2960       /* make appsink */
2961       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2962       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2963
2964       /* we need to set sync and preroll to FALSE for the sink to avoid
2965        * deadlock. This is only needed for sink sending RTCP data. */
2966       if (i == 1)
2967         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2968
2969       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2970           &sink_cb, stream, NULL);
2971       plug_sink (stream, transport, i);
2972     }
2973
2974     if (link_tee) {
2975       /* and link to rtpbin send pad */
2976       gst_element_sync_state_with_parent (priv->tee[i]);
2977       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2978       gst_pad_link (priv->send_src[i], pad);
2979       gst_object_unref (pad);
2980     }
2981   }
2982
2983   return TRUE;
2984 }
2985
2986 /* must be called with lock */
2987 static void
2988 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2989     GstElement * funnel)
2990 {
2991   GstRTSPStreamPrivate *priv;
2992   GstPad *pad, *selpad;
2993
2994   priv = stream->priv;
2995
2996   if (priv->srcpad) {
2997     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2998      * values. This is only relevant for PLAY pipelines */
2999     gst_element_set_state (src, GST_STATE_PLAYING);
3000     gst_element_set_locked_state (src, TRUE);
3001   }
3002
3003   /* add src */
3004   gst_bin_add (bin, src);
3005
3006   /* and link to the funnel */
3007   selpad = gst_element_get_request_pad (funnel, "sink_%u");
3008   pad = gst_element_get_static_pad (src, "src");
3009   gst_pad_link (pad, selpad);
3010   gst_object_unref (pad);
3011   gst_object_unref (selpad);
3012 }
3013
3014 /* must be called with lock */
3015 static gboolean
3016 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3017     transport)
3018 {
3019   GstRTSPStreamPrivate *priv;
3020   GstPad *pad;
3021   GstBin *bin;
3022   gboolean tcp;
3023   gboolean udp;
3024   gboolean mcast;
3025   gint i;
3026
3027   GST_DEBUG_OBJECT (stream, "create receiver part");
3028   priv = stream->priv;
3029   bin = priv->joined_bin;
3030
3031   tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3032   udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3033   mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3034
3035   for (i = 0; i < 2; i++) {
3036     /* For the receiver we create this bit of pipeline for both
3037      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3038      * and it is all funneled into the rtpbin receive pad.
3039      *
3040      *
3041      * .--------.     .--------.    .--------.
3042      * | udpsrc |     | funnel |    | rtpbin |
3043      * | RTP    src->sink      src->sink     |
3044      * '--------'     |        |    |        |
3045      * .--------.     |        |    |        |
3046      * | appsrc |     |        |    |        |
3047      * | RTP    src->sink      |    |        |
3048      * '--------'     '--------'    |        |
3049      *                              |        |
3050      * .--------.     .--------.    |        |
3051      * | udpsrc |     | funnel |    |        |
3052      * | RTCP   src->sink      src->sink     |
3053      * '--------'     |        |    '--------'
3054      * .--------.     |        |
3055      * | appsrc |     |        |
3056      * | RTCP   src->sink      |
3057      * '--------'     '--------'
3058      */
3059
3060     if (!priv->sinkpad && i == 0) {
3061       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3062        * RTCP sink always */
3063       continue;
3064     }
3065
3066     /* make funnel for the RTP/RTCP receivers */
3067     if (!priv->funnel[i]) {
3068       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3069       gst_bin_add (bin, priv->funnel[i]);
3070
3071       pad = gst_element_get_static_pad (priv->funnel[i], "src");
3072       gst_pad_link (pad, priv->recv_sink[i]);
3073       gst_object_unref (pad);
3074     }
3075
3076     if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3077       GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3078       if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3079               priv->socket_v4[i]))
3080         goto udpsrc_error;
3081
3082       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3083     }
3084
3085     if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3086       GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3087       if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3088               priv->socket_v6[i]))
3089         goto udpsrc_error;
3090
3091       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3092     }
3093
3094     if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3095       GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3096       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3097               priv->mcast_socket_v4[i]))
3098         goto mcast_udpsrc_error;
3099       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3100     }
3101
3102     if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3103       GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3104       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3105               priv->mcast_socket_v6[i]))
3106         goto mcast_udpsrc_error;
3107       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3108     }
3109
3110     if (tcp && !priv->appsrc[i]) {
3111       /* make and add appsrc */
3112       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3113       priv->appsrc_base_time[i] = -1;
3114       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3115           TRUE, NULL);
3116       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3117     }
3118
3119     gst_element_sync_state_with_parent (priv->funnel[i]);
3120   }
3121
3122   return TRUE;
3123
3124 mcast_udpsrc_error:
3125 udpsrc_error:
3126   return FALSE;
3127 }
3128
3129 static gboolean
3130 check_mcast_part_for_transport (GstRTSPStream * stream,
3131     const GstRTSPTransport * tr)
3132 {
3133   GstRTSPStreamPrivate *priv = stream->priv;
3134   GInetAddress *inetaddr;
3135   GSocketFamily family;
3136   GstRTSPAddress *mcast_addr;
3137
3138   /* Check if it's a ipv4 or ipv6 transport */
3139   inetaddr = g_inet_address_new_from_string (tr->destination);
3140   family = g_inet_address_get_family (inetaddr);
3141   g_object_unref (inetaddr);
3142
3143   /* Select fields corresponding to the family */
3144   if (family == G_SOCKET_FAMILY_IPV4) {
3145     mcast_addr = priv->mcast_addr_v4;
3146   } else {
3147     mcast_addr = priv->mcast_addr_v6;
3148   }
3149
3150   /* We support only one mcast group per family, make sure this transport
3151    * matches it. */
3152   if (!mcast_addr)
3153     goto no_addr;
3154
3155   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
3156       tr->port.min != mcast_addr->port ||
3157       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
3158       tr->ttl != mcast_addr->ttl)
3159     goto wrong_addr;
3160
3161   return TRUE;
3162
3163 no_addr:
3164   {
3165     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3166         "has been reserved");
3167     return FALSE;
3168   }
3169 wrong_addr:
3170   {
3171     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3172         "the reserved address");
3173     return FALSE;
3174   }
3175 }
3176
3177 /**
3178  * gst_rtsp_stream_join_bin:
3179  * @stream: a #GstRTSPStream
3180  * @bin: (transfer none): a #GstBin to join
3181  * @rtpbin: (transfer none): a rtpbin element in @bin
3182  * @state: the target state of the new elements
3183  *
3184  * Join the #GstBin @bin that contains the element @rtpbin.
3185  *
3186  * @stream will link to @rtpbin, which must be inside @bin. The elements
3187  * added to @bin will be set to the state given in @state.
3188  *
3189  * Returns: %TRUE on success.
3190  */
3191 gboolean
3192 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3193     GstElement * rtpbin, GstState state)
3194 {
3195   GstRTSPStreamPrivate *priv;
3196   guint idx;
3197   gchar *name;
3198   GstPadLinkReturn ret;
3199
3200   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3201   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3202   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3203
3204   priv = stream->priv;
3205
3206   g_mutex_lock (&priv->lock);
3207   if (priv->joined_bin != NULL)
3208     goto was_joined;
3209
3210   /* create a session with the same index as the stream */
3211   idx = priv->idx;
3212
3213   GST_INFO ("stream %p joining bin as session %u", stream, idx);
3214
3215   if (priv->profiles & GST_RTSP_PROFILE_SAVP
3216       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3217     /* For SRTP */
3218     g_signal_connect (rtpbin, "request-rtp-encoder",
3219         (GCallback) request_rtp_encoder, stream);
3220     g_signal_connect (rtpbin, "request-rtcp-encoder",
3221         (GCallback) request_rtcp_encoder, stream);
3222     g_signal_connect (rtpbin, "request-rtp-decoder",
3223         (GCallback) request_rtp_rtcp_decoder, stream);
3224     g_signal_connect (rtpbin, "request-rtcp-decoder",
3225         (GCallback) request_rtp_rtcp_decoder, stream);
3226   }
3227
3228   if (priv->sinkpad) {
3229     g_signal_connect (rtpbin, "request-pt-map",
3230         (GCallback) request_pt_map, stream);
3231   }
3232
3233   /* get pads from the RTP session element for sending and receiving
3234    * RTP/RTCP*/
3235   if (priv->srcpad) {
3236     /* get a pad for sending RTP */
3237     name = g_strdup_printf ("send_rtp_sink_%u", idx);
3238     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3239     g_free (name);
3240
3241     /* link the RTP pad to the session manager, it should not really fail unless
3242      * this is not really an RTP pad */
3243     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3244     if (ret != GST_PAD_LINK_OK)
3245       goto link_failed;
3246
3247     name = g_strdup_printf ("send_rtp_src_%u", idx);
3248     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3249     g_free (name);
3250   } else {
3251     /* RECORD case: need to connect our sinkpad from here */
3252     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3253     /* EOS */
3254     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3255
3256     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3257     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3258     g_free (name);
3259   }
3260
3261   name = g_strdup_printf ("send_rtcp_src_%u", idx);
3262   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3263   g_free (name);
3264   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3265   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3266   g_free (name);
3267
3268   /* get the session */
3269   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3270
3271   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3272       stream);
3273   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3274       stream);
3275   g_signal_connect (priv->session, "on-ssrc-active",
3276       (GCallback) on_ssrc_active, stream);
3277   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3278       stream);
3279   g_signal_connect (priv->session, "on-bye-timeout",
3280       (GCallback) on_bye_timeout, stream);
3281   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3282       stream);
3283
3284   /* signal for sender ssrc */
3285   g_signal_connect (priv->session, "on-new-sender-ssrc",
3286       (GCallback) on_new_sender_ssrc, stream);
3287   g_signal_connect (priv->session, "on-sender-ssrc-active",
3288       (GCallback) on_sender_ssrc_active, stream);
3289
3290   if (priv->srcpad) {
3291     /* be notified of caps changes */
3292     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3293         (GCallback) caps_notify, stream);
3294     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3295   }
3296
3297   priv->joined_bin = bin;
3298   GST_DEBUG_OBJECT (stream, "successfully joined bin");
3299   g_mutex_unlock (&priv->lock);
3300
3301   return TRUE;
3302
3303   /* ERRORS */
3304 was_joined:
3305   {
3306     g_mutex_unlock (&priv->lock);
3307     return TRUE;
3308   }
3309 link_failed:
3310   {
3311     GST_WARNING ("failed to link stream %u", idx);
3312     gst_object_unref (priv->send_rtp_sink);
3313     priv->send_rtp_sink = NULL;
3314     g_mutex_unlock (&priv->lock);
3315     return FALSE;
3316   }
3317 }
3318
3319 static void
3320 clear_element (GstBin * bin, GstElement ** elementptr)
3321 {
3322   if (*elementptr) {
3323     gst_element_set_locked_state (*elementptr, FALSE);
3324     gst_element_set_state (*elementptr, GST_STATE_NULL);
3325     if (GST_ELEMENT_PARENT (*elementptr))
3326       gst_bin_remove (bin, *elementptr);
3327     else
3328       gst_object_unref (*elementptr);
3329     *elementptr = NULL;
3330   }
3331 }
3332
3333 /**
3334  * gst_rtsp_stream_leave_bin:
3335  * @stream: a #GstRTSPStream
3336  * @bin: (transfer none): a #GstBin
3337  * @rtpbin: (transfer none): a rtpbin #GstElement
3338  *
3339  * Remove the elements of @stream from @bin.
3340  *
3341  * Return: %TRUE on success.
3342  */
3343 gboolean
3344 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3345     GstElement * rtpbin)
3346 {
3347   GstRTSPStreamPrivate *priv;
3348   gint i;
3349
3350   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3351   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3352   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3353
3354   priv = stream->priv;
3355
3356   g_mutex_lock (&priv->lock);
3357   if (priv->joined_bin == NULL)
3358     goto was_not_joined;
3359   if (priv->joined_bin != bin)
3360     goto wrong_bin;
3361
3362   priv->joined_bin = NULL;
3363
3364   /* all transports must be removed by now */
3365   if (priv->transports != NULL)
3366     goto transports_not_removed;
3367
3368   clear_tr_cache (priv, TRUE);
3369   clear_tr_cache (priv, FALSE);
3370
3371   GST_INFO ("stream %p leaving bin", stream);
3372
3373   if (priv->srcpad) {
3374     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3375
3376     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3377     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3378     gst_object_unref (priv->send_rtp_sink);
3379     priv->send_rtp_sink = NULL;
3380   } else if (priv->recv_rtp_src) {
3381     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3382     gst_object_unref (priv->recv_rtp_src);
3383     priv->recv_rtp_src = NULL;
3384   }
3385
3386   for (i = 0; i < 2; i++) {
3387     clear_element (bin, &priv->udpsrc_v4[i]);
3388     clear_element (bin, &priv->udpsrc_v6[i]);
3389     clear_element (bin, &priv->udpqueue[i]);
3390     clear_element (bin, &priv->udpsink[i]);
3391
3392     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3393     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3394     clear_element (bin, &priv->mcast_udpqueue[i]);
3395     clear_element (bin, &priv->mcast_udpsink[i]);
3396
3397     clear_element (bin, &priv->appsrc[i]);
3398     clear_element (bin, &priv->appqueue[i]);
3399     clear_element (bin, &priv->appsink[i]);
3400
3401     clear_element (bin, &priv->tee[i]);
3402     clear_element (bin, &priv->funnel[i]);
3403
3404     if (priv->sinkpad || i == 1) {
3405       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3406       gst_object_unref (priv->recv_sink[i]);
3407       priv->recv_sink[i] = NULL;
3408     }
3409   }
3410
3411   if (priv->srcpad) {
3412     gst_object_unref (priv->send_src[0]);
3413     priv->send_src[0] = NULL;
3414   }
3415
3416   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3417   gst_object_unref (priv->send_src[1]);
3418   priv->send_src[1] = NULL;
3419
3420   g_object_unref (priv->session);
3421   priv->session = NULL;
3422   if (priv->caps)
3423     gst_caps_unref (priv->caps);
3424   priv->caps = NULL;
3425
3426   if (priv->srtpenc)
3427     gst_object_unref (priv->srtpenc);
3428   if (priv->srtpdec)
3429     gst_object_unref (priv->srtpdec);
3430
3431   if (priv->mcast_addr_v4)
3432     gst_rtsp_address_free (priv->mcast_addr_v4);
3433   priv->mcast_addr_v4 = NULL;
3434   if (priv->mcast_addr_v6)
3435     gst_rtsp_address_free (priv->mcast_addr_v6);
3436   priv->mcast_addr_v6 = NULL;
3437   if (priv->server_addr_v4)
3438     gst_rtsp_address_free (priv->server_addr_v4);
3439   priv->server_addr_v4 = NULL;
3440   if (priv->server_addr_v6)
3441     gst_rtsp_address_free (priv->server_addr_v6);
3442   priv->server_addr_v6 = NULL;
3443
3444   g_mutex_unlock (&priv->lock);
3445
3446   return TRUE;
3447
3448 was_not_joined:
3449   {
3450     g_mutex_unlock (&priv->lock);
3451     return TRUE;
3452   }
3453 transports_not_removed:
3454   {
3455     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3456     g_mutex_unlock (&priv->lock);
3457     return FALSE;
3458   }
3459 wrong_bin:
3460   {
3461     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3462     g_mutex_unlock (&priv->lock);
3463     return FALSE;
3464   }
3465 }
3466
3467 /**
3468  * gst_rtsp_stream_get_joined_bin:
3469  * @stream: a #GstRTSPStream
3470  *
3471  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3472  *
3473  * Return: (transfer full) (nullable): the joined bin or NULL.
3474  */
3475 GstBin *
3476 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3477 {
3478   GstRTSPStreamPrivate *priv;
3479   GstBin *bin = NULL;
3480
3481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3482
3483   priv = stream->priv;
3484
3485   g_mutex_lock (&priv->lock);
3486   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3487   g_mutex_unlock (&priv->lock);
3488
3489   return bin;
3490 }
3491
3492 /**
3493  * gst_rtsp_stream_get_rtpinfo:
3494  * @stream: a #GstRTSPStream
3495  * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3496  * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3497  * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3498  * @running_time: (out caller-allocates): result running-time
3499  *
3500  * Retrieve the current rtptime, seq and running-time. This is used to
3501  * construct a RTPInfo reply header.
3502  *
3503  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3504  */
3505 gboolean
3506 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3507     guint * rtptime, guint * seq, guint * clock_rate,
3508     GstClockTime * running_time)
3509 {
3510   GstRTSPStreamPrivate *priv;
3511   GstStructure *stats;
3512   GObjectClass *payobjclass;
3513
3514   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3515
3516   priv = stream->priv;
3517
3518   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3519
3520   g_mutex_lock (&priv->lock);
3521
3522   /* First try to extract the information from the last buffer on the sinks.
3523    * This will have a more accurate sequence number and timestamp, as between
3524    * the payloader and the sink there can be some queues
3525    */
3526   if (priv->udpsink[0] || priv->appsink[0]) {
3527     GstSample *last_sample;
3528
3529     if (priv->udpsink[0])
3530       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3531     else
3532       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3533
3534     if (last_sample) {
3535       GstCaps *caps;
3536       GstBuffer *buffer;
3537       GstSegment *segment;
3538       GstStructure *s;
3539       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3540
3541       caps = gst_sample_get_caps (last_sample);
3542       buffer = gst_sample_get_buffer (last_sample);
3543       segment = gst_sample_get_segment (last_sample);
3544       s = gst_caps_get_structure (caps, 0);
3545
3546       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3547         guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3548         guint ssrc_stream = 0;
3549         if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3550             gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3551             ssrc_buf != ssrc_stream) {
3552           /* Skip buffers from auxiliary streams. */
3553           GST_DEBUG_OBJECT (stream,
3554               "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3555
3556           gst_rtp_buffer_unmap (&rtp_buffer);
3557           gst_sample_unref (last_sample);
3558           goto stats;
3559         }
3560
3561         if (seq) {
3562           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3563         }
3564
3565         if (rtptime) {
3566           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3567         }
3568
3569         gst_rtp_buffer_unmap (&rtp_buffer);
3570
3571         if (running_time) {
3572           *running_time =
3573               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3574               GST_BUFFER_TIMESTAMP (buffer));
3575         }
3576
3577         if (clock_rate) {
3578           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3579
3580           if (*clock_rate == 0 && running_time)
3581             *running_time = GST_CLOCK_TIME_NONE;
3582         }
3583         gst_sample_unref (last_sample);
3584
3585         goto done;
3586       } else {
3587         gst_sample_unref (last_sample);
3588       }
3589     }
3590   }
3591
3592 stats:
3593   if (g_object_class_find_property (payobjclass, "stats")) {
3594     g_object_get (priv->payloader, "stats", &stats, NULL);
3595     if (stats == NULL)
3596       goto no_stats;
3597
3598     if (seq)
3599       gst_structure_get_uint (stats, "seqnum", seq);
3600
3601     if (rtptime)
3602       gst_structure_get_uint (stats, "timestamp", rtptime);
3603
3604     if (running_time)
3605       gst_structure_get_clock_time (stats, "running-time", running_time);
3606
3607     if (clock_rate) {
3608       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3609       if (*clock_rate == 0 && running_time)
3610         *running_time = GST_CLOCK_TIME_NONE;
3611     }
3612     gst_structure_free (stats);
3613   } else {
3614     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3615         !g_object_class_find_property (payobjclass, "timestamp"))
3616       goto no_stats;
3617
3618     if (seq)
3619       g_object_get (priv->payloader, "seqnum", seq, NULL);
3620
3621     if (rtptime)
3622       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3623
3624     if (running_time)
3625       *running_time = GST_CLOCK_TIME_NONE;
3626   }
3627
3628 done:
3629   g_mutex_unlock (&priv->lock);
3630
3631   return TRUE;
3632
3633   /* ERRORS */
3634 no_stats:
3635   {
3636     GST_WARNING ("Could not get payloader stats");
3637     g_mutex_unlock (&priv->lock);
3638     return FALSE;
3639   }
3640 }
3641
3642 /**
3643  * gst_rtsp_stream_get_caps:
3644  * @stream: a #GstRTSPStream
3645  *
3646  * Retrieve the current caps of @stream.
3647  *
3648  * Returns: (transfer full) (nullable): the #GstCaps of @stream.
3649  * use gst_caps_unref() after usage.
3650  */
3651 GstCaps *
3652 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3653 {
3654   GstRTSPStreamPrivate *priv;
3655   GstCaps *result;
3656
3657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3658
3659   priv = stream->priv;
3660
3661   g_mutex_lock (&priv->lock);
3662   if ((result = priv->caps))
3663     gst_caps_ref (result);
3664   g_mutex_unlock (&priv->lock);
3665
3666   return result;
3667 }
3668
3669 /**
3670  * gst_rtsp_stream_recv_rtp:
3671  * @stream: a #GstRTSPStream
3672  * @buffer: (transfer full): a #GstBuffer
3673  *
3674  * Handle an RTP buffer for the stream. This method is usually called when a
3675  * message has been received from a client using the TCP transport.
3676  *
3677  * This function takes ownership of @buffer.
3678  *
3679  * Returns: a GstFlowReturn.
3680  */
3681 GstFlowReturn
3682 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3683 {
3684   GstRTSPStreamPrivate *priv;
3685   GstFlowReturn ret;
3686   GstElement *element;
3687
3688   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3689   priv = stream->priv;
3690   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3691   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3692
3693   g_mutex_lock (&priv->lock);
3694   if (priv->appsrc[0])
3695     element = gst_object_ref (priv->appsrc[0]);
3696   else
3697     element = NULL;
3698   g_mutex_unlock (&priv->lock);
3699
3700   if (element) {
3701     if (priv->appsrc_base_time[0] == -1) {
3702       /* Take current running_time. This timestamp will be put on
3703        * the first buffer of each stream because we are a live source and so we
3704        * timestamp with the running_time. When we are dealing with TCP, we also
3705        * only timestamp the first buffer (using the DISCONT flag) because a server
3706        * typically bursts data, for which we don't want to compensate by speeding
3707        * up the media. The other timestamps will be interpollated from this one
3708        * using the RTP timestamps. */
3709       GST_OBJECT_LOCK (element);
3710       if (GST_ELEMENT_CLOCK (element)) {
3711         GstClockTime now;
3712         GstClockTime base_time;
3713
3714         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3715         base_time = GST_ELEMENT_CAST (element)->base_time;
3716
3717         priv->appsrc_base_time[0] = now - base_time;
3718         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3719         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3720             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3721             GST_TIME_ARGS (base_time));
3722       }
3723       GST_OBJECT_UNLOCK (element);
3724     }
3725
3726     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3727     gst_object_unref (element);
3728   } else {
3729     ret = GST_FLOW_OK;
3730   }
3731   return ret;
3732 }
3733
3734 /**
3735  * gst_rtsp_stream_recv_rtcp:
3736  * @stream: a #GstRTSPStream
3737  * @buffer: (transfer full): a #GstBuffer
3738  *
3739  * Handle an RTCP buffer for the stream. This method is usually called when a
3740  * message has been received from a client using the TCP transport.
3741  *
3742  * This function takes ownership of @buffer.
3743  *
3744  * Returns: a GstFlowReturn.
3745  */
3746 GstFlowReturn
3747 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3748 {
3749   GstRTSPStreamPrivate *priv;
3750   GstFlowReturn ret;
3751   GstElement *element;
3752
3753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3754   priv = stream->priv;
3755   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3756
3757   if (priv->joined_bin == NULL) {
3758     gst_buffer_unref (buffer);
3759     return GST_FLOW_NOT_LINKED;
3760   }
3761   g_mutex_lock (&priv->lock);
3762   if (priv->appsrc[1])
3763     element = gst_object_ref (priv->appsrc[1]);
3764   else
3765     element = NULL;
3766   g_mutex_unlock (&priv->lock);
3767
3768   if (element) {
3769     if (priv->appsrc_base_time[1] == -1) {
3770       /* Take current running_time. This timestamp will be put on
3771        * the first buffer of each stream because we are a live source and so we
3772        * timestamp with the running_time. When we are dealing with TCP, we also
3773        * only timestamp the first buffer (using the DISCONT flag) because a server
3774        * typically bursts data, for which we don't want to compensate by speeding
3775        * up the media. The other timestamps will be interpollated from this one
3776        * using the RTP timestamps. */
3777       GST_OBJECT_LOCK (element);
3778       if (GST_ELEMENT_CLOCK (element)) {
3779         GstClockTime now;
3780         GstClockTime base_time;
3781
3782         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3783         base_time = GST_ELEMENT_CAST (element)->base_time;
3784
3785         priv->appsrc_base_time[1] = now - base_time;
3786         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3787         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3788             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3789             GST_TIME_ARGS (base_time));
3790       }
3791       GST_OBJECT_UNLOCK (element);
3792     }
3793
3794     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3795     gst_object_unref (element);
3796   } else {
3797     ret = GST_FLOW_OK;
3798     gst_buffer_unref (buffer);
3799   }
3800   return ret;
3801 }
3802
3803 /* must be called with lock */
3804 static gboolean
3805 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3806     gboolean add)
3807 {
3808   GstRTSPStreamPrivate *priv = stream->priv;
3809   const GstRTSPTransport *tr;
3810
3811   tr = gst_rtsp_stream_transport_get_transport (trans);
3812
3813   switch (tr->lower_transport) {
3814     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3815     {
3816       if (add) {
3817         if (!check_mcast_part_for_transport (stream, tr))
3818           goto mcast_error;
3819         priv->transports = g_list_prepend (priv->transports, trans);
3820
3821         if (tr->ttl > 0) {
3822           GST_INFO ("setting ttl-mc %d", tr->ttl);
3823           if (priv->udpsink[0])
3824             g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", tr->ttl, NULL);
3825           if (priv->udpsink[1])
3826             g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", tr->ttl, NULL);
3827         }
3828       } else {
3829         priv->transports = g_list_remove (priv->transports, trans);
3830       }
3831       break;
3832     }
3833     case GST_RTSP_LOWER_TRANS_UDP:
3834     {
3835       gchar *dest;
3836       gint min, max;
3837
3838       dest = tr->destination;
3839       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3840         min = tr->port.min;
3841         max = tr->port.max;
3842       } else if (priv->client_side) {
3843         /* In client side mode the 'destination' is the RTSP server, so send
3844          * to those ports */
3845         min = tr->server_port.min;
3846         max = tr->server_port.max;
3847       } else {
3848         min = tr->client_port.min;
3849         max = tr->client_port.max;
3850       }
3851
3852       if (add) {
3853         GST_INFO ("adding %s:%d-%d", dest, min, max);
3854         if (priv->udpsink[0])
3855           g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3856         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3857         priv->transports = g_list_prepend (priv->transports, trans);
3858       } else {
3859         GST_INFO ("removing %s:%d-%d", dest, min, max);
3860         if (priv->udpsink[0])
3861           g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3862         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3863         priv->transports = g_list_remove (priv->transports, trans);
3864       }
3865       priv->transports_cookie++;
3866       break;
3867     }
3868     case GST_RTSP_LOWER_TRANS_TCP:
3869       if (add) {
3870         GST_INFO ("adding TCP %s", tr->destination);
3871         priv->transports = g_list_prepend (priv->transports, trans);
3872       } else {
3873         GST_INFO ("removing TCP %s", tr->destination);
3874         priv->transports = g_list_remove (priv->transports, trans);
3875       }
3876       priv->transports_cookie++;
3877       break;
3878     default:
3879       goto unknown_transport;
3880   }
3881   return TRUE;
3882
3883   /* ERRORS */
3884 unknown_transport:
3885   {
3886     GST_INFO ("Unknown transport %d", tr->lower_transport);
3887     return FALSE;
3888   }
3889 mcast_error:
3890   {
3891     return FALSE;
3892   }
3893 }
3894
3895
3896 /**
3897  * gst_rtsp_stream_add_transport:
3898  * @stream: a #GstRTSPStream
3899  * @trans: (transfer none): a #GstRTSPStreamTransport
3900  *
3901  * Add the transport in @trans to @stream. The media of @stream will
3902  * then also be send to the values configured in @trans.
3903  *
3904  * @stream must be joined to a bin.
3905  *
3906  * @trans must contain a valid #GstRTSPTransport.
3907  *
3908  * Returns: %TRUE if @trans was added
3909  */
3910 gboolean
3911 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3912     GstRTSPStreamTransport * trans)
3913 {
3914   GstRTSPStreamPrivate *priv;
3915   gboolean res;
3916
3917   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3918   priv = stream->priv;
3919   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3920   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3921
3922   g_mutex_lock (&priv->lock);
3923   res = update_transport (stream, trans, TRUE);
3924   g_mutex_unlock (&priv->lock);
3925
3926   return res;
3927 }
3928
3929 /**
3930  * gst_rtsp_stream_remove_transport:
3931  * @stream: a #GstRTSPStream
3932  * @trans: (transfer none): a #GstRTSPStreamTransport
3933  *
3934  * Remove the transport in @trans from @stream. The media of @stream will
3935  * not be sent to the values configured in @trans.
3936  *
3937  * @stream must be joined to a bin.
3938  *
3939  * @trans must contain a valid #GstRTSPTransport.
3940  *
3941  * Returns: %TRUE if @trans was removed
3942  */
3943 gboolean
3944 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3945     GstRTSPStreamTransport * trans)
3946 {
3947   GstRTSPStreamPrivate *priv;
3948   gboolean res;
3949
3950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3951   priv = stream->priv;
3952   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3953   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3954
3955   g_mutex_lock (&priv->lock);
3956   res = update_transport (stream, trans, FALSE);
3957   g_mutex_unlock (&priv->lock);
3958
3959   return res;
3960 }
3961
3962 /**
3963  * gst_rtsp_stream_update_crypto:
3964  * @stream: a #GstRTSPStream
3965  * @ssrc: the SSRC
3966  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3967  *
3968  * Update the new crypto information for @ssrc in @stream. If information
3969  * for @ssrc did not exist, it will be added. If information
3970  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3971  * be removed from @stream.
3972  *
3973  * Returns: %TRUE if @crypto could be updated
3974  */
3975 gboolean
3976 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3977     guint ssrc, GstCaps * crypto)
3978 {
3979   GstRTSPStreamPrivate *priv;
3980
3981   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3982   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3983
3984   priv = stream->priv;
3985
3986   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3987
3988   g_mutex_lock (&priv->lock);
3989   if (crypto)
3990     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3991         gst_caps_ref (crypto));
3992   else
3993     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3994   g_mutex_unlock (&priv->lock);
3995
3996   return TRUE;
3997 }
3998
3999 /**
4000  * gst_rtsp_stream_get_rtp_socket:
4001  * @stream: a #GstRTSPStream
4002  * @family: the socket family
4003  *
4004  * Get the RTP socket from @stream for a @family.
4005  *
4006  * @stream must be joined to a bin.
4007  *
4008  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4009  * socket could be allocated for @family. Unref after usage
4010  */
4011 GSocket *
4012 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4013 {
4014   GstRTSPStreamPrivate *priv = stream->priv;
4015   GSocket *socket;
4016   const gchar *name;
4017
4018   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4019   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4020       family == G_SOCKET_FAMILY_IPV6, NULL);
4021   g_return_val_if_fail (priv->udpsink[0], NULL);
4022
4023   if (family == G_SOCKET_FAMILY_IPV6)
4024     name = "socket-v6";
4025   else
4026     name = "socket";
4027
4028   g_object_get (priv->udpsink[0], name, &socket, NULL);
4029
4030   return socket;
4031 }
4032
4033 /**
4034  * gst_rtsp_stream_get_rtcp_socket:
4035  * @stream: a #GstRTSPStream
4036  * @family: the socket family
4037  *
4038  * Get the RTCP socket from @stream for a @family.
4039  *
4040  * @stream must be joined to a bin.
4041  *
4042  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4043  * socket could be allocated for @family. Unref after usage
4044  */
4045 GSocket *
4046 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4047 {
4048   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4049   GSocket *socket;
4050   const gchar *name;
4051
4052   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4053   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4054       family == G_SOCKET_FAMILY_IPV6, NULL);
4055   g_return_val_if_fail (priv->udpsink[1], NULL);
4056
4057   if (family == G_SOCKET_FAMILY_IPV6)
4058     name = "socket-v6";
4059   else
4060     name = "socket";
4061
4062   g_object_get (priv->udpsink[1], name, &socket, NULL);
4063
4064   return socket;
4065 }
4066
4067 /**
4068  * gst_rtsp_stream_get_rtp_multicast_socket:
4069  * @stream: a #GstRTSPStream
4070  * @family: the socket family
4071  *
4072  * Get the multicast RTP socket from @stream for a @family.
4073  *
4074  * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4075  * socket could be allocated for @family. Unref after usage
4076  */
4077 GSocket *
4078 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4079     GSocketFamily family)
4080 {
4081   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4082   GSocket *socket;
4083   const gchar *name;
4084
4085   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4086   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4087       family == G_SOCKET_FAMILY_IPV6, NULL);
4088   g_return_val_if_fail (priv->mcast_udpsink[0], NULL);
4089
4090   if (family == G_SOCKET_FAMILY_IPV6)
4091     name = "socket-v6";
4092   else
4093     name = "socket";
4094
4095   g_object_get (priv->mcast_udpsink[0], name, &socket, NULL);
4096
4097   return socket;
4098 }
4099
4100 /**
4101  * gst_rtsp_stream_get_rtcp_multicast_socket:
4102  * @stream: a #GstRTSPStream
4103  * @family: the socket family
4104  *
4105  * Get the multicast RTCP socket from @stream for a @family.
4106  *
4107  * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4108  * socket could be allocated for @family. Unref after usage
4109  */
4110 GSocket *
4111 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4112     GSocketFamily family)
4113 {
4114   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
4115   GSocket *socket;
4116   const gchar *name;
4117
4118   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4119   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4120       family == G_SOCKET_FAMILY_IPV6, NULL);
4121   g_return_val_if_fail (priv->mcast_udpsink[1], NULL);
4122
4123   if (family == G_SOCKET_FAMILY_IPV6)
4124     name = "socket-v6";
4125   else
4126     name = "socket";
4127
4128   g_object_get (priv->mcast_udpsink[1], name, &socket, NULL);
4129
4130   return socket;
4131 }
4132
4133 /**
4134  * gst_rtsp_stream_set_seqnum:
4135  * @stream: a #GstRTSPStream
4136  * @seqnum: a new sequence number
4137  *
4138  * Configure the sequence number in the payloader of @stream to @seqnum.
4139  */
4140 void
4141 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4142 {
4143   GstRTSPStreamPrivate *priv;
4144
4145   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4146
4147   priv = stream->priv;
4148
4149   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4150 }
4151
4152 /**
4153  * gst_rtsp_stream_get_seqnum:
4154  * @stream: a #GstRTSPStream
4155  *
4156  * Get the configured sequence number in the payloader of @stream.
4157  *
4158  * Returns: the sequence number of the payloader.
4159  */
4160 guint16
4161 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4162 {
4163   GstRTSPStreamPrivate *priv;
4164   guint seqnum;
4165
4166   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4167
4168   priv = stream->priv;
4169
4170   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4171
4172   return seqnum;
4173 }
4174
4175 /**
4176  * gst_rtsp_stream_transport_filter:
4177  * @stream: a #GstRTSPStream
4178  * @func: (scope call) (allow-none): a callback
4179  * @user_data: (closure): user data passed to @func
4180  *
4181  * Call @func for each transport managed by @stream. The result value of @func
4182  * determines what happens to the transport. @func will be called with @stream
4183  * locked so no further actions on @stream can be performed from @func.
4184  *
4185  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4186  * @stream.
4187  *
4188  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4189  *
4190  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4191  * will also be added with an additional ref to the result #GList of this
4192  * function..
4193  *
4194  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4195  *
4196  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4197  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4198  * element in the #GList should be unreffed before the list is freed.
4199  */
4200 GList *
4201 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4202     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4203 {
4204   GstRTSPStreamPrivate *priv;
4205   GList *result, *walk, *next;
4206   GHashTable *visited = NULL;
4207   guint cookie;
4208
4209   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4210
4211   priv = stream->priv;
4212
4213   result = NULL;
4214   if (func)
4215     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4216
4217   g_mutex_lock (&priv->lock);
4218 restart:
4219   cookie = priv->transports_cookie;
4220   for (walk = priv->transports; walk; walk = next) {
4221     GstRTSPStreamTransport *trans = walk->data;
4222     GstRTSPFilterResult res;
4223     gboolean changed;
4224
4225     next = g_list_next (walk);
4226
4227     if (func) {
4228       /* only visit each transport once */
4229       if (g_hash_table_contains (visited, trans))
4230         continue;
4231
4232       g_hash_table_add (visited, g_object_ref (trans));
4233       g_mutex_unlock (&priv->lock);
4234
4235       res = func (stream, trans, user_data);
4236
4237       g_mutex_lock (&priv->lock);
4238     } else
4239       res = GST_RTSP_FILTER_REF;
4240
4241     changed = (cookie != priv->transports_cookie);
4242
4243     switch (res) {
4244       case GST_RTSP_FILTER_REMOVE:
4245         update_transport (stream, trans, FALSE);
4246         break;
4247       case GST_RTSP_FILTER_REF:
4248         result = g_list_prepend (result, g_object_ref (trans));
4249         break;
4250       case GST_RTSP_FILTER_KEEP:
4251       default:
4252         break;
4253     }
4254     if (changed)
4255       goto restart;
4256   }
4257   g_mutex_unlock (&priv->lock);
4258
4259   if (func)
4260     g_hash_table_unref (visited);
4261
4262   return result;
4263 }
4264
4265 static GstPadProbeReturn
4266 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4267 {
4268   GstRTSPStreamPrivate *priv;
4269   GstRTSPStream *stream;
4270   GstBuffer *buffer = NULL;
4271
4272   stream = user_data;
4273   priv = stream->priv;
4274
4275   GST_DEBUG_OBJECT (pad, "now blocking");
4276
4277   g_mutex_lock (&priv->lock);
4278   priv->blocking = TRUE;
4279
4280   if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4281     buffer = gst_pad_probe_info_get_buffer (info);
4282   } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4283     GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4284     buffer = gst_buffer_list_get (list, 0);
4285   } else {
4286     g_assert_not_reached ();
4287   }
4288
4289   g_assert (buffer);
4290   priv->position = GST_BUFFER_TIMESTAMP (buffer);
4291   GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4292       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4293   g_mutex_unlock (&priv->lock);
4294
4295   gst_element_post_message (priv->payloader,
4296       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4297           gst_structure_new_empty ("GstRTSPStreamBlocking")));
4298
4299   return GST_PAD_PROBE_OK;
4300 }
4301
4302 static void
4303 set_blocked (GstRTSPStream * stream, gboolean blocked)
4304 {
4305   GstRTSPStreamPrivate *priv;
4306   int i;
4307
4308   GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4309
4310   priv = stream->priv;
4311
4312   if (blocked) {
4313     for (i = 0; i < 2; i++) {
4314       if (priv->blocked_id[i] != 0)
4315         continue;
4316       if (priv->send_src[i]) {
4317         priv->blocking = FALSE;
4318         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4319             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4320             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4321             g_object_ref (stream), g_object_unref);
4322       }
4323     }
4324   } else {
4325     for (i = 0; i < 2; i++) {
4326       if (priv->blocked_id[i] != 0) {
4327         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4328         priv->blocked_id[i] = 0;
4329       }
4330     }
4331     priv->blocking = FALSE;
4332   }
4333 }
4334
4335 /**
4336  * gst_rtsp_stream_set_blocked:
4337  * @stream: a #GstRTSPStream
4338  * @blocked: boolean indicating we should block or unblock
4339  *
4340  * Blocks or unblocks the dataflow on @stream.
4341  *
4342  * Returns: %TRUE on success
4343  */
4344 gboolean
4345 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4346 {
4347   GstRTSPStreamPrivate *priv;
4348
4349   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4350
4351   priv = stream->priv;
4352   g_mutex_lock (&priv->lock);
4353   set_blocked (stream, blocked);
4354   g_mutex_unlock (&priv->lock);
4355
4356   return TRUE;
4357 }
4358
4359 /**
4360  * gst_rtsp_stream_ublock_linked:
4361  * @stream: a #GstRTSPStream
4362  *
4363  * Unblocks the dataflow on @stream if it is linked.
4364  *
4365  * Returns: %TRUE on success
4366  */
4367 gboolean
4368 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4369 {
4370   GstRTSPStreamPrivate *priv;
4371
4372   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4373
4374   priv = stream->priv;
4375   g_mutex_lock (&priv->lock);
4376   if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4377     set_blocked (stream, FALSE);
4378   g_mutex_unlock (&priv->lock);
4379
4380   return TRUE;
4381 }
4382
4383 /**
4384  * gst_rtsp_stream_is_blocking:
4385  * @stream: a #GstRTSPStream
4386  *
4387  * Check if @stream is blocking on a #GstBuffer.
4388  *
4389  * Returns: %TRUE if @stream is blocking
4390  */
4391 gboolean
4392 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4393 {
4394   GstRTSPStreamPrivate *priv;
4395   gboolean result;
4396
4397   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4398
4399   priv = stream->priv;
4400
4401   g_mutex_lock (&priv->lock);
4402   result = priv->blocking;
4403   g_mutex_unlock (&priv->lock);
4404
4405   return result;
4406 }
4407
4408 /**
4409  * gst_rtsp_stream_query_position:
4410  * @stream: a #GstRTSPStream
4411  * @position: (out): current position of a #GstRTSPStream
4412  *
4413  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4414  * the RTP parts of the pipeline and not the RTCP parts.
4415  *
4416  * Returns: %TRUE if the position could be queried
4417  */
4418 gboolean
4419 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4420 {
4421   GstRTSPStreamPrivate *priv;
4422   GstElement *sink;
4423   GstPad *pad = NULL;
4424
4425   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4426
4427   /* query position: if no sinks have been added yet,
4428    * we obtain the position from the pad otherwise we query the sinks */
4429
4430   priv = stream->priv;
4431
4432   g_mutex_lock (&priv->lock);
4433   /* depending on the transport type, it should query corresponding sink */
4434   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4435     sink = priv->udpsink[0];
4436   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4437     sink = priv->mcast_udpsink[0];
4438   else
4439     sink = priv->appsink[0];
4440
4441   if (sink) {
4442     gst_object_ref (sink);
4443   } else if (priv->send_src[0]) {
4444     pad = gst_object_ref (priv->send_src[0]);
4445   } else {
4446     g_mutex_unlock (&priv->lock);
4447     GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4448     return FALSE;
4449   }
4450   g_mutex_unlock (&priv->lock);
4451
4452   if (sink) {
4453     if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4454       GST_WARNING_OBJECT (stream,
4455           "Couldn't obtain postion: position query failed");
4456       gst_object_unref (sink);
4457       return FALSE;
4458     }
4459     gst_object_unref (sink);
4460   } else if (pad) {
4461     GstEvent *event;
4462     const GstSegment *segment;
4463
4464     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4465     if (!event) {
4466       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4467       gst_object_unref (pad);
4468       return FALSE;
4469     }
4470
4471     gst_event_parse_segment (event, &segment);
4472     if (segment->format != GST_FORMAT_TIME) {
4473       *position = -1;
4474     } else {
4475       g_mutex_lock (&priv->lock);
4476       *position = priv->position;
4477       g_mutex_unlock (&priv->lock);
4478       *position =
4479           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
4480     }
4481     gst_event_unref (event);
4482     gst_object_unref (pad);
4483   }
4484
4485   return TRUE;
4486 }
4487
4488 /**
4489  * gst_rtsp_stream_query_stop:
4490  * @stream: a #GstRTSPStream
4491  * @stop: (out): current stop of a #GstRTSPStream
4492  *
4493  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
4494  * the RTP parts of the pipeline and not the RTCP parts.
4495  *
4496  * Returns: %TRUE if the stop could be queried
4497  */
4498 gboolean
4499 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
4500 {
4501   GstRTSPStreamPrivate *priv;
4502   GstElement *sink;
4503   GstPad *pad = NULL;
4504
4505   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4506
4507   /* query stop position: if no sinks have been added yet,
4508    * we obtain the stop position from the pad otherwise we query the sinks */
4509
4510   priv = stream->priv;
4511
4512   g_mutex_lock (&priv->lock);
4513   /* depending on the transport type, it should query corresponding sink */
4514   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4515     sink = priv->udpsink[0];
4516   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4517     sink = priv->mcast_udpsink[0];
4518   else
4519     sink = priv->appsink[0];
4520
4521   if (sink) {
4522     gst_object_ref (sink);
4523   } else if (priv->send_src[0]) {
4524     pad = gst_object_ref (priv->send_src[0]);
4525   } else {
4526     g_mutex_unlock (&priv->lock);
4527     GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
4528     return FALSE;
4529   }
4530   g_mutex_unlock (&priv->lock);
4531
4532   if (sink) {
4533     GstQuery *query;
4534     GstFormat format;
4535
4536     query = gst_query_new_segment (GST_FORMAT_TIME);
4537     if (!gst_element_query (sink, query)) {
4538       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
4539       gst_query_unref (query);
4540       gst_object_unref (sink);
4541       return FALSE;
4542     }
4543     gst_query_parse_segment (query, NULL, &format, NULL, stop);
4544     if (format != GST_FORMAT_TIME)
4545       *stop = -1;
4546     gst_query_unref (query);
4547     gst_object_unref (sink);
4548   } else if (pad) {
4549     GstEvent *event;
4550     const GstSegment *segment;
4551
4552     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4553     if (!event) {
4554       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
4555       gst_object_unref (pad);
4556       return FALSE;
4557     }
4558     gst_event_parse_segment (event, &segment);
4559     if (segment->format != GST_FORMAT_TIME) {
4560       *stop = -1;
4561     } else {
4562       *stop = segment->stop;
4563       if (*stop == -1)
4564         *stop = segment->duration;
4565       else
4566         *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
4567     }
4568     gst_event_unref (event);
4569     gst_object_unref (pad);
4570   }
4571
4572   return TRUE;
4573 }
4574
4575 /**
4576  * gst_rtsp_stream_seekable:
4577  * @stream: a #GstRTSPStream
4578  *
4579  * Checks whether the individual @stream is seekable.
4580  *
4581  * Returns: %TRUE if @stream is seekable, else %FALSE.
4582  */
4583 gboolean
4584 gst_rtsp_stream_seekable (GstRTSPStream * stream)
4585 {
4586   GstRTSPStreamPrivate *priv;
4587   GstPad *pad = NULL;
4588   GstQuery *query = NULL;
4589   gboolean seekable = FALSE;
4590
4591   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4592
4593   /* query stop position: if no sinks have been added yet,
4594    * we obtain the stop position from the pad otherwise we query the sinks */
4595
4596   priv = stream->priv;
4597
4598   g_mutex_lock (&priv->lock);
4599   /* depending on the transport type, it should query corresponding sink */
4600   if (priv->srcpad) {
4601     pad = gst_object_ref (priv->srcpad);
4602   } else {
4603     g_mutex_unlock (&priv->lock);
4604     GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
4605     goto beach;
4606   }
4607   g_mutex_unlock (&priv->lock);
4608
4609   query = gst_query_new_seeking (GST_FORMAT_TIME);
4610   if (!gst_pad_query (pad, query)) {
4611     GST_WARNING_OBJECT (stream, "seeking query failed");
4612     goto beach;
4613   }
4614   gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
4615
4616 beach:
4617   if (pad)
4618     gst_object_unref (pad);
4619   if (query)
4620     gst_query_unref (query);
4621
4622   GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
4623
4624   return seekable;
4625 }
4626
4627 /**
4628  * gst_rtsp_stream_complete_stream:
4629  * @stream: a #GstRTSPStream
4630  * @transport: a #GstRTSPTransport
4631  *
4632  * Add a receiver and sender part to the pipeline based on the transport from
4633  * SETUP.
4634  *
4635  * Returns: %TRUE if the stream has been sucessfully updated.
4636  */
4637 gboolean
4638 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
4639     const GstRTSPTransport * transport)
4640 {
4641   GstRTSPStreamPrivate *priv;
4642
4643   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4644
4645   priv = stream->priv;
4646   GST_DEBUG_OBJECT (stream, "complete stream");
4647
4648   g_mutex_lock (&priv->lock);
4649
4650   if (!(priv->protocols & transport->lower_transport))
4651     goto unallowed_transport;
4652
4653   if (!create_receiver_part (stream, transport))
4654     goto create_receiver_error;
4655
4656   /* in the RECORD case, we only add RTCP sender part */
4657   if (!create_sender_part (stream, transport))
4658     goto create_sender_error;
4659
4660   priv->is_complete = TRUE;
4661   g_mutex_unlock (&priv->lock);
4662
4663   GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
4664   return TRUE;
4665
4666 create_receiver_error:
4667 create_sender_error:
4668 unallowed_transport:
4669   {
4670     g_mutex_unlock (&priv->lock);
4671     return FALSE;
4672   }
4673 }
4674
4675 /**
4676  * gst_rtsp_stream_is_complete:
4677  * @stream: a #GstRTSPStream
4678  *
4679  * Checks whether the stream is complete, contains the receiver and the sender
4680  * parts. As the stream contains sink(s) element(s), it's possible to perform
4681  * seek operations on it.
4682  *
4683  * Returns: %TRUE if the stream contains at least one sink element.
4684  */
4685 gboolean
4686 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
4687 {
4688   GstRTSPStreamPrivate *priv;
4689   gboolean ret = FALSE;
4690
4691   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4692
4693   priv = stream->priv;
4694   g_mutex_lock (&priv->lock);
4695   ret = priv->is_complete;
4696   g_mutex_unlock (&priv->lock);
4697
4698   return ret;
4699 }
4700
4701 /**
4702  * gst_rtsp_stream_is_sender:
4703  * @stream: a #GstRTSPStream
4704  *
4705  * Checks whether the stream is a sender.
4706  *
4707  * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
4708  */
4709 gboolean
4710 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
4711 {
4712   GstRTSPStreamPrivate *priv;
4713   gboolean ret = FALSE;
4714
4715   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4716
4717   priv = stream->priv;
4718   g_mutex_lock (&priv->lock);
4719   ret = (priv->srcpad != NULL);
4720   g_mutex_unlock (&priv->lock);
4721
4722   return ret;
4723 }
4724
4725 /**
4726  * gst_rtsp_stream_is_receiver:
4727  * @stream: a #GstRTSPStream
4728  *
4729  * Checks whether the stream is a receiver.
4730  *
4731  * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
4732  */
4733 gboolean
4734 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
4735 {
4736   GstRTSPStreamPrivate *priv;
4737   gboolean ret = FALSE;
4738
4739   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4740
4741   priv = stream->priv;
4742   g_mutex_lock (&priv->lock);
4743   ret = (priv->sinkpad != NULL);
4744   g_mutex_unlock (&priv->lock);
4745
4746   return ret;
4747 }
4748
4749 #define AES_128_KEY_LEN 16
4750 #define AES_256_KEY_LEN 32
4751
4752 #define HMAC_32_KEY_LEN 4
4753 #define HMAC_80_KEY_LEN 10
4754
4755 static gboolean
4756 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
4757 {
4758   const gchar *srtp_cipher;
4759   const gchar *srtp_auth;
4760   const GstMIKEYPayload *sp;
4761   guint i;
4762
4763   /* loop over Security policy until we find one containing policy */
4764   for (i = 0;; i++) {
4765     if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
4766       break;
4767
4768     if (((GstMIKEYPayloadSP *) sp)->policy == policy)
4769       break;
4770   }
4771
4772   /* the default ciphers */
4773   srtp_cipher = "aes-128-icm";
4774   srtp_auth = "hmac-sha1-80";
4775
4776   /* now override the defaults with what is in the Security Policy */
4777   if (sp != NULL) {
4778     guint len;
4779
4780     /* collect all the params and go over them */
4781     len = gst_mikey_payload_sp_get_n_params (sp);
4782     for (i = 0; i < len; i++) {
4783       const GstMIKEYPayloadSPParam *param =
4784           gst_mikey_payload_sp_get_param (sp, i);
4785
4786       switch (param->type) {
4787         case GST_MIKEY_SP_SRTP_ENC_ALG:
4788           switch (param->val[0]) {
4789             case 0:
4790               srtp_cipher = "null";
4791               break;
4792             case 2:
4793             case 1:
4794               srtp_cipher = "aes-128-icm";
4795               break;
4796             default:
4797               break;
4798           }
4799           break;
4800         case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
4801           switch (param->val[0]) {
4802             case AES_128_KEY_LEN:
4803               srtp_cipher = "aes-128-icm";
4804               break;
4805             case AES_256_KEY_LEN:
4806               srtp_cipher = "aes-256-icm";
4807               break;
4808             default:
4809               break;
4810           }
4811           break;
4812         case GST_MIKEY_SP_SRTP_AUTH_ALG:
4813           switch (param->val[0]) {
4814             case 0:
4815               srtp_auth = "null";
4816               break;
4817             case 2:
4818             case 1:
4819               srtp_auth = "hmac-sha1-80";
4820               break;
4821             default:
4822               break;
4823           }
4824           break;
4825         case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
4826           switch (param->val[0]) {
4827             case HMAC_32_KEY_LEN:
4828               srtp_auth = "hmac-sha1-32";
4829               break;
4830             case HMAC_80_KEY_LEN:
4831               srtp_auth = "hmac-sha1-80";
4832               break;
4833             default:
4834               break;
4835           }
4836           break;
4837         case GST_MIKEY_SP_SRTP_SRTP_ENC:
4838           break;
4839         case GST_MIKEY_SP_SRTP_SRTCP_ENC:
4840           break;
4841         default:
4842           break;
4843       }
4844     }
4845   }
4846   /* now configure the SRTP parameters */
4847   gst_caps_set_simple (caps,
4848       "srtp-cipher", G_TYPE_STRING, srtp_cipher,
4849       "srtp-auth", G_TYPE_STRING, srtp_auth,
4850       "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
4851       "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
4852
4853   return TRUE;
4854 }
4855
4856 static gboolean
4857 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
4858 {
4859   GstMIKEYMessage *msg;
4860   guint i, n_cs;
4861   GstCaps *caps = NULL;
4862   GstMIKEYPayloadKEMAC *kemac;
4863   const GstMIKEYPayloadKeyData *pkd;
4864   GstBuffer *key;
4865
4866   /* the MIKEY message contains a CSB or crypto session bundle. It is a
4867    * set of Crypto Sessions protected with the same master key.
4868    * In the context of SRTP, an RTP and its RTCP stream is part of a
4869    * crypto session */
4870   if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
4871     goto parse_failed;
4872
4873   /* we can only handle SRTP crypto sessions for now */
4874   if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
4875     goto invalid_map_type;
4876
4877   /* get the number of crypto sessions. This maps SSRC to its
4878    * security parameters */
4879   n_cs = gst_mikey_message_get_n_cs (msg);
4880   if (n_cs == 0)
4881     goto no_crypto_sessions;
4882
4883   /* we also need keys */
4884   if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
4885           (msg, GST_MIKEY_PT_KEMAC, 0)))
4886     goto no_keys;
4887
4888   /* we don't support encrypted keys */
4889   if (kemac->enc_alg != GST_MIKEY_ENC_NULL
4890       || kemac->mac_alg != GST_MIKEY_MAC_NULL)
4891     goto unsupported_encryption;
4892
4893   /* get Key data sub-payload */
4894   pkd = (const GstMIKEYPayloadKeyData *)
4895       gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
4896
4897   key =
4898       gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
4899       pkd->key_len);
4900
4901   /* go over all crypto sessions and create the security policy for each
4902    * SSRC */
4903   for (i = 0; i < n_cs; i++) {
4904     const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
4905
4906     caps = gst_caps_new_simple ("application/x-srtp",
4907         "ssrc", G_TYPE_UINT, map->ssrc,
4908         "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
4909     mikey_apply_policy (caps, msg, map->policy);
4910
4911     gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
4912     gst_caps_unref (caps);
4913   }
4914   gst_mikey_message_unref (msg);
4915   gst_buffer_unref (key);
4916
4917   return TRUE;
4918
4919   /* ERRORS */
4920 parse_failed:
4921   {
4922     GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
4923     return FALSE;
4924   }
4925 invalid_map_type:
4926   {
4927     GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
4928     goto cleanup_message;
4929   }
4930 no_crypto_sessions:
4931   {
4932     GST_DEBUG_OBJECT (stream, "no crypto sessions");
4933     goto cleanup_message;
4934   }
4935 no_keys:
4936   {
4937     GST_DEBUG_OBJECT (stream, "no keys found");
4938     goto cleanup_message;
4939   }
4940 unsupported_encryption:
4941   {
4942     GST_DEBUG_OBJECT (stream, "unsupported key encryption");
4943     goto cleanup_message;
4944   }
4945 cleanup_message:
4946   {
4947     gst_mikey_message_unref (msg);
4948     return FALSE;
4949   }
4950 }
4951
4952 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
4953
4954 static void
4955 strip_chars (gchar * str)
4956 {
4957   gchar *s;
4958   gsize len;
4959
4960   len = strlen (str);
4961   while (len--) {
4962     if (!IS_STRIP_CHAR (str[len]))
4963       break;
4964     str[len] = '\0';
4965   }
4966   for (s = str; *s && IS_STRIP_CHAR (*s); s++);
4967   memmove (str, s, len + 1);
4968 }
4969
4970 /**
4971  * gst_rtsp_stream_handle_keymgmt:
4972  * @stream: a #GstRTSPStream
4973  * @keymgmt: a keymgmt header
4974  *
4975  * Parse and handle a KeyMgmt header.
4976  *
4977  * Since: 1.16
4978  */
4979 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
4980  * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
4981  */
4982 gboolean
4983 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
4984 {
4985   gchar **specs;
4986   gint i, j;
4987
4988   specs = g_strsplit (keymgmt, ",", 0);
4989   for (i = 0; specs[i]; i++) {
4990     gchar **split;
4991
4992     split = g_strsplit (specs[i], ";", 0);
4993     for (j = 0; split[j]; j++) {
4994       g_strstrip (split[j]);
4995       if (g_str_has_prefix (split[j], "prot=")) {
4996         g_strstrip (split[j] + 5);
4997         if (!g_str_equal (split[j] + 5, "mikey"))
4998           break;
4999         GST_DEBUG ("found mikey");
5000       } else if (g_str_has_prefix (split[j], "uri=")) {
5001         strip_chars (split[j] + 4);
5002         GST_DEBUG ("found uri '%s'", split[j] + 4);
5003       } else if (g_str_has_prefix (split[j], "data=")) {
5004         guchar *data;
5005         gsize size;
5006         strip_chars (split[j] + 5);
5007         GST_DEBUG ("found data '%s'", split[j] + 5);
5008         data = g_base64_decode_inplace (split[j] + 5, &size);
5009         handle_mikey_data (stream, data, size);
5010       }
5011     }
5012     g_strfreev (split);
5013   }
5014   g_strfreev (specs);
5015   return TRUE;
5016 }
5017
5018
5019 /**
5020  * gst_rtsp_stream_get_ulpfec_pt:
5021  *
5022  * Returns: the payload type used for ULPFEC protection packets
5023  *
5024  * Since: 1.16
5025  */
5026 guint
5027 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5028 {
5029   guint res;
5030
5031   g_mutex_lock (&stream->priv->lock);
5032   res = stream->priv->ulpfec_pt;
5033   g_mutex_unlock (&stream->priv->lock);
5034
5035   return res;
5036 }
5037
5038 /**
5039  * gst_rtsp_stream_set_ulpfec_pt:
5040  *
5041  * Set the payload type to be used for ULPFEC protection packets
5042  *
5043  * Since: 1.16
5044  */
5045 void
5046 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5047 {
5048   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5049
5050   g_mutex_lock (&stream->priv->lock);
5051   stream->priv->ulpfec_pt = pt;
5052   if (stream->priv->ulpfec_encoder) {
5053     g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5054   }
5055   g_mutex_unlock (&stream->priv->lock);
5056 }
5057
5058 /**
5059  * gst_rtsp_stream_request_ulpfec_decoder:
5060  *
5061  * Creating a rtpulpfecdec element
5062  *
5063  * Returns: (transfer full) (nullable): a #GstElement.
5064  *
5065  * Since: 1.16
5066  */
5067 GstElement *
5068 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5069     GstElement * rtpbin, guint sessid)
5070 {
5071   GObject *internal_storage = NULL;
5072
5073   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5074   stream->priv->ulpfec_decoder =
5075       gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5076
5077   g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5078       &internal_storage);
5079   g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5080       NULL);
5081   g_object_unref (internal_storage);
5082   update_ulpfec_decoder_pt (stream);
5083
5084   return stream->priv->ulpfec_decoder;
5085 }
5086
5087 /**
5088  * gst_rtsp_stream_request_ulpfec_encoder:
5089  *
5090  * Creating a rtpulpfecenc element
5091  *
5092  * Returns: (transfer full) (nullable): a #GstElement.
5093  *
5094  * Since: 1.16
5095  */
5096 GstElement *
5097 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5098 {
5099   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5100
5101   if (!stream->priv->ulpfec_percentage)
5102     return NULL;
5103
5104   stream->priv->ulpfec_encoder =
5105       gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5106
5107   g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5108       "percentage", stream->priv->ulpfec_percentage, NULL);
5109
5110   return stream->priv->ulpfec_encoder;
5111 }
5112
5113 /**
5114  * gst_rtsp_stream_set_ulpfec_percentage:
5115  *
5116  * Sets the amount of redundancy to apply when creating ULPFEC
5117  * protection packets.
5118  *
5119  * Since: 1.16
5120  */
5121 void
5122 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5123 {
5124   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5125
5126   g_mutex_lock (&stream->priv->lock);
5127   stream->priv->ulpfec_percentage = percentage;
5128   if (stream->priv->ulpfec_encoder) {
5129     g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5130   }
5131   g_mutex_unlock (&stream->priv->lock);
5132 }
5133
5134 /**
5135  * gst_rtsp_stream_get_ulpfec_percentage:
5136  *
5137  * Returns: the amount of redundancy applied when creating ULPFEC
5138  * protection packets.
5139  *
5140  * Since: 1.16
5141  */
5142 guint
5143 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5144 {
5145   guint res;
5146
5147   g_mutex_lock (&stream->priv->lock);
5148   res = stream->priv->ulpfec_percentage;
5149   g_mutex_unlock (&stream->priv->lock);
5150
5151   return res;
5152 }