stream: Added a list of multicast client addresses
[platform/upstream/gst-rtsp-server.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 struct _GstRTSPStreamPrivate
63 {
64   GMutex lock;
65   guint idx;
66   /* Only one pad is ever set */
67   GstPad *srcpad, *sinkpad;
68   GstElement *payloader;
69   guint buffer_size;
70   GstBin *joined_bin;
71
72   /* TRUE if this stream is running on
73    * the client side of an RTSP link (for RECORD) */
74   gboolean client_side;
75   gchar *control;
76
77   /* TRUE if stream is complete. This means that the receiver and the sender
78    * parts are present in the stream. */
79   gboolean is_complete;
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans allowed_protocols;
82   GstRTSPLowerTrans configured_protocols;
83
84   /* pads on the rtpbin */
85   GstPad *send_rtp_sink;
86   GstPad *recv_rtp_src;
87   GstPad *recv_sink[2];
88   GstPad *send_src[2];
89
90   /* the RTPSession object */
91   GObject *session;
92
93   /* SRTP encoder/decoder */
94   GstElement *srtpenc;
95   GstElement *srtpdec;
96   GHashTable *keys;
97
98   /* for UDP unicast */
99   GstElement *udpsrc_v4[2];
100   GstElement *udpsrc_v6[2];
101   GstElement *udpqueue[2];
102   GstElement *udpsink[2];
103   GSocket *socket_v4[2];
104   GSocket *socket_v6[2];
105
106   /* for UDP multicast */
107   GstElement *mcast_udpsrc_v4[2];
108   GstElement *mcast_udpsrc_v6[2];
109   GstElement *mcast_udpqueue[2];
110   GstElement *mcast_udpsink[2];
111   GSocket *mcast_socket_v4[2];
112   GSocket *mcast_socket_v6[2];
113   GList *mcast_clients;
114
115   /* for TCP transport */
116   GstElement *appsrc[2];
117   GstClockTime appsrc_base_time[2];
118   GstElement *appqueue[2];
119   GstElement *appsink[2];
120
121   GstElement *tee[2];
122   GstElement *funnel[2];
123
124   /* retransmission */
125   GstElement *rtxsend;
126   GstElement *rtxreceive;
127   guint rtx_pt;
128   GstClockTime rtx_time;
129
130   /* Forward Error Correction with RFC 5109 */
131   GstElement *ulpfec_decoder;
132   GstElement *ulpfec_encoder;
133   guint ulpfec_pt;
134   gboolean ulpfec_enabled;
135   guint ulpfec_percentage;
136
137   /* pool used to manage unicast and multicast addresses */
138   GstRTSPAddressPool *pool;
139
140   /* unicast server addr/port */
141   GstRTSPAddress *server_addr_v4;
142   GstRTSPAddress *server_addr_v6;
143
144   /* multicast addresses */
145   GstRTSPAddress *mcast_addr_v4;
146   GstRTSPAddress *mcast_addr_v6;
147
148   gchar *multicast_iface;
149   guint max_mcast_ttl;
150
151   /* the caps of the stream */
152   gulong caps_sig;
153   GstCaps *caps;
154
155   /* transports we stream to */
156   guint n_active;
157   GList *transports;
158   guint transports_cookie;
159   GList *tr_cache_rtp;
160   GList *tr_cache_rtcp;
161   guint tr_cache_cookie_rtp;
162   guint tr_cache_cookie_rtcp;
163   guint n_tcp_transports;
164   gboolean have_buffer[2];
165   guint n_outstanding;
166
167   gint dscp_qos;
168
169   /* stream blocking */
170   gulong blocked_id[2];
171   gboolean blocking;
172
173   /* current stream postion */
174   GstClockTime position;
175
176   /* pt->caps map for RECORD streams */
177   GHashTable *ptmap;
178
179   GstRTSPPublishClockMode publish_clock_mode;
180 };
181
182 #define DEFAULT_CONTROL         NULL
183 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
184 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
185                                         GST_RTSP_LOWER_TRANS_TCP
186 #define DEFAULT_MAX_MCAST_TTL   255
187
188 enum
189 {
190   PROP_0,
191   PROP_CONTROL,
192   PROP_PROFILES,
193   PROP_PROTOCOLS,
194   PROP_LAST
195 };
196
197 enum
198 {
199   SIGNAL_NEW_RTP_ENCODER,
200   SIGNAL_NEW_RTCP_ENCODER,
201   SIGNAL_NEW_RTP_RTCP_DECODER,
202   SIGNAL_LAST
203 };
204
205 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
206 #define GST_CAT_DEFAULT rtsp_stream_debug
207
208 static GQuark ssrc_stream_map_key;
209
210 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
211     GValue * value, GParamSpec * pspec);
212 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
213     const GValue * value, GParamSpec * pspec);
214
215 static void gst_rtsp_stream_finalize (GObject * obj);
216
217 static gboolean
218 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
219     gboolean add);
220
221 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
222
223 G_DEFINE_TYPE_WITH_PRIVATE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
224
225 static void
226 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
227 {
228   GObjectClass *gobject_class;
229
230   gobject_class = G_OBJECT_CLASS (klass);
231
232   gobject_class->get_property = gst_rtsp_stream_get_property;
233   gobject_class->set_property = gst_rtsp_stream_set_property;
234   gobject_class->finalize = gst_rtsp_stream_finalize;
235
236   g_object_class_install_property (gobject_class, PROP_CONTROL,
237       g_param_spec_string ("control", "Control",
238           "The control string for this stream", DEFAULT_CONTROL,
239           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240
241   g_object_class_install_property (gobject_class, PROP_PROFILES,
242       g_param_spec_flags ("profiles", "Profiles",
243           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
244           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
245
246   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
247       g_param_spec_flags ("protocols", "Protocols",
248           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
249           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
250
251   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
252       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
253       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
254       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
255
256   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
257       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
258       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
259       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
260
261   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER] =
262       g_signal_new ("new-rtp-rtcp-decoder", G_TYPE_FROM_CLASS (klass),
263       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
264       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
265
266   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
267
268   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
269 }
270
271 static void
272 gst_rtsp_stream_init (GstRTSPStream * stream)
273 {
274   GstRTSPStreamPrivate *priv = gst_rtsp_stream_get_instance_private (stream);
275
276   GST_DEBUG ("new stream %p", stream);
277
278   stream->priv = priv;
279
280   priv->dscp_qos = -1;
281   priv->control = g_strdup (DEFAULT_CONTROL);
282   priv->profiles = DEFAULT_PROFILES;
283   priv->allowed_protocols = DEFAULT_PROTOCOLS;
284   priv->configured_protocols = 0;
285   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
286   priv->max_mcast_ttl = DEFAULT_MAX_MCAST_TTL;
287
288   g_mutex_init (&priv->lock);
289
290   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
291       NULL, (GDestroyNotify) gst_caps_unref);
292   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
293       (GDestroyNotify) gst_caps_unref);
294 }
295
296 typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
297
298 struct _UdpClientAddrInfo
299 {
300   gchar *address;
301   guint rtp_port;
302   guint add_count;              /* how often this address has been added */
303 };
304
305 static void
306 free_mcast_client (gpointer data)
307 {
308   UdpClientAddrInfo *client = data;
309
310   g_free (client->address);
311   g_free (client);
312 }
313
314 static void
315 gst_rtsp_stream_finalize (GObject * obj)
316 {
317   GstRTSPStream *stream;
318   GstRTSPStreamPrivate *priv;
319   guint i;
320
321   stream = GST_RTSP_STREAM (obj);
322   priv = stream->priv;
323
324   GST_DEBUG ("finalize stream %p", stream);
325
326   /* we really need to be unjoined now */
327   g_return_if_fail (priv->joined_bin == NULL);
328
329   if (priv->mcast_addr_v4)
330     gst_rtsp_address_free (priv->mcast_addr_v4);
331   if (priv->mcast_addr_v6)
332     gst_rtsp_address_free (priv->mcast_addr_v6);
333   if (priv->server_addr_v4)
334     gst_rtsp_address_free (priv->server_addr_v4);
335   if (priv->server_addr_v6)
336     gst_rtsp_address_free (priv->server_addr_v6);
337   if (priv->pool)
338     g_object_unref (priv->pool);
339   if (priv->rtxsend)
340     g_object_unref (priv->rtxsend);
341   if (priv->rtxreceive)
342     g_object_unref (priv->rtxreceive);
343   if (priv->ulpfec_encoder)
344     gst_object_unref (priv->ulpfec_encoder);
345   if (priv->ulpfec_decoder)
346     gst_object_unref (priv->ulpfec_decoder);
347
348   for (i = 0; i < 2; i++) {
349     if (priv->socket_v4[i])
350       g_object_unref (priv->socket_v4[i]);
351     if (priv->socket_v6[i])
352       g_object_unref (priv->socket_v6[i]);
353     if (priv->mcast_socket_v4[i])
354       g_object_unref (priv->mcast_socket_v4[i]);
355     if (priv->mcast_socket_v6[i])
356       g_object_unref (priv->mcast_socket_v6[i]);
357   }
358
359   g_free (priv->multicast_iface);
360   g_list_free_full (priv->mcast_clients, (GDestroyNotify) free_mcast_client);
361
362   gst_object_unref (priv->payloader);
363   if (priv->srcpad)
364     gst_object_unref (priv->srcpad);
365   if (priv->sinkpad)
366     gst_object_unref (priv->sinkpad);
367   g_free (priv->control);
368   g_mutex_clear (&priv->lock);
369
370   g_hash_table_unref (priv->keys);
371   g_hash_table_destroy (priv->ptmap);
372
373   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
374 }
375
376 static void
377 gst_rtsp_stream_get_property (GObject * object, guint propid,
378     GValue * value, GParamSpec * pspec)
379 {
380   GstRTSPStream *stream = GST_RTSP_STREAM (object);
381
382   switch (propid) {
383     case PROP_CONTROL:
384       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
385       break;
386     case PROP_PROFILES:
387       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
388       break;
389     case PROP_PROTOCOLS:
390       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
391       break;
392     default:
393       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
394   }
395 }
396
397 static void
398 gst_rtsp_stream_set_property (GObject * object, guint propid,
399     const GValue * value, GParamSpec * pspec)
400 {
401   GstRTSPStream *stream = GST_RTSP_STREAM (object);
402
403   switch (propid) {
404     case PROP_CONTROL:
405       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
406       break;
407     case PROP_PROFILES:
408       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
409       break;
410     case PROP_PROTOCOLS:
411       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
412       break;
413     default:
414       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
415   }
416 }
417
418 /**
419  * gst_rtsp_stream_new:
420  * @idx: an index
421  * @pad: a #GstPad
422  * @payloader: a #GstElement
423  *
424  * Create a new media stream with index @idx that handles RTP data on
425  * @pad and has a payloader element @payloader if @pad is a source pad
426  * or a depayloader element @payloader if @pad is a sink pad.
427  *
428  * Returns: (transfer full): a new #GstRTSPStream
429  */
430 GstRTSPStream *
431 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
432 {
433   GstRTSPStreamPrivate *priv;
434   GstRTSPStream *stream;
435
436   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
437   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
438
439   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
440   priv = stream->priv;
441   priv->idx = idx;
442   priv->payloader = gst_object_ref (payloader);
443   if (GST_PAD_IS_SRC (pad))
444     priv->srcpad = gst_object_ref (pad);
445   else
446     priv->sinkpad = gst_object_ref (pad);
447
448   return stream;
449 }
450
451 /**
452  * gst_rtsp_stream_get_index:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the stream index.
456  *
457  * Return: the stream index.
458  */
459 guint
460 gst_rtsp_stream_get_index (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
463
464   return stream->priv->idx;
465 }
466
467 /**
468  * gst_rtsp_stream_get_pt:
469  * @stream: a #GstRTSPStream
470  *
471  * Get the stream payload type.
472  *
473  * Return: the stream payload type.
474  */
475 guint
476 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
477 {
478   GstRTSPStreamPrivate *priv;
479   guint pt;
480
481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
482
483   priv = stream->priv;
484
485   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
486
487   return pt;
488 }
489
490 /**
491  * gst_rtsp_stream_get_srcpad:
492  * @stream: a #GstRTSPStream
493  *
494  * Get the srcpad associated with @stream.
495  *
496  * Returns: (transfer full) (nullable): the srcpad. Unref after usage.
497  */
498 GstPad *
499 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
500 {
501   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
502
503   if (!stream->priv->srcpad)
504     return NULL;
505
506   return gst_object_ref (stream->priv->srcpad);
507 }
508
509 /**
510  * gst_rtsp_stream_get_sinkpad:
511  * @stream: a #GstRTSPStream
512  *
513  * Get the sinkpad associated with @stream.
514  *
515  * Returns: (transfer full) (nullable): the sinkpad. Unref after usage.
516  */
517 GstPad *
518 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
519 {
520   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
521
522   if (!stream->priv->sinkpad)
523     return NULL;
524
525   return gst_object_ref (stream->priv->sinkpad);
526 }
527
528 /**
529  * gst_rtsp_stream_get_control:
530  * @stream: a #GstRTSPStream
531  *
532  * Get the control string to identify this stream.
533  *
534  * Returns: (transfer full) (nullable): the control string. g_free() after usage.
535  */
536 gchar *
537 gst_rtsp_stream_get_control (GstRTSPStream * stream)
538 {
539   GstRTSPStreamPrivate *priv;
540   gchar *result;
541
542   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
543
544   priv = stream->priv;
545
546   g_mutex_lock (&priv->lock);
547   if ((result = g_strdup (priv->control)) == NULL)
548     result = g_strdup_printf ("stream=%u", priv->idx);
549   g_mutex_unlock (&priv->lock);
550
551   return result;
552 }
553
554 /**
555  * gst_rtsp_stream_set_control:
556  * @stream: a #GstRTSPStream
557  * @control: (nullable): a control string
558  *
559  * Set the control string in @stream.
560  */
561 void
562 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
563 {
564   GstRTSPStreamPrivate *priv;
565
566   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
567
568   priv = stream->priv;
569
570   g_mutex_lock (&priv->lock);
571   g_free (priv->control);
572   priv->control = g_strdup (control);
573   g_mutex_unlock (&priv->lock);
574 }
575
576 /**
577  * gst_rtsp_stream_has_control:
578  * @stream: a #GstRTSPStream
579  * @control: (nullable): a control string
580  *
581  * Check if @stream has the control string @control.
582  *
583  * Returns: %TRUE is @stream has @control as the control string
584  */
585 gboolean
586 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
587 {
588   GstRTSPStreamPrivate *priv;
589   gboolean res;
590
591   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
592
593   priv = stream->priv;
594
595   g_mutex_lock (&priv->lock);
596   if (priv->control)
597     res = (g_strcmp0 (priv->control, control) == 0);
598   else {
599     guint streamid;
600
601     if (sscanf (control, "stream=%u", &streamid) > 0)
602       res = (streamid == priv->idx);
603     else
604       res = FALSE;
605   }
606   g_mutex_unlock (&priv->lock);
607
608   return res;
609 }
610
611 /**
612  * gst_rtsp_stream_set_mtu:
613  * @stream: a #GstRTSPStream
614  * @mtu: a new MTU
615  *
616  * Configure the mtu in the payloader of @stream to @mtu.
617  */
618 void
619 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
620 {
621   GstRTSPStreamPrivate *priv;
622
623   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
624
625   priv = stream->priv;
626
627   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
628
629   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
630 }
631
632 /**
633  * gst_rtsp_stream_get_mtu:
634  * @stream: a #GstRTSPStream
635  *
636  * Get the configured MTU in the payloader of @stream.
637  *
638  * Returns: the MTU of the payloader.
639  */
640 guint
641 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
642 {
643   GstRTSPStreamPrivate *priv;
644   guint mtu;
645
646   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
647
648   priv = stream->priv;
649
650   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
651
652   return mtu;
653 }
654
655 /* Update the dscp qos property on the udp sinks */
656 static void
657 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
658 {
659   GstRTSPStreamPrivate *priv;
660
661   priv = stream->priv;
662
663   if (*udpsink) {
664     g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
665   }
666 }
667
668 /**
669  * gst_rtsp_stream_set_dscp_qos:
670  * @stream: a #GstRTSPStream
671  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
672  *
673  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
674  */
675 void
676 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
677 {
678   GstRTSPStreamPrivate *priv;
679
680   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
681
682   priv = stream->priv;
683
684   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
685
686   if (dscp_qos < -1 || dscp_qos > 63) {
687     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
688     return;
689   }
690
691   priv->dscp_qos = dscp_qos;
692
693   update_dscp_qos (stream, priv->udpsink);
694 }
695
696 /**
697  * gst_rtsp_stream_get_dscp_qos:
698  * @stream: a #GstRTSPStream
699  *
700  * Get the configured DSCP QoS in of the outgoing sockets.
701  *
702  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
703  */
704 gint
705 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
706 {
707   GstRTSPStreamPrivate *priv;
708
709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
710
711   priv = stream->priv;
712
713   return priv->dscp_qos;
714 }
715
716 /**
717  * gst_rtsp_stream_is_transport_supported:
718  * @stream: a #GstRTSPStream
719  * @transport: (transfer none): a #GstRTSPTransport
720  *
721  * Check if @transport can be handled by stream
722  *
723  * Returns: %TRUE if @transport can be handled by @stream.
724  */
725 gboolean
726 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
727     GstRTSPTransport * transport)
728 {
729   GstRTSPStreamPrivate *priv;
730
731   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
732   g_return_val_if_fail (transport != NULL, FALSE);
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   if (transport->trans != GST_RTSP_TRANS_RTP)
738     goto unsupported_transmode;
739
740   if (!(transport->profile & priv->profiles))
741     goto unsupported_profile;
742
743   if (!(transport->lower_transport & priv->allowed_protocols))
744     goto unsupported_ltrans;
745
746   g_mutex_unlock (&priv->lock);
747
748   return TRUE;
749
750   /* ERRORS */
751 unsupported_transmode:
752   {
753     GST_DEBUG ("unsupported transport mode %d", transport->trans);
754     g_mutex_unlock (&priv->lock);
755     return FALSE;
756   }
757 unsupported_profile:
758   {
759     GST_DEBUG ("unsupported profile %d", transport->profile);
760     g_mutex_unlock (&priv->lock);
761     return FALSE;
762   }
763 unsupported_ltrans:
764   {
765     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
766     g_mutex_unlock (&priv->lock);
767     return FALSE;
768   }
769 }
770
771 /**
772  * gst_rtsp_stream_set_profiles:
773  * @stream: a #GstRTSPStream
774  * @profiles: the new profiles
775  *
776  * Configure the allowed profiles for @stream.
777  */
778 void
779 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
780 {
781   GstRTSPStreamPrivate *priv;
782
783   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
784
785   priv = stream->priv;
786
787   g_mutex_lock (&priv->lock);
788   priv->profiles = profiles;
789   g_mutex_unlock (&priv->lock);
790 }
791
792 /**
793  * gst_rtsp_stream_get_profiles:
794  * @stream: a #GstRTSPStream
795  *
796  * Get the allowed profiles of @stream.
797  *
798  * Returns: a #GstRTSPProfile
799  */
800 GstRTSPProfile
801 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
802 {
803   GstRTSPStreamPrivate *priv;
804   GstRTSPProfile res;
805
806   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
807
808   priv = stream->priv;
809
810   g_mutex_lock (&priv->lock);
811   res = priv->profiles;
812   g_mutex_unlock (&priv->lock);
813
814   return res;
815 }
816
817 /**
818  * gst_rtsp_stream_set_protocols:
819  * @stream: a #GstRTSPStream
820  * @protocols: the new flags
821  *
822  * Configure the allowed lower transport for @stream.
823  */
824 void
825 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
826     GstRTSPLowerTrans protocols)
827 {
828   GstRTSPStreamPrivate *priv;
829
830   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
831
832   priv = stream->priv;
833
834   g_mutex_lock (&priv->lock);
835   priv->allowed_protocols = protocols;
836   g_mutex_unlock (&priv->lock);
837 }
838
839 /**
840  * gst_rtsp_stream_get_protocols:
841  * @stream: a #GstRTSPStream
842  *
843  * Get the allowed protocols of @stream.
844  *
845  * Returns: a #GstRTSPLowerTrans
846  */
847 GstRTSPLowerTrans
848 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
849 {
850   GstRTSPStreamPrivate *priv;
851   GstRTSPLowerTrans res;
852
853   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
854       GST_RTSP_LOWER_TRANS_UNKNOWN);
855
856   priv = stream->priv;
857
858   g_mutex_lock (&priv->lock);
859   res = priv->allowed_protocols;
860   g_mutex_unlock (&priv->lock);
861
862   return res;
863 }
864
865 /**
866  * gst_rtsp_stream_set_address_pool:
867  * @stream: a #GstRTSPStream
868  * @pool: (transfer none) (nullable): a #GstRTSPAddressPool
869  *
870  * configure @pool to be used as the address pool of @stream.
871  */
872 void
873 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
874     GstRTSPAddressPool * pool)
875 {
876   GstRTSPStreamPrivate *priv;
877   GstRTSPAddressPool *old;
878
879   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
880
881   priv = stream->priv;
882
883   GST_LOG_OBJECT (stream, "set address pool %p", pool);
884
885   g_mutex_lock (&priv->lock);
886   if ((old = priv->pool) != pool)
887     priv->pool = pool ? g_object_ref (pool) : NULL;
888   else
889     old = NULL;
890   g_mutex_unlock (&priv->lock);
891
892   if (old)
893     g_object_unref (old);
894 }
895
896 /**
897  * gst_rtsp_stream_get_address_pool:
898  * @stream: a #GstRTSPStream
899  *
900  * Get the #GstRTSPAddressPool used as the address pool of @stream.
901  *
902  * Returns: (transfer full) (nullable): the #GstRTSPAddressPool of @stream.
903  * g_object_unref() after usage.
904  */
905 GstRTSPAddressPool *
906 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
907 {
908   GstRTSPStreamPrivate *priv;
909   GstRTSPAddressPool *result;
910
911   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
912
913   priv = stream->priv;
914
915   g_mutex_lock (&priv->lock);
916   if ((result = priv->pool))
917     g_object_ref (result);
918   g_mutex_unlock (&priv->lock);
919
920   return result;
921 }
922
923 /**
924  * gst_rtsp_stream_set_multicast_iface:
925  * @stream: a #GstRTSPStream
926  * @multicast_iface: (transfer none) (nullable): a multicast interface name
927  *
928  * configure @multicast_iface to be used for @stream.
929  */
930 void
931 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
932     const gchar * multicast_iface)
933 {
934   GstRTSPStreamPrivate *priv;
935   gchar *old;
936
937   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
938
939   priv = stream->priv;
940
941   GST_LOG_OBJECT (stream, "set multicast iface %s",
942       GST_STR_NULL (multicast_iface));
943
944   g_mutex_lock (&priv->lock);
945   if ((old = priv->multicast_iface) != multicast_iface)
946     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
947   else
948     old = NULL;
949   g_mutex_unlock (&priv->lock);
950
951   if (old)
952     g_free (old);
953 }
954
955 /**
956  * gst_rtsp_stream_get_multicast_iface:
957  * @stream: a #GstRTSPStream
958  *
959  * Get the multicast interface used for @stream.
960  *
961  * Returns: (transfer full) (nullable): the multicast interface for @stream.
962  * g_free() after usage.
963  */
964 gchar *
965 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
966 {
967   GstRTSPStreamPrivate *priv;
968   gchar *result;
969
970   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
971
972   priv = stream->priv;
973
974   g_mutex_lock (&priv->lock);
975   if ((result = priv->multicast_iface))
976     result = g_strdup (result);
977   g_mutex_unlock (&priv->lock);
978
979   return result;
980 }
981
982 /**
983  * gst_rtsp_stream_get_multicast_address:
984  * @stream: a #GstRTSPStream
985  * @family: the #GSocketFamily
986  *
987  * Get the multicast address of @stream for @family. The original
988  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
989  * won't release the address from the pool.
990  *
991  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
992  * or %NULL when no address could be allocated. gst_rtsp_address_free()
993  * after usage.
994  */
995 GstRTSPAddress *
996 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
997     GSocketFamily family)
998 {
999   GstRTSPStreamPrivate *priv;
1000   GstRTSPAddress *result;
1001   GstRTSPAddress **addrp;
1002   GstRTSPAddressFlags flags;
1003
1004   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1005
1006   priv = stream->priv;
1007
1008   g_mutex_lock (&stream->priv->lock);
1009
1010   if (family == G_SOCKET_FAMILY_IPV6) {
1011     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
1012     addrp = &priv->mcast_addr_v6;
1013   } else {
1014     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
1015     addrp = &priv->mcast_addr_v4;
1016   }
1017
1018   if (*addrp == NULL) {
1019     if (priv->pool == NULL)
1020       goto no_pool;
1021
1022     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1023
1024     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
1025     if (*addrp == NULL)
1026       goto no_address;
1027
1028     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1029      * where we are going to bind our socket. Probably loop until we find a port
1030      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
1031      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
1032      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
1033   }
1034   result = gst_rtsp_address_copy (*addrp);
1035
1036   g_mutex_unlock (&stream->priv->lock);
1037
1038   return result;
1039
1040   /* ERRORS */
1041 no_pool:
1042   {
1043     GST_ERROR_OBJECT (stream, "no address pool specified");
1044     g_mutex_unlock (&stream->priv->lock);
1045     return NULL;
1046   }
1047 no_address:
1048   {
1049     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1050     g_mutex_unlock (&stream->priv->lock);
1051     return NULL;
1052   }
1053 }
1054
1055 /**
1056  * gst_rtsp_stream_reserve_address:
1057  * @stream: a #GstRTSPStream
1058  * @address: an address
1059  * @port: a port
1060  * @n_ports: n_ports
1061  * @ttl: a TTL
1062  *
1063  * Reserve @address and @port as the address and port of @stream. The original
1064  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1065  * won't release the address from the pool.
1066  *
1067  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1068  * the address could be reserved. gst_rtsp_address_free() after usage.
1069  */
1070 GstRTSPAddress *
1071 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1072     const gchar * address, guint port, guint n_ports, guint ttl)
1073 {
1074   GstRTSPStreamPrivate *priv;
1075   GstRTSPAddress *result;
1076   GInetAddress *addr;
1077   GSocketFamily family;
1078   GstRTSPAddress **addrp;
1079
1080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1081   g_return_val_if_fail (address != NULL, NULL);
1082   g_return_val_if_fail (port > 0, NULL);
1083   g_return_val_if_fail (n_ports > 0, NULL);
1084   g_return_val_if_fail (ttl > 0, NULL);
1085
1086   priv = stream->priv;
1087
1088   addr = g_inet_address_new_from_string (address);
1089   if (!addr) {
1090     GST_ERROR ("failed to get inet addr from %s", address);
1091     family = G_SOCKET_FAMILY_IPV4;
1092   } else {
1093     family = g_inet_address_get_family (addr);
1094     g_object_unref (addr);
1095   }
1096
1097   if (family == G_SOCKET_FAMILY_IPV6)
1098     addrp = &priv->mcast_addr_v6;
1099   else
1100     addrp = &priv->mcast_addr_v4;
1101
1102   g_mutex_lock (&priv->lock);
1103   if (*addrp == NULL) {
1104     GstRTSPAddressPoolResult res;
1105
1106     if (priv->pool == NULL)
1107       goto no_pool;
1108
1109     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1110         port, n_ports, ttl, addrp);
1111     if (res != GST_RTSP_ADDRESS_POOL_OK)
1112       goto no_address;
1113
1114     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1115      * where we are going to bind our socket. */
1116   } else {
1117     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1118         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1119         (*addrp)->ttl != ttl)
1120       goto different_address;
1121   }
1122   result = gst_rtsp_address_copy (*addrp);
1123   g_mutex_unlock (&priv->lock);
1124
1125   return result;
1126
1127   /* ERRORS */
1128 no_pool:
1129   {
1130     GST_ERROR_OBJECT (stream, "no address pool specified");
1131     g_mutex_unlock (&priv->lock);
1132     return NULL;
1133   }
1134 no_address:
1135   {
1136     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1137         address);
1138     g_mutex_unlock (&priv->lock);
1139     return NULL;
1140   }
1141 different_address:
1142   {
1143     GST_ERROR_OBJECT (stream,
1144         "address %s is not the same as %s that was already reserved",
1145         address, (*addrp)->address);
1146     g_mutex_unlock (&priv->lock);
1147     return NULL;
1148   }
1149 }
1150
1151 /* must be called with lock */
1152 static void
1153 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1154     GSocketFamily family)
1155 {
1156   const gchar *multisink_socket;
1157
1158   if (family == G_SOCKET_FAMILY_IPV6)
1159     multisink_socket = "socket-v6";
1160   else
1161     multisink_socket = "socket";
1162
1163   g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1164 }
1165
1166 /* must be called with lock */
1167 static void
1168 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1169     GSocketFamily family, const gchar * multicast_iface,
1170     const gchar * addr_str, gint port, gint mcast_ttl)
1171 {
1172   set_socket_for_udpsink (udpsink, socket, family);
1173
1174   if (multicast_iface) {
1175     GST_INFO ("setting multicast-iface %s", multicast_iface);
1176     g_object_set (G_OBJECT (udpsink), "multicast-iface", multicast_iface, NULL);
1177   }
1178
1179   if (mcast_ttl > 0) {
1180     GST_INFO ("setting ttl-mc %d", mcast_ttl);
1181     g_object_set (G_OBJECT (udpsink), "ttl-mc", mcast_ttl, NULL);
1182   }
1183 }
1184
1185
1186 /* must be called with lock */
1187 static void
1188 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1189     GSocketFamily family)
1190 {
1191   set_socket_for_udpsink (udpsink, socket, family);
1192 }
1193
1194 static guint16
1195 get_port_from_socket (GSocket * socket)
1196 {
1197   guint16 port;
1198   GSocketAddress *sockaddr;
1199   GError *err;
1200
1201   GST_DEBUG ("socket: %p", socket);
1202   sockaddr = g_socket_get_local_address (socket, &err);
1203   if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1204     g_clear_object (&sockaddr);
1205     GST_ERROR ("failed to get sockaddr: %s", err->message);
1206     g_error_free (err);
1207     return 0;
1208   }
1209
1210   port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1211   g_object_unref (sockaddr);
1212
1213   return port;
1214 }
1215
1216
1217 static gboolean
1218 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1219     GSocket * socket_v4, GSocket * socket_v6, gboolean multicast,
1220     gboolean is_rtp, gint mcast_ttl)
1221 {
1222   GstRTSPStreamPrivate *priv = stream->priv;
1223
1224   *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1225
1226   if (!*udpsink)
1227     goto no_udp_protocol;
1228
1229   /* configure sinks */
1230
1231   g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1232
1233   g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1234
1235   if (is_rtp)
1236     g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1237   else
1238     g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1239
1240   /* Needs to be async for RECORD streams, otherwise we will never go to
1241    * PLAYING because the sinks will wait for data while the udpsrc can't
1242    * provide data with timestamps in PAUSED. */
1243   if (!is_rtp || priv->sinkpad)
1244     g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1245
1246   if (multicast) {
1247     /* join multicast group when adding clients, so we'll start receiving from it.
1248      * We cannot rely on the udpsrc to join the group since its socket is always a
1249      * local unicast one. */
1250     g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1251
1252     g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1253   }
1254
1255   /* update the dscp qos field in the sinks */
1256   update_dscp_qos (stream, udpsink);
1257
1258   if (priv->server_addr_v4) {
1259     GST_DEBUG_OBJECT (stream, "udp IPv4, configure udpsinks");
1260     set_unicast_socket_for_udpsink (*udpsink, socket_v4, G_SOCKET_FAMILY_IPV4);
1261   }
1262
1263   if (priv->server_addr_v6) {
1264     GST_DEBUG_OBJECT (stream, "udp IPv6, configure udpsinks");
1265     set_unicast_socket_for_udpsink (*udpsink, socket_v6, G_SOCKET_FAMILY_IPV6);
1266   }
1267
1268   if (multicast) {
1269     gint port;
1270     if (priv->mcast_addr_v4) {
1271       GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1272       port = get_port_from_socket (socket_v4);
1273       if (!port)
1274         goto get_port_failed;
1275       set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1276           G_SOCKET_FAMILY_IPV4, priv->multicast_iface,
1277           priv->mcast_addr_v4->address, port, mcast_ttl);
1278     }
1279
1280     if (priv->mcast_addr_v6) {
1281       GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1282       port = get_port_from_socket (socket_v6);
1283       if (!port)
1284         goto get_port_failed;
1285       set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1286           G_SOCKET_FAMILY_IPV6, priv->multicast_iface,
1287           priv->mcast_addr_v6->address, port, mcast_ttl);
1288     }
1289
1290   }
1291
1292   return TRUE;
1293
1294   /* ERRORS */
1295 no_udp_protocol:
1296   {
1297     GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1298     return FALSE;
1299   }
1300 get_port_failed:
1301   {
1302     GST_ERROR_OBJECT (stream, "failed to get udp port");
1303     return FALSE;
1304   }
1305 }
1306
1307 /* must be called with lock */
1308 static gboolean
1309 create_and_configure_udpsource (GstElement ** udpsrc, GSocket * socket)
1310 {
1311   GstStateChangeReturn ret;
1312
1313   g_assert (socket != NULL);
1314
1315   *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1316   if (*udpsrc == NULL)
1317     goto error;
1318
1319   g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1320
1321   /* The udpsrc cannot do the join because its socket is always a local unicast
1322    * one. The udpsink sharing the same socket will do it for us. */
1323   g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1324
1325   g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1326
1327   g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1328
1329   ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1330   if (ret == GST_STATE_CHANGE_FAILURE)
1331     goto error;
1332
1333   return TRUE;
1334
1335   /* ERRORS */
1336 error:
1337   {
1338     if (*udpsrc) {
1339       gst_element_set_state (*udpsrc, GST_STATE_NULL);
1340       g_clear_object (udpsrc);
1341     }
1342     return FALSE;
1343   }
1344 }
1345
1346 static gboolean
1347 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1348     GSocket * socket_out[2], GstRTSPAddress ** server_addr_out,
1349     gboolean multicast, GstRTSPTransport * ct, gboolean use_transport_settings)
1350 {
1351   GstRTSPStreamPrivate *priv = stream->priv;
1352   GSocket *rtp_socket = NULL;
1353   GSocket *rtcp_socket = NULL;
1354   gint tmp_rtp, tmp_rtcp;
1355   guint count;
1356   GList *rejected_addresses = NULL;
1357   GstRTSPAddress *addr = NULL;
1358   GInetAddress *inetaddr = NULL;
1359   GSocketAddress *rtp_sockaddr = NULL;
1360   GSocketAddress *rtcp_sockaddr = NULL;
1361   GstRTSPAddressPool *pool;
1362   gboolean transport_settings_defined = FALSE;
1363
1364   pool = priv->pool;
1365   count = 0;
1366
1367   /* Start with random port */
1368   tmp_rtp = 0;
1369
1370   if (use_transport_settings) {
1371     if (!multicast)
1372       goto no_mcast;
1373
1374     if (ct == NULL)
1375       goto no_transport;
1376
1377     /* multicast and transport specific case */
1378     if (ct->destination != NULL) {
1379       tmp_rtp = ct->port.min;
1380       tmp_rtcp = ct->port.max;
1381       inetaddr = g_inet_address_new_from_string (ct->destination);
1382       if (inetaddr == NULL)
1383         goto destination_error;
1384       if (!g_inet_address_get_is_multicast (inetaddr))
1385         goto destination_no_mcast;
1386       g_object_unref (inetaddr);
1387       inetaddr = g_inet_address_new_any (family);
1388
1389       GST_DEBUG_OBJECT (stream, "use transport settings");
1390       transport_settings_defined = TRUE;
1391     }
1392   }
1393
1394   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1395       G_SOCKET_PROTOCOL_UDP, NULL);
1396   if (!rtcp_socket)
1397     goto no_udp_protocol;
1398   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1399
1400   /* try to allocate 2 UDP ports, the RTP port should be an even
1401    * number and the RTCP port should be the next (uneven) port */
1402 again:
1403
1404   if (rtp_socket == NULL) {
1405     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1406         G_SOCKET_PROTOCOL_UDP, NULL);
1407     if (!rtp_socket)
1408       goto no_udp_protocol;
1409     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1410   }
1411
1412   if (!transport_settings_defined) {
1413     if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool))
1414         || multicast) {
1415       GstRTSPAddressFlags flags;
1416
1417       if (addr)
1418         rejected_addresses = g_list_prepend (rejected_addresses, addr);
1419
1420       if (!pool)
1421         goto no_pool;
1422
1423       flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1424       if (multicast)
1425         flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1426       else
1427         flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1428
1429       if (family == G_SOCKET_FAMILY_IPV6)
1430         flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1431       else
1432         flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1433
1434       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1435
1436       if (addr == NULL)
1437         goto no_address;
1438
1439       tmp_rtp = addr->port;
1440
1441       g_clear_object (&inetaddr);
1442       /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1443        * socket control message set in udpsrc? */
1444       if (multicast)
1445         inetaddr = g_inet_address_new_any (family);
1446       else
1447         inetaddr = g_inet_address_new_from_string (addr->address);
1448     } else {
1449       if (tmp_rtp != 0) {
1450         tmp_rtp += 2;
1451         if (++count > 20)
1452           goto no_ports;
1453       }
1454
1455       if (inetaddr == NULL)
1456         inetaddr = g_inet_address_new_any (family);
1457     }
1458   }
1459
1460   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1461   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1462     GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1463     g_object_unref (rtp_sockaddr);
1464     if (transport_settings_defined)
1465       goto transport_settings_error;
1466     goto again;
1467   }
1468   g_object_unref (rtp_sockaddr);
1469
1470   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1471   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1472     g_clear_object (&rtp_sockaddr);
1473     goto socket_error;
1474   }
1475
1476   if (!transport_settings_defined) {
1477     tmp_rtp =
1478         g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1479
1480     /* check if port is even. RFC 3550 encorages the use of an even/odd port
1481      * pair, however it's not a strict requirement so this check is not done
1482      * for the client selected ports. */
1483     if ((tmp_rtp & 1) != 0) {
1484       /* port not even, close and allocate another */
1485       tmp_rtp++;
1486       g_object_unref (rtp_sockaddr);
1487       g_clear_object (&rtp_socket);
1488       goto again;
1489     }
1490   }
1491   g_object_unref (rtp_sockaddr);
1492
1493   /* set port */
1494   tmp_rtcp = tmp_rtp + 1;
1495
1496   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1497   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1498     GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1499     g_object_unref (rtcp_sockaddr);
1500     g_clear_object (&rtp_socket);
1501     if (transport_settings_defined)
1502       goto transport_settings_error;
1503     goto again;
1504   }
1505   g_object_unref (rtcp_sockaddr);
1506
1507   if (!addr) {
1508     addr = g_slice_new0 (GstRTSPAddress);
1509     addr->port = tmp_rtp;
1510     addr->n_ports = 2;
1511     if (transport_settings_defined)
1512       addr->address = g_strdup (ct->destination);
1513     else
1514       addr->address = g_inet_address_to_string (inetaddr);
1515     addr->ttl = ct->ttl;
1516   }
1517
1518   g_clear_object (&inetaddr);
1519
1520   if (multicast && (ct->ttl > 0) && (ct->ttl <= priv->max_mcast_ttl)) {
1521     GST_DEBUG ("setting mcast ttl to %d", ct->ttl);
1522     g_socket_set_multicast_ttl (rtp_socket, ct->ttl);
1523     g_socket_set_multicast_ttl (rtcp_socket, ct->ttl);
1524   }
1525
1526   socket_out[0] = rtp_socket;
1527   socket_out[1] = rtcp_socket;
1528   *server_addr_out = addr;
1529
1530   GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d",
1531       addr->address, tmp_rtp, tmp_rtcp);
1532
1533   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1534
1535   return TRUE;
1536
1537   /* ERRORS */
1538 no_mcast:
1539   {
1540     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: wrong transport");
1541     goto cleanup;
1542   }
1543 no_transport:
1544   {
1545     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no transport");
1546     goto cleanup;
1547   }
1548 destination_error:
1549   {
1550     GST_ERROR_OBJECT (stream,
1551         "failed to allocate UDP ports: destination error");
1552     goto cleanup;
1553   }
1554 destination_no_mcast:
1555   {
1556     GST_ERROR_OBJECT (stream,
1557         "failed to allocate UDP ports: destination not multicast address");
1558     goto cleanup;
1559   }
1560 no_udp_protocol:
1561   {
1562     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1563     goto cleanup;
1564   }
1565 no_pool:
1566   {
1567     GST_WARNING_OBJECT (stream,
1568         "failed to allocate UDP ports: no address pool specified");
1569     goto cleanup;
1570   }
1571 no_address:
1572   {
1573     GST_WARNING_OBJECT (stream, "failed to acquire address from pool");
1574     goto cleanup;
1575   }
1576 no_ports:
1577   {
1578     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: no ports");
1579     goto cleanup;
1580   }
1581 transport_settings_error:
1582   {
1583     GST_ERROR_OBJECT (stream,
1584         "failed to allocate UDP ports with requested transport settings");
1585     goto cleanup;
1586   }
1587 socket_error:
1588   {
1589     GST_WARNING_OBJECT (stream, "failed to allocate UDP ports: socket error");
1590     goto cleanup;
1591   }
1592 cleanup:
1593   {
1594     if (inetaddr)
1595       g_object_unref (inetaddr);
1596     g_list_free_full (rejected_addresses,
1597         (GDestroyNotify) gst_rtsp_address_free);
1598     if (addr)
1599       gst_rtsp_address_free (addr);
1600     if (rtp_socket)
1601       g_object_unref (rtp_socket);
1602     if (rtcp_socket)
1603       g_object_unref (rtcp_socket);
1604     return FALSE;
1605   }
1606 }
1607
1608 /* must be called with lock */
1609 static gboolean
1610 add_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1611     guint rtp_port, guint rtcp_port)
1612 {
1613   GstRTSPStreamPrivate *priv;
1614   GList *walk;
1615   UdpClientAddrInfo *client;
1616   GInetAddress *inet;
1617
1618   priv = stream->priv;
1619
1620   if (destination == NULL)
1621     return FALSE;
1622
1623   inet = g_inet_address_new_from_string (destination);
1624   if (inet == NULL)
1625     goto invalid_address;
1626
1627   if (!g_inet_address_get_is_multicast (inet)) {
1628     g_object_unref (inet);
1629     goto invalid_address;
1630   }
1631   g_object_unref (inet);
1632
1633   for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1634     UdpClientAddrInfo *cli = walk->data;
1635
1636     if ((g_strcmp0 (cli->address, destination) == 0) &&
1637         (cli->rtp_port == rtp_port)) {
1638       GST_DEBUG ("requested destination already exists: %s:%u-%u",
1639           destination, rtp_port, rtcp_port);
1640       cli->add_count++;
1641       return TRUE;
1642     }
1643   }
1644
1645   client = g_new0 (UdpClientAddrInfo, 1);
1646   client->address = g_strdup (destination);
1647   client->rtp_port = rtp_port;
1648   client->add_count = 1;
1649   priv->mcast_clients = g_list_prepend (priv->mcast_clients, client);
1650
1651   GST_DEBUG ("added mcast client %s:%u-%u", destination, rtp_port, rtcp_port);
1652
1653   return TRUE;
1654
1655 invalid_address:
1656   {
1657     GST_WARNING_OBJECT (stream, "Multicast address is invalid: %s",
1658         destination);
1659     return FALSE;
1660   }
1661 }
1662
1663 /* must be called with lock */
1664 static gboolean
1665 remove_mcast_client_addr (GstRTSPStream * stream, const gchar * destination,
1666     guint rtp_port, guint rtcp_port)
1667 {
1668   GstRTSPStreamPrivate *priv;
1669   GList *walk;
1670
1671   priv = stream->priv;
1672
1673   if (destination == NULL)
1674     goto no_destination;
1675
1676   for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
1677     UdpClientAddrInfo *cli = walk->data;
1678
1679     if ((g_strcmp0 (cli->address, destination) == 0) &&
1680         (cli->rtp_port == rtp_port)) {
1681       cli->add_count--;
1682
1683       if (!cli->add_count) {
1684         priv->mcast_clients = g_list_remove (priv->mcast_clients, cli);
1685         free_mcast_client (cli);
1686       }
1687       return TRUE;
1688     }
1689   }
1690
1691   GST_WARNING_OBJECT (stream, "Address not found");
1692   return FALSE;
1693
1694 no_destination:
1695   {
1696     GST_WARNING_OBJECT (stream, "No destination has been provided");
1697     return FALSE;
1698   }
1699 }
1700
1701
1702 /**
1703  * gst_rtsp_stream_allocate_udp_sockets:
1704  * @stream: a #GstRTSPStream
1705  * @family: protocol family
1706  * @transport: transport method
1707  * @use_client_settings: Whether to use client settings or not
1708  *
1709  * Allocates RTP and RTCP ports.
1710  *
1711  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1712  */
1713 gboolean
1714 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1715     GSocketFamily family, GstRTSPTransport * ct,
1716     gboolean use_transport_settings)
1717 {
1718   GstRTSPStreamPrivate *priv;
1719   gboolean ret = FALSE;
1720   GstRTSPLowerTrans transport;
1721   gboolean allocated = FALSE;
1722
1723   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1724   g_return_val_if_fail (ct != NULL, FALSE);
1725   priv = stream->priv;
1726
1727   transport = ct->lower_transport;
1728
1729   g_mutex_lock (&priv->lock);
1730
1731   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1732     if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_socket_v4[0])
1733       allocated = TRUE;
1734     else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_socket_v6[0])
1735       allocated = TRUE;
1736   } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1737     if (family == G_SOCKET_FAMILY_IPV4 && priv->socket_v4[0])
1738       allocated = TRUE;
1739     else if (family == G_SOCKET_FAMILY_IPV6 && priv->socket_v6[0])
1740       allocated = TRUE;
1741   }
1742
1743   if (allocated) {
1744     GST_DEBUG_OBJECT (stream, "Allocated already");
1745     g_mutex_unlock (&priv->lock);
1746     return TRUE;
1747   }
1748
1749   if (family == G_SOCKET_FAMILY_IPV4) {
1750     /* IPv4 */
1751     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1752       /* UDP unicast */
1753       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1754       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1755           priv->socket_v4, &priv->server_addr_v4, FALSE, ct, FALSE);
1756     } else {
1757       /* multicast */
1758       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1759       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1760           priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct,
1761           use_transport_settings);
1762     }
1763   } else {
1764     /* IPv6 */
1765     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1766       /* unicast */
1767       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1768       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1769           priv->socket_v6, &priv->server_addr_v6, FALSE, ct, FALSE);
1770
1771     } else {
1772       /* multicast */
1773       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1774       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1775           priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct,
1776           use_transport_settings);
1777     }
1778   }
1779   g_mutex_unlock (&priv->lock);
1780
1781   return ret;
1782 }
1783
1784 /**
1785  * gst_rtsp_stream_set_client_side:
1786  * @stream: a #GstRTSPStream
1787  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1788  * an RTSP connection.
1789  *
1790  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1791  * streams to an RTSP server via RECORD. This has the practical effect
1792  * of changing which UDP port numbers are used when setting up the local
1793  * side of the stream sending to be either the 'server' or 'client' pair
1794  * of a configured UDP transport.
1795  */
1796 void
1797 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1798 {
1799   GstRTSPStreamPrivate *priv;
1800
1801   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1802   priv = stream->priv;
1803   g_mutex_lock (&priv->lock);
1804   priv->client_side = client_side;
1805   g_mutex_unlock (&priv->lock);
1806 }
1807
1808 /**
1809  * gst_rtsp_stream_is_client_side:
1810  * @stream: a #GstRTSPStream
1811  *
1812  * See gst_rtsp_stream_set_client_side()
1813  *
1814  * Returns: TRUE if this #GstRTSPStream is client-side.
1815  */
1816 gboolean
1817 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1818 {
1819   GstRTSPStreamPrivate *priv;
1820   gboolean ret;
1821
1822   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1823
1824   priv = stream->priv;
1825   g_mutex_lock (&priv->lock);
1826   ret = priv->client_side;
1827   g_mutex_unlock (&priv->lock);
1828
1829   return ret;
1830 }
1831
1832 /**
1833  * gst_rtsp_stream_get_server_port:
1834  * @stream: a #GstRTSPStream
1835  * @server_port: (out): result server port
1836  * @family: the port family to get
1837  *
1838  * Fill @server_port with the port pair used by the server. This function can
1839  * only be called when @stream has been joined.
1840  */
1841 void
1842 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1843     GstRTSPRange * server_port, GSocketFamily family)
1844 {
1845   GstRTSPStreamPrivate *priv;
1846
1847   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1848   priv = stream->priv;
1849   g_return_if_fail (priv->joined_bin != NULL);
1850
1851   if (server_port) {
1852     server_port->min = 0;
1853     server_port->max = 0;
1854   }
1855
1856   g_mutex_lock (&priv->lock);
1857   if (family == G_SOCKET_FAMILY_IPV4) {
1858     if (server_port && priv->server_addr_v4) {
1859       server_port->min = priv->server_addr_v4->port;
1860       server_port->max =
1861           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1862     }
1863   } else {
1864     if (server_port && priv->server_addr_v6) {
1865       server_port->min = priv->server_addr_v6->port;
1866       server_port->max =
1867           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1868     }
1869   }
1870   g_mutex_unlock (&priv->lock);
1871 }
1872
1873 /**
1874  * gst_rtsp_stream_get_rtpsession:
1875  * @stream: a #GstRTSPStream
1876  *
1877  * Get the RTP session of this stream.
1878  *
1879  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1880  */
1881 GObject *
1882 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1883 {
1884   GstRTSPStreamPrivate *priv;
1885   GObject *session;
1886
1887   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1888
1889   priv = stream->priv;
1890
1891   g_mutex_lock (&priv->lock);
1892   if ((session = priv->session))
1893     g_object_ref (session);
1894   g_mutex_unlock (&priv->lock);
1895
1896   return session;
1897 }
1898
1899 /**
1900  * gst_rtsp_stream_get_srtp_encoder:
1901  * @stream: a #GstRTSPStream
1902  *
1903  * Get the SRTP encoder for this stream.
1904  *
1905  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1906  */
1907 GstElement *
1908 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1909 {
1910   GstRTSPStreamPrivate *priv;
1911   GstElement *encoder;
1912
1913   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1914
1915   priv = stream->priv;
1916
1917   g_mutex_lock (&priv->lock);
1918   if ((encoder = priv->srtpenc))
1919     g_object_ref (encoder);
1920   g_mutex_unlock (&priv->lock);
1921
1922   return encoder;
1923 }
1924
1925 /**
1926  * gst_rtsp_stream_get_ssrc:
1927  * @stream: a #GstRTSPStream
1928  * @ssrc: (out): result ssrc
1929  *
1930  * Get the SSRC used by the RTP session of this stream. This function can only
1931  * be called when @stream has been joined.
1932  */
1933 void
1934 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1935 {
1936   GstRTSPStreamPrivate *priv;
1937
1938   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1939   priv = stream->priv;
1940   g_return_if_fail (priv->joined_bin != NULL);
1941
1942   g_mutex_lock (&priv->lock);
1943   if (ssrc && priv->session)
1944     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1945   g_mutex_unlock (&priv->lock);
1946 }
1947
1948 /**
1949  * gst_rtsp_stream_set_retransmission_time:
1950  * @stream: a #GstRTSPStream
1951  * @time: a #GstClockTime
1952  *
1953  * Set the amount of time to store retransmission packets.
1954  */
1955 void
1956 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1957     GstClockTime time)
1958 {
1959   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1960
1961   g_mutex_lock (&stream->priv->lock);
1962   stream->priv->rtx_time = time;
1963   if (stream->priv->rtxsend)
1964     g_object_set (stream->priv->rtxsend, "max-size-time",
1965         GST_TIME_AS_MSECONDS (time), NULL);
1966   g_mutex_unlock (&stream->priv->lock);
1967 }
1968
1969 /**
1970  * gst_rtsp_stream_get_retransmission_time:
1971  * @stream: a #GstRTSPStream
1972  *
1973  * Get the amount of time to store retransmission data.
1974  *
1975  * Returns: the amount of time to store retransmission data.
1976  */
1977 GstClockTime
1978 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1979 {
1980   GstClockTime ret;
1981
1982   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1983
1984   g_mutex_lock (&stream->priv->lock);
1985   ret = stream->priv->rtx_time;
1986   g_mutex_unlock (&stream->priv->lock);
1987
1988   return ret;
1989 }
1990
1991 /**
1992  * gst_rtsp_stream_set_retransmission_pt:
1993  * @stream: a #GstRTSPStream
1994  * @rtx_pt: a #guint
1995  *
1996  * Set the payload type (pt) for retransmission of this stream.
1997  */
1998 void
1999 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
2000 {
2001   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2002
2003   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
2004
2005   g_mutex_lock (&stream->priv->lock);
2006   stream->priv->rtx_pt = rtx_pt;
2007   if (stream->priv->rtxsend) {
2008     guint pt = gst_rtsp_stream_get_pt (stream);
2009     gchar *pt_s = g_strdup_printf ("%d", pt);
2010     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
2011         pt_s, G_TYPE_UINT, rtx_pt, NULL);
2012     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
2013     g_free (pt_s);
2014     gst_structure_free (rtx_pt_map);
2015   }
2016   g_mutex_unlock (&stream->priv->lock);
2017 }
2018
2019 /**
2020  * gst_rtsp_stream_get_retransmission_pt:
2021  * @stream: a #GstRTSPStream
2022  *
2023  * Get the payload-type used for retransmission of this stream
2024  *
2025  * Returns: The retransmission PT.
2026  */
2027 guint
2028 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
2029 {
2030   guint rtx_pt;
2031
2032   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2033
2034   g_mutex_lock (&stream->priv->lock);
2035   rtx_pt = stream->priv->rtx_pt;
2036   g_mutex_unlock (&stream->priv->lock);
2037
2038   return rtx_pt;
2039 }
2040
2041 /**
2042  * gst_rtsp_stream_set_buffer_size:
2043  * @stream: a #GstRTSPStream
2044  * @size: the buffer size
2045  *
2046  * Set the size of the UDP transmission buffer (in bytes)
2047  * Needs to be set before the stream is joined to a bin.
2048  *
2049  * Since: 1.6
2050  */
2051 void
2052 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
2053 {
2054   g_mutex_lock (&stream->priv->lock);
2055   stream->priv->buffer_size = size;
2056   g_mutex_unlock (&stream->priv->lock);
2057 }
2058
2059 /**
2060  * gst_rtsp_stream_get_buffer_size:
2061  * @stream: a #GstRTSPStream
2062  *
2063  * Get the size of the UDP transmission buffer (in bytes)
2064  *
2065  * Returns: the size of the UDP TX buffer
2066  *
2067  * Since: 1.6
2068  */
2069 guint
2070 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
2071 {
2072   guint buffer_size;
2073
2074   g_mutex_lock (&stream->priv->lock);
2075   buffer_size = stream->priv->buffer_size;
2076   g_mutex_unlock (&stream->priv->lock);
2077
2078   return buffer_size;
2079 }
2080
2081 /**
2082  * gst_rtsp_stream_set_max_mcast_ttl:
2083  * @stream: a #GstRTSPStream
2084  * @ttl: the new multicast ttl value
2085  *
2086  * Set the maximum time-to-live value of outgoing multicast packets.
2087  *
2088  * Returns: %TRUE if the requested ttl has been set successfully.
2089  *
2090  */
2091 gboolean
2092 gst_rtsp_stream_set_max_mcast_ttl (GstRTSPStream * stream, guint ttl)
2093 {
2094   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2095
2096   g_mutex_lock (&stream->priv->lock);
2097   if (ttl == 0 || ttl > DEFAULT_MAX_MCAST_TTL) {
2098     GST_WARNING_OBJECT (stream, "The reqested mcast TTL value is not valid.");
2099     g_mutex_unlock (&stream->priv->lock);
2100     return FALSE;
2101   }
2102   stream->priv->max_mcast_ttl = ttl;
2103   g_mutex_unlock (&stream->priv->lock);
2104
2105   return TRUE;
2106 }
2107
2108 /**
2109  * gst_rtsp_stream_get_max_mcast_ttl:
2110  * @stream: a #GstRTSPStream
2111  *
2112  * Get the the maximum time-to-live value of outgoing multicast packets.
2113  *
2114  * Returns: the maximum time-to-live value of outgoing multicast packets.
2115  *
2116  */
2117 guint
2118 gst_rtsp_stream_get_max_mcast_ttl (GstRTSPStream * stream)
2119 {
2120   guint ttl;
2121
2122   g_mutex_lock (&stream->priv->lock);
2123   ttl = stream->priv->max_mcast_ttl;
2124   g_mutex_unlock (&stream->priv->lock);
2125
2126   return ttl;
2127 }
2128
2129 /**
2130  * gst_rtsp_stream_verify_mcast_ttl:
2131  * @stream: a #GstRTSPStream
2132  * @ttl: a requested multicast ttl
2133  *
2134  * Check if the requested multicast ttl value is allowed.
2135  *
2136  * Returns: TRUE if the requested ttl value is allowed.
2137  *
2138  */
2139 gboolean
2140 gst_rtsp_stream_verify_mcast_ttl (GstRTSPStream * stream, guint ttl)
2141 {
2142   gboolean res = FALSE;
2143
2144   g_mutex_lock (&stream->priv->lock);
2145   if ((ttl > 0) && (ttl <= stream->priv->max_mcast_ttl))
2146     res = TRUE;
2147   g_mutex_unlock (&stream->priv->lock);
2148
2149   return res;
2150 }
2151
2152 /* executed from streaming thread */
2153 static void
2154 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
2155 {
2156   GstRTSPStreamPrivate *priv = stream->priv;
2157   GstCaps *newcaps, *oldcaps;
2158
2159   newcaps = gst_pad_get_current_caps (pad);
2160
2161   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
2162       newcaps);
2163
2164   g_mutex_lock (&priv->lock);
2165   oldcaps = priv->caps;
2166   priv->caps = newcaps;
2167   g_mutex_unlock (&priv->lock);
2168
2169   if (oldcaps)
2170     gst_caps_unref (oldcaps);
2171 }
2172
2173 static void
2174 dump_structure (const GstStructure * s)
2175 {
2176   gchar *sstr;
2177
2178   sstr = gst_structure_to_string (s);
2179   GST_INFO ("structure: %s", sstr);
2180   g_free (sstr);
2181 }
2182
2183 static GstRTSPStreamTransport *
2184 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
2185 {
2186   GstRTSPStreamPrivate *priv = stream->priv;
2187   GList *walk;
2188   GstRTSPStreamTransport *result = NULL;
2189   const gchar *tmp;
2190   gchar *dest;
2191   guint port;
2192
2193   if (rtcp_from == NULL)
2194     return NULL;
2195
2196   tmp = g_strrstr (rtcp_from, ":");
2197   if (tmp == NULL)
2198     return NULL;
2199
2200   port = atoi (tmp + 1);
2201   dest = g_strndup (rtcp_from, tmp - rtcp_from);
2202
2203   g_mutex_lock (&priv->lock);
2204   GST_INFO ("finding %s:%d in %d transports", dest, port,
2205       g_list_length (priv->transports));
2206
2207   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2208     GstRTSPStreamTransport *trans = walk->data;
2209     const GstRTSPTransport *tr;
2210     gint min, max;
2211
2212     tr = gst_rtsp_stream_transport_get_transport (trans);
2213
2214     if (priv->client_side) {
2215       /* In client side mode the 'destination' is the RTSP server, so send
2216        * to those ports */
2217       min = tr->server_port.min;
2218       max = tr->server_port.max;
2219     } else {
2220       min = tr->client_port.min;
2221       max = tr->client_port.max;
2222     }
2223
2224     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
2225         (min == port || max == port)) {
2226       result = trans;
2227       break;
2228     }
2229   }
2230   if (result)
2231     g_object_ref (result);
2232   g_mutex_unlock (&priv->lock);
2233
2234   g_free (dest);
2235
2236   return result;
2237 }
2238
2239 static GstRTSPStreamTransport *
2240 check_transport (GObject * source, GstRTSPStream * stream)
2241 {
2242   GstStructure *stats;
2243   GstRTSPStreamTransport *trans;
2244
2245   /* see if we have a stream to match with the origin of the RTCP packet */
2246   trans = g_object_get_qdata (source, ssrc_stream_map_key);
2247   if (trans == NULL) {
2248     g_object_get (source, "stats", &stats, NULL);
2249     if (stats) {
2250       const gchar *rtcp_from;
2251
2252       dump_structure (stats);
2253
2254       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
2255       if ((trans = find_transport (stream, rtcp_from))) {
2256         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
2257             source);
2258         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
2259             g_object_unref);
2260       }
2261       gst_structure_free (stats);
2262     }
2263   }
2264   return trans;
2265 }
2266
2267
2268 static void
2269 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2270 {
2271   GstRTSPStreamTransport *trans;
2272
2273   GST_INFO ("%p: new source %p", stream, source);
2274
2275   trans = check_transport (source, stream);
2276
2277   if (trans)
2278     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
2279 }
2280
2281 static void
2282 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
2283 {
2284   GST_INFO ("%p: new SDES %p", stream, source);
2285 }
2286
2287 static void
2288 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
2289 {
2290   GstRTSPStreamTransport *trans;
2291
2292   trans = check_transport (source, stream);
2293
2294   if (trans) {
2295     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2296     gst_rtsp_stream_transport_keep_alive (trans);
2297   }
2298 #ifdef DUMP_STATS
2299   {
2300     GstStructure *stats;
2301     g_object_get (source, "stats", &stats, NULL);
2302     if (stats) {
2303       dump_structure (stats);
2304       gst_structure_free (stats);
2305     }
2306   }
2307 #endif
2308 }
2309
2310 static void
2311 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2312 {
2313   GST_INFO ("%p: source %p bye", stream, source);
2314 }
2315
2316 static void
2317 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2318 {
2319   GstRTSPStreamTransport *trans;
2320
2321   GST_INFO ("%p: source %p bye timeout", stream, source);
2322
2323   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2324     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2325     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2326   }
2327 }
2328
2329 static void
2330 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2331 {
2332   GstRTSPStreamTransport *trans;
2333
2334   GST_INFO ("%p: source %p timeout", stream, source);
2335
2336   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2337     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2338     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2339   }
2340 }
2341
2342 static void
2343 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2344 {
2345   GST_INFO ("%p: new sender source %p", stream, source);
2346 #ifndef DUMP_STATS
2347   {
2348     GstStructure *stats;
2349     g_object_get (source, "stats", &stats, NULL);
2350     if (stats) {
2351       dump_structure (stats);
2352       gst_structure_free (stats);
2353     }
2354   }
2355 #endif
2356 }
2357
2358 static void
2359 on_sender_ssrc_active (GObject * session, GObject * source,
2360     GstRTSPStream * stream)
2361 {
2362 #ifndef DUMP_STATS
2363   {
2364     GstStructure *stats;
2365     g_object_get (source, "stats", &stats, NULL);
2366     if (stats) {
2367       dump_structure (stats);
2368       gst_structure_free (stats);
2369     }
2370   }
2371 #endif
2372 }
2373
2374 static void
2375 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2376 {
2377   if (is_rtp) {
2378     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2379     g_list_free (priv->tr_cache_rtp);
2380     priv->tr_cache_rtp = NULL;
2381   } else {
2382     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2383     g_list_free (priv->tr_cache_rtcp);
2384     priv->tr_cache_rtcp = NULL;
2385   }
2386 }
2387
2388 /* Must be called with priv->lock */
2389 static void
2390 send_tcp_message (GstRTSPStream * stream, gint idx)
2391 {
2392   GstRTSPStreamPrivate *priv = stream->priv;
2393   GstAppSink *sink;
2394   GList *walk;
2395   GstSample *sample;
2396   GstBuffer *buffer;
2397   gboolean is_rtp;
2398
2399   if (priv->n_outstanding > 0 || !priv->have_buffer[idx]) {
2400     return;
2401   }
2402
2403   priv->have_buffer[idx] = FALSE;
2404
2405   if (priv->appsink[idx] == NULL) {
2406     /* session expired */
2407     return;
2408   }
2409
2410   sink = GST_APP_SINK (priv->appsink[idx]);
2411   sample = gst_app_sink_pull_sample (sink);
2412   if (!sample) {
2413     return;
2414   }
2415
2416   buffer = gst_sample_get_buffer (sample);
2417
2418   is_rtp = (idx == 0);
2419
2420   if (is_rtp) {
2421     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2422       clear_tr_cache (priv, is_rtp);
2423       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2424         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2425         const GstRTSPTransport *t =
2426             gst_rtsp_stream_transport_get_transport (tr);
2427
2428         if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2429           continue;
2430
2431         priv->tr_cache_rtp =
2432             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2433       }
2434       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2435     }
2436   } else {
2437     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2438       clear_tr_cache (priv, is_rtp);
2439       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2440         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2441         const GstRTSPTransport *t =
2442             gst_rtsp_stream_transport_get_transport (tr);
2443
2444         if (t->lower_transport != GST_RTSP_LOWER_TRANS_TCP)
2445           continue;
2446
2447         priv->tr_cache_rtcp =
2448             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2449       }
2450       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2451     }
2452   }
2453
2454   priv->n_outstanding += priv->n_tcp_transports;
2455
2456   g_mutex_unlock (&priv->lock);
2457
2458   if (is_rtp) {
2459     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2460       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2461       if (!gst_rtsp_stream_transport_send_rtp (tr, buffer)) {
2462         /* remove transport on send error */
2463         g_mutex_lock (&priv->lock);
2464         priv->n_outstanding--;
2465         update_transport (stream, tr, FALSE);
2466         g_mutex_unlock (&priv->lock);
2467       }
2468     }
2469   } else {
2470     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2471       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2472       if (!gst_rtsp_stream_transport_send_rtcp (tr, buffer)) {
2473         /* remove transport on send error */
2474         g_mutex_lock (&priv->lock);
2475         priv->n_outstanding--;
2476         update_transport (stream, tr, FALSE);
2477         g_mutex_unlock (&priv->lock);
2478       }
2479     }
2480   }
2481   gst_sample_unref (sample);
2482
2483   g_mutex_lock (&priv->lock);
2484 }
2485
2486 static GstFlowReturn
2487 handle_new_sample (GstAppSink * sink, gpointer user_data)
2488 {
2489   GstRTSPStream *stream = user_data;
2490   GstRTSPStreamPrivate *priv = stream->priv;
2491   int i;
2492   int idx = -1;
2493
2494   g_mutex_lock (&priv->lock);
2495
2496   for (i = 0; i < 2; i++)
2497     if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
2498       priv->have_buffer[i] = TRUE;
2499       if (priv->n_outstanding == 0) {
2500         /* send message */
2501         idx = i;
2502       }
2503       break;
2504     }
2505
2506   if (idx != -1)
2507     send_tcp_message (stream, idx);
2508
2509   g_mutex_unlock (&priv->lock);
2510
2511   return GST_FLOW_OK;
2512 }
2513
2514 static GstAppSinkCallbacks sink_cb = {
2515   NULL,                         /* not interested in EOS */
2516   NULL,                         /* not interested in preroll samples */
2517   handle_new_sample,
2518 };
2519
2520 static GstElement *
2521 get_rtp_encoder (GstRTSPStream * stream, guint session)
2522 {
2523   GstRTSPStreamPrivate *priv = stream->priv;
2524
2525   if (priv->srtpenc == NULL) {
2526     gchar *name;
2527
2528     name = g_strdup_printf ("srtpenc_%u", session);
2529     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2530     g_free (name);
2531
2532     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2533   }
2534   return gst_object_ref (priv->srtpenc);
2535 }
2536
2537 static GstElement *
2538 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2539 {
2540   GstRTSPStreamPrivate *priv = stream->priv;
2541   GstElement *oldenc, *enc;
2542   GstPad *pad;
2543   gchar *name;
2544
2545   if (priv->idx != session)
2546     return NULL;
2547
2548   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2549
2550   oldenc = priv->srtpenc;
2551   enc = get_rtp_encoder (stream, session);
2552   name = g_strdup_printf ("rtp_sink_%d", session);
2553   pad = gst_element_get_request_pad (enc, name);
2554   g_free (name);
2555   gst_object_unref (pad);
2556
2557   if (oldenc == NULL)
2558     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2559         enc);
2560
2561   return enc;
2562 }
2563
2564 static GstElement *
2565 request_rtcp_encoder (GstElement * rtpbin, guint session,
2566     GstRTSPStream * stream)
2567 {
2568   GstRTSPStreamPrivate *priv = stream->priv;
2569   GstElement *oldenc, *enc;
2570   GstPad *pad;
2571   gchar *name;
2572
2573   if (priv->idx != session)
2574     return NULL;
2575
2576   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2577
2578   oldenc = priv->srtpenc;
2579   enc = get_rtp_encoder (stream, session);
2580   name = g_strdup_printf ("rtcp_sink_%d", session);
2581   pad = gst_element_get_request_pad (enc, name);
2582   g_free (name);
2583   gst_object_unref (pad);
2584
2585   if (oldenc == NULL)
2586     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2587         enc);
2588
2589   return enc;
2590 }
2591
2592 static GstCaps *
2593 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2594 {
2595   GstRTSPStreamPrivate *priv = stream->priv;
2596   GstCaps *caps;
2597
2598   GST_DEBUG ("request key %08x", ssrc);
2599
2600   g_mutex_lock (&priv->lock);
2601   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2602     gst_caps_ref (caps);
2603   g_mutex_unlock (&priv->lock);
2604
2605   return caps;
2606 }
2607
2608 static GstElement *
2609 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2610     GstRTSPStream * stream)
2611 {
2612   GstRTSPStreamPrivate *priv = stream->priv;
2613
2614   if (priv->idx != session)
2615     return NULL;
2616
2617   if (priv->srtpdec == NULL) {
2618     gchar *name;
2619
2620     name = g_strdup_printf ("srtpdec_%u", session);
2621     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2622     g_free (name);
2623
2624     g_signal_connect (priv->srtpdec, "request-key",
2625         (GCallback) request_key, stream);
2626
2627     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_RTCP_DECODER],
2628         0, priv->srtpdec);
2629
2630   }
2631   return gst_object_ref (priv->srtpdec);
2632 }
2633
2634 /**
2635  * gst_rtsp_stream_request_aux_sender:
2636  * @stream: a #GstRTSPStream
2637  * @sessid: the session id
2638  *
2639  * Creating a rtxsend bin
2640  *
2641  * Returns: (transfer full) (nullable): a #GstElement.
2642  *
2643  * Since: 1.6
2644  */
2645 GstElement *
2646 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2647 {
2648   GstElement *bin;
2649   GstPad *pad;
2650   GstStructure *pt_map;
2651   gchar *name;
2652   guint pt, rtx_pt;
2653   gchar *pt_s;
2654
2655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2656
2657   pt = gst_rtsp_stream_get_pt (stream);
2658   pt_s = g_strdup_printf ("%u", pt);
2659   rtx_pt = stream->priv->rtx_pt;
2660
2661   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2662
2663   bin = gst_bin_new (NULL);
2664   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2665   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2666       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2667   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2668       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2669   g_free (pt_s);
2670   gst_structure_free (pt_map);
2671   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2672
2673   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2674   name = g_strdup_printf ("src_%u", sessid);
2675   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2676   g_free (name);
2677   gst_object_unref (pad);
2678
2679   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2680   name = g_strdup_printf ("sink_%u", sessid);
2681   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2682   g_free (name);
2683   gst_object_unref (pad);
2684
2685   return bin;
2686 }
2687
2688 static void
2689 add_rtx_pt (gpointer key, GstCaps * caps, GstStructure * pt_map)
2690 {
2691   guint pt = GPOINTER_TO_INT (key);
2692   const GstStructure *s = gst_caps_get_structure (caps, 0);
2693   const gchar *apt;
2694
2695   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "RTX") &&
2696       (apt = gst_structure_get_string (s, "apt"))) {
2697     gst_structure_set (pt_map, apt, G_TYPE_UINT, pt, NULL);
2698   }
2699 }
2700
2701 /* Call with priv->lock taken */
2702 static void
2703 update_rtx_receive_pt_map (GstRTSPStream * stream)
2704 {
2705   GstStructure *pt_map;
2706
2707   if (!stream->priv->rtxreceive)
2708     goto done;
2709
2710   pt_map = gst_structure_new_empty ("application/x-rtp-pt-map");
2711   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) add_rtx_pt, pt_map);
2712   g_object_set (stream->priv->rtxreceive, "payload-type-map", pt_map, NULL);
2713   gst_structure_free (pt_map);
2714
2715 done:
2716   return;
2717 }
2718
2719 static void
2720 retrieve_ulpfec_pt (gpointer key, GstCaps * caps, GstElement * ulpfec_decoder)
2721 {
2722   guint pt = GPOINTER_TO_INT (key);
2723   const GstStructure *s = gst_caps_get_structure (caps, 0);
2724
2725   if (!g_strcmp0 (gst_structure_get_string (s, "encoding-name"), "ULPFEC"))
2726     g_object_set (ulpfec_decoder, "pt", pt, NULL);
2727 }
2728
2729 static void
2730 update_ulpfec_decoder_pt (GstRTSPStream * stream)
2731 {
2732   if (!stream->priv->ulpfec_decoder)
2733     goto done;
2734
2735   g_hash_table_foreach (stream->priv->ptmap, (GHFunc) retrieve_ulpfec_pt,
2736       stream->priv->ulpfec_decoder);
2737
2738 done:
2739   return;
2740 }
2741
2742 /**
2743  * gst_rtsp_stream_request_aux_receiver:
2744  * @stream: a #GstRTSPStream
2745  * @sessid: the session id
2746  *
2747  * Creating a rtxreceive bin
2748  *
2749  * Returns: (transfer full) (nullable): a #GstElement.
2750  *
2751  * Since: 1.16
2752  */
2753 GstElement *
2754 gst_rtsp_stream_request_aux_receiver (GstRTSPStream * stream, guint sessid)
2755 {
2756   GstElement *bin;
2757   GstPad *pad;
2758   gchar *name;
2759
2760   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2761
2762   bin = gst_bin_new (NULL);
2763   stream->priv->rtxreceive = gst_element_factory_make ("rtprtxreceive", NULL);
2764   update_rtx_receive_pt_map (stream);
2765   update_ulpfec_decoder_pt (stream);
2766   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxreceive));
2767
2768   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "src");
2769   name = g_strdup_printf ("src_%u", sessid);
2770   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2771   g_free (name);
2772   gst_object_unref (pad);
2773
2774   pad = gst_element_get_static_pad (stream->priv->rtxreceive, "sink");
2775   name = g_strdup_printf ("sink_%u", sessid);
2776   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2777   g_free (name);
2778   gst_object_unref (pad);
2779
2780   return bin;
2781 }
2782
2783 /**
2784  * gst_rtsp_stream_set_pt_map:
2785  * @stream: a #GstRTSPStream
2786  * @pt: the pt
2787  * @caps: a #GstCaps
2788  *
2789  * Configure a pt map between @pt and @caps.
2790  */
2791 void
2792 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2793 {
2794   GstRTSPStreamPrivate *priv = stream->priv;
2795
2796   if (!GST_IS_CAPS (caps))
2797     return;
2798
2799   g_mutex_lock (&priv->lock);
2800   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2801   update_rtx_receive_pt_map (stream);
2802   g_mutex_unlock (&priv->lock);
2803 }
2804
2805 /**
2806  * gst_rtsp_stream_set_publish_clock_mode:
2807  * @stream: a #GstRTSPStream
2808  * @mode: the clock publish mode
2809  *
2810  * Sets if and how the stream clock should be published according to RFC7273.
2811  *
2812  * Since: 1.8
2813  */
2814 void
2815 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2816     GstRTSPPublishClockMode mode)
2817 {
2818   GstRTSPStreamPrivate *priv;
2819
2820   priv = stream->priv;
2821   g_mutex_lock (&priv->lock);
2822   priv->publish_clock_mode = mode;
2823   g_mutex_unlock (&priv->lock);
2824 }
2825
2826 /**
2827  * gst_rtsp_stream_get_publish_clock_mode:
2828  * @stream: a #GstRTSPStream
2829  *
2830  * Gets if and how the stream clock should be published according to RFC7273.
2831  *
2832  * Returns: The GstRTSPPublishClockMode
2833  *
2834  * Since: 1.8
2835  */
2836 GstRTSPPublishClockMode
2837 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2838 {
2839   GstRTSPStreamPrivate *priv;
2840   GstRTSPPublishClockMode ret;
2841
2842   priv = stream->priv;
2843   g_mutex_lock (&priv->lock);
2844   ret = priv->publish_clock_mode;
2845   g_mutex_unlock (&priv->lock);
2846
2847   return ret;
2848 }
2849
2850 static GstCaps *
2851 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2852     GstRTSPStream * stream)
2853 {
2854   GstRTSPStreamPrivate *priv = stream->priv;
2855   GstCaps *caps = NULL;
2856
2857   g_mutex_lock (&priv->lock);
2858
2859   if (priv->idx == session) {
2860     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2861     if (caps) {
2862       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2863       gst_caps_ref (caps);
2864     } else {
2865       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2866     }
2867   }
2868
2869   g_mutex_unlock (&priv->lock);
2870
2871   return caps;
2872 }
2873
2874 static void
2875 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2876 {
2877   GstRTSPStreamPrivate *priv = stream->priv;
2878   gchar *name;
2879   GstPadLinkReturn ret;
2880   guint sessid;
2881
2882   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2883       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2884
2885   name = gst_pad_get_name (pad);
2886   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2887     g_free (name);
2888     return;
2889   }
2890   g_free (name);
2891
2892   if (priv->idx != sessid)
2893     return;
2894
2895   if (gst_pad_is_linked (priv->sinkpad)) {
2896     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2897         GST_DEBUG_PAD_NAME (priv->sinkpad));
2898     return;
2899   }
2900
2901   /* link the RTP pad to the session manager, it should not really fail unless
2902    * this is not really an RTP pad */
2903   ret = gst_pad_link (pad, priv->sinkpad);
2904   if (ret != GST_PAD_LINK_OK)
2905     goto link_failed;
2906   priv->recv_rtp_src = gst_object_ref (pad);
2907
2908   return;
2909
2910 /* ERRORS */
2911 link_failed:
2912   {
2913     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2914         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2915   }
2916 }
2917
2918 static void
2919 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2920     GstRTSPStream * stream)
2921 {
2922   /* TODO: What to do here other than this? */
2923   GST_DEBUG ("Stream %p: Got EOS", stream);
2924   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2925 }
2926
2927 typedef struct _ProbeData ProbeData;
2928
2929 struct _ProbeData
2930 {
2931   GstRTSPStream *stream;
2932   /* existing sink, already linked to tee */
2933   GstElement *sink1;
2934   /* new sink, about to be linked */
2935   GstElement *sink2;
2936   /* new queue element, that will be linked to tee and sink1 */
2937   GstElement **queue1;
2938   /* new queue element, that will be linked to tee and sink2 */
2939   GstElement **queue2;
2940   GstPad *sink_pad;
2941   GstPad *tee_pad;
2942   guint index;
2943 };
2944
2945 static void
2946 free_cb_data (gpointer user_data)
2947 {
2948   ProbeData *data = user_data;
2949
2950   gst_object_unref (data->stream);
2951   gst_object_unref (data->sink1);
2952   gst_object_unref (data->sink2);
2953   gst_object_unref (data->sink_pad);
2954   gst_object_unref (data->tee_pad);
2955   g_free (data);
2956 }
2957
2958
2959 static void
2960 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream,
2961     GstElement * tee, GstElement * sink, GstElement ** queue)
2962 {
2963   GstRTSPStreamPrivate *priv = stream->priv;
2964   GstPad *tee_pad;
2965   GstPad *queue_pad;
2966   GstPad *sink_pad;
2967
2968   /* create queue for the new stream */
2969   *queue = gst_element_factory_make ("queue", NULL);
2970   g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2971       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2972   gst_bin_add (priv->joined_bin, *queue);
2973
2974   /* link tee to queue */
2975   tee_pad = gst_element_get_request_pad (tee, "src_%u");
2976   queue_pad = gst_element_get_static_pad (*queue, "sink");
2977   gst_pad_link (tee_pad, queue_pad);
2978   gst_object_unref (queue_pad);
2979   gst_object_unref (tee_pad);
2980
2981   /* link queue to sink */
2982   queue_pad = gst_element_get_static_pad (*queue, "src");
2983   sink_pad = gst_element_get_static_pad (sink, "sink");
2984   gst_pad_link (queue_pad, sink_pad);
2985   gst_object_unref (queue_pad);
2986   gst_object_unref (sink_pad);
2987
2988   gst_element_sync_state_with_parent (sink);
2989   gst_element_sync_state_with_parent (*queue);
2990 }
2991
2992 static GstPadProbeReturn
2993 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2994     GstPadProbeInfo * info, gpointer user_data)
2995 {
2996   GstRTSPStreamPrivate *priv;
2997   ProbeData *data = user_data;
2998   GstRTSPStream *stream;
2999   GstElement **queue1;
3000   GstElement **queue2;
3001   GstPad *sink_pad;
3002   GstPad *tee_pad;
3003   GstPad *queue_pad;
3004   guint index;
3005
3006   stream = data->stream;
3007   priv = stream->priv;
3008   queue1 = data->queue1;
3009   queue2 = data->queue2;
3010   sink_pad = data->sink_pad;
3011   tee_pad = data->tee_pad;
3012   index = data->index;
3013
3014   /* unlink tee and the existing sink:
3015    *   .-----.    .---------.
3016    *   | tee |    |  sink1  |
3017    * sink   src->sink       |
3018    *   '-----'    '---------'
3019    */
3020   g_assert (gst_pad_unlink (tee_pad, sink_pad));
3021
3022   /* add queue to the already existing stream */
3023   *queue1 = gst_element_factory_make ("queue", NULL);
3024   g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
3025       "max-size-time", G_GINT64_CONSTANT (0), NULL);
3026   gst_bin_add (priv->joined_bin, *queue1);
3027
3028   /* link tee, queue and sink:
3029    *   .-----.    .---------.    .---------.
3030    *   | tee |    |  queue1 |    | sink1   |
3031    * sink   src->sink      src->sink       |
3032    *   '-----'    '---------'    '---------'
3033    */
3034   queue_pad = gst_element_get_static_pad (*queue1, "sink");
3035   gst_pad_link (tee_pad, queue_pad);
3036   gst_object_unref (queue_pad);
3037   queue_pad = gst_element_get_static_pad (*queue1, "src");
3038   gst_pad_link (queue_pad, sink_pad);
3039   gst_object_unref (queue_pad);
3040
3041   gst_element_sync_state_with_parent (*queue1);
3042
3043   /* create queue and link it to tee and the new sink */
3044   create_and_plug_queue_to_unlinked_stream (stream,
3045       priv->tee[index], data->sink2, queue2);
3046
3047   /* the final stream:
3048    *
3049    *    .-----.    .---------.    .---------.
3050    *    | tee |    |  queue1 |    | sink1   |
3051    *  sink   src->sink      src->sink       |
3052    *    |     |    '---------'    '---------'
3053    *    |     |    .---------.    .---------.
3054    *    |     |    |  queue2 |    | sink2   |
3055    *    |    src->sink      src->sink       |
3056    *    '-----'    '---------'    '---------'
3057    */
3058
3059   return GST_PAD_PROBE_REMOVE;
3060 }
3061
3062 static void
3063 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream,
3064     GstElement * sink1, GstElement * sink2, guint index, GstElement ** queue1,
3065     GstElement ** queue2)
3066 {
3067   ProbeData *data;
3068
3069   data = g_new0 (ProbeData, 1);
3070   data->stream = gst_object_ref (stream);
3071   data->sink1 = gst_object_ref (sink1);
3072   data->sink2 = gst_object_ref (sink2);
3073   data->queue1 = queue1;
3074   data->queue2 = queue2;
3075   data->index = index;
3076
3077   data->sink_pad = gst_element_get_static_pad (sink1, "sink");
3078   g_assert (data->sink_pad);
3079   data->tee_pad = gst_pad_get_peer (data->sink_pad);
3080   g_assert (data->tee_pad);
3081
3082   gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
3083       create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
3084 }
3085
3086 static void
3087 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
3088     GstElement ** queue_to_plug, guint index, gboolean is_mcast)
3089 {
3090   GstRTSPStreamPrivate *priv = stream->priv;
3091   GstElement *existing_sink;
3092
3093   if (is_mcast)
3094     existing_sink = priv->udpsink[index];
3095   else
3096     existing_sink = priv->mcast_udpsink[index];
3097
3098   GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
3099
3100   /* add sink to the bin */
3101   gst_bin_add (priv->joined_bin, sink_to_plug);
3102
3103   if (priv->appsink[index] && existing_sink) {
3104
3105     /* queues are already added for the existing stream, add one for
3106        the newly added udp stream */
3107     create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
3108         sink_to_plug, queue_to_plug);
3109
3110   } else if (priv->appsink[index] || existing_sink) {
3111     GstElement **queue;
3112     GstElement *element;
3113
3114     /* add queue to the already existing stream plus the newly created udp
3115        stream */
3116     if (priv->appsink[index]) {
3117       element = priv->appsink[index];
3118       queue = &priv->appqueue[index];
3119     } else {
3120       element = existing_sink;
3121       if (is_mcast)
3122         queue = &priv->udpqueue[index];
3123       else
3124         queue = &priv->mcast_udpqueue[index];
3125     }
3126
3127     create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug,
3128         index, queue, queue_to_plug);
3129
3130   } else {
3131     GstPad *tee_pad;
3132     GstPad *sink_pad;
3133
3134     GST_DEBUG_OBJECT (stream, "creating first stream");
3135
3136     /* no need to add queues */
3137     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
3138     sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
3139     gst_pad_link (tee_pad, sink_pad);
3140     gst_object_unref (tee_pad);
3141     gst_object_unref (sink_pad);
3142   }
3143
3144   gst_element_sync_state_with_parent (sink_to_plug);
3145 }
3146
3147 static void
3148 plug_tcp_sink (GstRTSPStream * stream, guint index)
3149 {
3150   GstRTSPStreamPrivate *priv = stream->priv;
3151
3152   GST_DEBUG_OBJECT (stream, "plug tcp sink");
3153
3154   /* add sink to the bin */
3155   gst_bin_add (priv->joined_bin, priv->appsink[index]);
3156
3157   if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
3158
3159     /* queues are already added for the existing stream, add one for
3160        the newly added tcp stream */
3161     create_and_plug_queue_to_unlinked_stream (stream,
3162         priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
3163
3164   } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
3165     GstElement **queue;
3166     GstElement *element;
3167
3168     /* add queue to the already existing stream plus the newly created tcp
3169        stream */
3170     if (priv->mcast_udpsink[index]) {
3171       element = priv->mcast_udpsink[index];
3172       queue = &priv->mcast_udpqueue[index];
3173     } else {
3174       element = priv->udpsink[index];
3175       queue = &priv->udpqueue[index];
3176     }
3177
3178     create_and_plug_queue_to_linked_stream (stream, element,
3179         priv->appsink[index], index, queue, &priv->appqueue[index]);
3180
3181   } else {
3182     GstPad *tee_pad;
3183     GstPad *sink_pad;
3184
3185     /* no need to add queues */
3186     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
3187     sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
3188     gst_pad_link (tee_pad, sink_pad);
3189     gst_object_unref (tee_pad);
3190     gst_object_unref (sink_pad);
3191   }
3192
3193   gst_element_sync_state_with_parent (priv->appsink[index]);
3194 }
3195
3196 static void
3197 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
3198     guint index)
3199 {
3200   GstRTSPStreamPrivate *priv;
3201   gboolean is_tcp, is_udp, is_mcast;
3202   priv = stream->priv;
3203
3204   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3205   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3206   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3207
3208   if (is_udp)
3209     plug_udp_sink (stream, priv->udpsink[index],
3210         &priv->udpqueue[index], index, FALSE);
3211
3212   else if (is_mcast)
3213     plug_udp_sink (stream, priv->mcast_udpsink[index],
3214         &priv->mcast_udpqueue[index], index, TRUE);
3215
3216   else if (is_tcp)
3217     plug_tcp_sink (stream, index);
3218 }
3219
3220 /* must be called with lock */
3221 static gboolean
3222 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
3223 {
3224   GstRTSPStreamPrivate *priv;
3225   GstPad *pad;
3226   GstBin *bin;
3227   gboolean is_tcp, is_udp, is_mcast;
3228   gint mcast_ttl = 0;
3229   gint i;
3230
3231   GST_DEBUG_OBJECT (stream, "create sender part");
3232   priv = stream->priv;
3233   bin = priv->joined_bin;
3234
3235   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3236   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3237   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3238
3239   if (is_mcast)
3240     mcast_ttl = transport->ttl;
3241
3242   GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d (ttl: %d)", is_tcp,
3243       is_udp, is_mcast, mcast_ttl);
3244
3245   if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
3246     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
3247     return FALSE;
3248   }
3249
3250   if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
3251     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
3252     return FALSE;
3253   }
3254
3255   for (i = 0; i < 2; i++) {
3256     gboolean link_tee = FALSE;
3257     /* For the sender we create this bit of pipeline for both
3258      * RTP and RTCP.
3259      * Initially there will be only one active transport for
3260      * the stream, so the pipeline will look like this:
3261      *
3262      * .--------.      .-----.    .---------.
3263      * | rtpbin |      | tee |    |  sink   |
3264      * |       send->sink   src->sink       |
3265      * '--------'      '-----'    '---------'
3266      *
3267      * For each new transport, the already existing branch will
3268      * be reconfigured by adding a queue element:
3269      *
3270      * .--------.      .-----.    .---------.    .---------.
3271      * | rtpbin |      | tee |    |  queue  |    | udpsink |
3272      * |       send->sink   src->sink      src->sink       |
3273      * '--------'      |     |    '---------'    '---------'
3274      *                 |     |    .---------.    .---------.
3275      *                 |     |    |  queue  |    | udpsink |
3276      *                 |    src->sink      src->sink       |
3277      *                 |     |    '---------'    '---------'
3278      *                 |     |    .---------.    .---------.
3279      *                 |     |    |  queue  |    | appsink |
3280      *                 |    src->sink      src->sink       |
3281      *                 '-----'    '---------'    '---------'
3282      */
3283
3284     /* Only link the RTP send src if we're going to send RTP, link
3285      * the RTCP send src always */
3286     if (!priv->srcpad && i == 0)
3287       continue;
3288
3289     if (!priv->tee[i]) {
3290       /* make tee for RTP/RTCP */
3291       priv->tee[i] = gst_element_factory_make ("tee", NULL);
3292       gst_bin_add (bin, priv->tee[i]);
3293       link_tee = TRUE;
3294     }
3295
3296     if (is_udp && !priv->udpsink[i]) {
3297       /* we create only one pair of udpsinks for IPv4 and IPv6 */
3298       create_and_configure_udpsink (stream, &priv->udpsink[i],
3299           priv->socket_v4[i], priv->socket_v6[i], FALSE, (i == 0), mcast_ttl);
3300       plug_sink (stream, transport, i);
3301     } else if (is_mcast && !priv->mcast_udpsink[i]) {
3302       /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
3303       create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
3304           priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0),
3305           mcast_ttl);
3306       plug_sink (stream, transport, i);
3307     } else if (is_tcp && !priv->appsink[i]) {
3308       /* make appsink */
3309       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
3310       g_object_set (priv->appsink[i], "emit-signals", FALSE, "max-buffers", 1,
3311           NULL);
3312
3313       /* we need to set sync and preroll to FALSE for the sink to avoid
3314        * deadlock. This is only needed for sink sending RTCP data. */
3315       if (i == 1)
3316         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
3317
3318       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
3319           &sink_cb, stream, NULL);
3320       plug_sink (stream, transport, i);
3321     }
3322
3323     if (link_tee) {
3324       /* and link to rtpbin send pad */
3325       gst_element_sync_state_with_parent (priv->tee[i]);
3326       pad = gst_element_get_static_pad (priv->tee[i], "sink");
3327       gst_pad_link (priv->send_src[i], pad);
3328       gst_object_unref (pad);
3329     }
3330   }
3331
3332   return TRUE;
3333 }
3334
3335 /* must be called with lock */
3336 static void
3337 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
3338     GstElement * funnel)
3339 {
3340   GstRTSPStreamPrivate *priv;
3341   GstPad *pad, *selpad;
3342   gulong id = 0;
3343
3344   priv = stream->priv;
3345
3346   pad = gst_element_get_static_pad (src, "src");
3347   if (priv->srcpad) {
3348     /* block pad so src can't push data while it's not yet linked */
3349     id = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK |
3350         GST_PAD_PROBE_TYPE_BUFFER, NULL, NULL, NULL);
3351     /* we set and keep these to playing so that they don't cause NO_PREROLL return
3352      * values. This is only relevant for PLAY pipelines */
3353     gst_element_set_state (src, GST_STATE_PLAYING);
3354     gst_element_set_locked_state (src, TRUE);
3355   }
3356
3357   /* add src */
3358   gst_bin_add (bin, src);
3359
3360   /* and link to the funnel */
3361   selpad = gst_element_get_request_pad (funnel, "sink_%u");
3362   gst_pad_link (pad, selpad);
3363   if (id != 0)
3364     gst_pad_remove_probe (pad, id);
3365   gst_object_unref (pad);
3366   gst_object_unref (selpad);
3367 }
3368
3369 /* must be called with lock */
3370 static gboolean
3371 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
3372     transport)
3373 {
3374   GstRTSPStreamPrivate *priv;
3375   GstPad *pad;
3376   GstBin *bin;
3377   gboolean tcp;
3378   gboolean udp;
3379   gboolean mcast;
3380   gint i;
3381
3382   GST_DEBUG_OBJECT (stream, "create receiver part");
3383   priv = stream->priv;
3384   bin = priv->joined_bin;
3385
3386   tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
3387   udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
3388   mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
3389
3390   for (i = 0; i < 2; i++) {
3391     /* For the receiver we create this bit of pipeline for both
3392      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
3393      * and it is all funneled into the rtpbin receive pad.
3394      *
3395      *
3396      * .--------.     .--------.    .--------.
3397      * | udpsrc |     | funnel |    | rtpbin |
3398      * | RTP    src->sink      src->sink     |
3399      * '--------'     |        |    |        |
3400      * .--------.     |        |    |        |
3401      * | appsrc |     |        |    |        |
3402      * | RTP    src->sink      |    |        |
3403      * '--------'     '--------'    |        |
3404      *                              |        |
3405      * .--------.     .--------.    |        |
3406      * | udpsrc |     | funnel |    |        |
3407      * | RTCP   src->sink      src->sink     |
3408      * '--------'     |        |    '--------'
3409      * .--------.     |        |
3410      * | appsrc |     |        |
3411      * | RTCP   src->sink      |
3412      * '--------'     '--------'
3413      */
3414
3415     if (!priv->sinkpad && i == 0) {
3416       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
3417        * RTCP sink always */
3418       continue;
3419     }
3420
3421     /* make funnel for the RTP/RTCP receivers */
3422     if (!priv->funnel[i]) {
3423       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
3424       gst_bin_add (bin, priv->funnel[i]);
3425
3426       pad = gst_element_get_static_pad (priv->funnel[i], "src");
3427       gst_pad_link (pad, priv->recv_sink[i]);
3428       gst_object_unref (pad);
3429     }
3430
3431     if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
3432       GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
3433       if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
3434               priv->socket_v4[i]))
3435         goto udpsrc_error;
3436
3437       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
3438     }
3439
3440     if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
3441       GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
3442       if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
3443               priv->socket_v6[i]))
3444         goto udpsrc_error;
3445
3446       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
3447     }
3448
3449     if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
3450       GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
3451       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
3452               priv->mcast_socket_v4[i]))
3453         goto mcast_udpsrc_error;
3454       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
3455     }
3456
3457     if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
3458       GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
3459       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
3460               priv->mcast_socket_v6[i]))
3461         goto mcast_udpsrc_error;
3462       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
3463     }
3464
3465     if (tcp && !priv->appsrc[i]) {
3466       /* make and add appsrc */
3467       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
3468       priv->appsrc_base_time[i] = -1;
3469       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
3470           TRUE, NULL);
3471       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
3472     }
3473
3474     gst_element_sync_state_with_parent (priv->funnel[i]);
3475   }
3476
3477   return TRUE;
3478
3479 mcast_udpsrc_error:
3480 udpsrc_error:
3481   return FALSE;
3482 }
3483
3484 static gboolean
3485 check_mcast_client_addr (GstRTSPStream * stream, const GstRTSPTransport * tr)
3486 {
3487   GstRTSPStreamPrivate *priv = stream->priv;
3488   GList *walk;
3489
3490   if (priv->mcast_clients == NULL)
3491     goto no_addr;
3492
3493   if (tr == NULL)
3494     goto no_transport;
3495
3496   if (tr->destination == NULL)
3497     goto no_destination;
3498
3499   for (walk = priv->mcast_clients; walk; walk = g_list_next (walk)) {
3500     UdpClientAddrInfo *cli = walk->data;
3501
3502     if ((g_strcmp0 (cli->address, tr->destination) == 0) &&
3503         (cli->rtp_port == tr->port.min))
3504       return TRUE;
3505   }
3506
3507   return FALSE;
3508
3509 no_addr:
3510   {
3511     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3512         "has been reserved");
3513     return FALSE;
3514   }
3515 no_transport:
3516   {
3517     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no transport "
3518         "has been provided");
3519     return FALSE;
3520   }
3521 no_destination:
3522   {
3523     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3524         "the reserved address");
3525     return FALSE;
3526   }
3527 }
3528
3529 /**
3530  * gst_rtsp_stream_join_bin:
3531  * @stream: a #GstRTSPStream
3532  * @bin: (transfer none): a #GstBin to join
3533  * @rtpbin: (transfer none): a rtpbin element in @bin
3534  * @state: the target state of the new elements
3535  *
3536  * Join the #GstBin @bin that contains the element @rtpbin.
3537  *
3538  * @stream will link to @rtpbin, which must be inside @bin. The elements
3539  * added to @bin will be set to the state given in @state.
3540  *
3541  * Returns: %TRUE on success.
3542  */
3543 gboolean
3544 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3545     GstElement * rtpbin, GstState state)
3546 {
3547   GstRTSPStreamPrivate *priv;
3548   guint idx;
3549   gchar *name;
3550   GstPadLinkReturn ret;
3551
3552   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3553   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3554   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3555
3556   priv = stream->priv;
3557
3558   g_mutex_lock (&priv->lock);
3559   if (priv->joined_bin != NULL)
3560     goto was_joined;
3561
3562   /* create a session with the same index as the stream */
3563   idx = priv->idx;
3564
3565   GST_INFO ("stream %p joining bin as session %u", stream, idx);
3566
3567   if (priv->profiles & GST_RTSP_PROFILE_SAVP
3568       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3569     /* For SRTP */
3570     g_signal_connect (rtpbin, "request-rtp-encoder",
3571         (GCallback) request_rtp_encoder, stream);
3572     g_signal_connect (rtpbin, "request-rtcp-encoder",
3573         (GCallback) request_rtcp_encoder, stream);
3574     g_signal_connect (rtpbin, "request-rtp-decoder",
3575         (GCallback) request_rtp_rtcp_decoder, stream);
3576     g_signal_connect (rtpbin, "request-rtcp-decoder",
3577         (GCallback) request_rtp_rtcp_decoder, stream);
3578   }
3579
3580   if (priv->sinkpad) {
3581     g_signal_connect (rtpbin, "request-pt-map",
3582         (GCallback) request_pt_map, stream);
3583   }
3584
3585   /* get pads from the RTP session element for sending and receiving
3586    * RTP/RTCP*/
3587   if (priv->srcpad) {
3588     /* get a pad for sending RTP */
3589     name = g_strdup_printf ("send_rtp_sink_%u", idx);
3590     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3591     g_free (name);
3592
3593     /* link the RTP pad to the session manager, it should not really fail unless
3594      * this is not really an RTP pad */
3595     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3596     if (ret != GST_PAD_LINK_OK)
3597       goto link_failed;
3598
3599     name = g_strdup_printf ("send_rtp_src_%u", idx);
3600     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3601     g_free (name);
3602   } else {
3603     /* RECORD case: need to connect our sinkpad from here */
3604     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3605     /* EOS */
3606     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3607
3608     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3609     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3610     g_free (name);
3611   }
3612
3613   name = g_strdup_printf ("send_rtcp_src_%u", idx);
3614   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3615   g_free (name);
3616   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3617   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3618   g_free (name);
3619
3620   /* get the session */
3621   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3622
3623   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3624       stream);
3625   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3626       stream);
3627   g_signal_connect (priv->session, "on-ssrc-active",
3628       (GCallback) on_ssrc_active, stream);
3629   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3630       stream);
3631   g_signal_connect (priv->session, "on-bye-timeout",
3632       (GCallback) on_bye_timeout, stream);
3633   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3634       stream);
3635
3636   /* signal for sender ssrc */
3637   g_signal_connect (priv->session, "on-new-sender-ssrc",
3638       (GCallback) on_new_sender_ssrc, stream);
3639   g_signal_connect (priv->session, "on-sender-ssrc-active",
3640       (GCallback) on_sender_ssrc_active, stream);
3641
3642   if (priv->srcpad) {
3643     /* be notified of caps changes */
3644     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3645         (GCallback) caps_notify, stream);
3646     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3647   }
3648
3649   priv->joined_bin = bin;
3650   GST_DEBUG_OBJECT (stream, "successfully joined bin");
3651   g_mutex_unlock (&priv->lock);
3652
3653   return TRUE;
3654
3655   /* ERRORS */
3656 was_joined:
3657   {
3658     g_mutex_unlock (&priv->lock);
3659     return TRUE;
3660   }
3661 link_failed:
3662   {
3663     GST_WARNING ("failed to link stream %u", idx);
3664     gst_object_unref (priv->send_rtp_sink);
3665     priv->send_rtp_sink = NULL;
3666     g_mutex_unlock (&priv->lock);
3667     return FALSE;
3668   }
3669 }
3670
3671 static void
3672 clear_element (GstBin * bin, GstElement ** elementptr)
3673 {
3674   if (*elementptr) {
3675     gst_element_set_locked_state (*elementptr, FALSE);
3676     gst_element_set_state (*elementptr, GST_STATE_NULL);
3677     if (GST_ELEMENT_PARENT (*elementptr))
3678       gst_bin_remove (bin, *elementptr);
3679     else
3680       gst_object_unref (*elementptr);
3681     *elementptr = NULL;
3682   }
3683 }
3684
3685 /**
3686  * gst_rtsp_stream_leave_bin:
3687  * @stream: a #GstRTSPStream
3688  * @bin: (transfer none): a #GstBin
3689  * @rtpbin: (transfer none): a rtpbin #GstElement
3690  *
3691  * Remove the elements of @stream from @bin.
3692  *
3693  * Return: %TRUE on success.
3694  */
3695 gboolean
3696 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3697     GstElement * rtpbin)
3698 {
3699   GstRTSPStreamPrivate *priv;
3700   gint i;
3701
3702   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3703   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3704   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3705
3706   priv = stream->priv;
3707
3708   g_mutex_lock (&priv->lock);
3709   if (priv->joined_bin == NULL)
3710     goto was_not_joined;
3711   if (priv->joined_bin != bin)
3712     goto wrong_bin;
3713
3714   priv->joined_bin = NULL;
3715
3716   /* all transports must be removed by now */
3717   if (priv->transports != NULL)
3718     goto transports_not_removed;
3719
3720   clear_tr_cache (priv, TRUE);
3721   clear_tr_cache (priv, FALSE);
3722
3723   GST_INFO ("stream %p leaving bin", stream);
3724
3725   if (priv->srcpad) {
3726     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3727
3728     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3729     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3730     gst_object_unref (priv->send_rtp_sink);
3731     priv->send_rtp_sink = NULL;
3732   } else if (priv->recv_rtp_src) {
3733     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3734     gst_object_unref (priv->recv_rtp_src);
3735     priv->recv_rtp_src = NULL;
3736   }
3737
3738   for (i = 0; i < 2; i++) {
3739     clear_element (bin, &priv->udpsrc_v4[i]);
3740     clear_element (bin, &priv->udpsrc_v6[i]);
3741     clear_element (bin, &priv->udpqueue[i]);
3742     clear_element (bin, &priv->udpsink[i]);
3743
3744     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3745     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3746     clear_element (bin, &priv->mcast_udpqueue[i]);
3747     clear_element (bin, &priv->mcast_udpsink[i]);
3748
3749     clear_element (bin, &priv->appsrc[i]);
3750     clear_element (bin, &priv->appqueue[i]);
3751     clear_element (bin, &priv->appsink[i]);
3752
3753     clear_element (bin, &priv->tee[i]);
3754     clear_element (bin, &priv->funnel[i]);
3755
3756     if (priv->sinkpad || i == 1) {
3757       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3758       gst_object_unref (priv->recv_sink[i]);
3759       priv->recv_sink[i] = NULL;
3760     }
3761   }
3762
3763   if (priv->srcpad) {
3764     gst_object_unref (priv->send_src[0]);
3765     priv->send_src[0] = NULL;
3766   }
3767
3768   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3769   gst_object_unref (priv->send_src[1]);
3770   priv->send_src[1] = NULL;
3771
3772   g_object_unref (priv->session);
3773   priv->session = NULL;
3774   if (priv->caps)
3775     gst_caps_unref (priv->caps);
3776   priv->caps = NULL;
3777
3778   if (priv->srtpenc)
3779     gst_object_unref (priv->srtpenc);
3780   if (priv->srtpdec)
3781     gst_object_unref (priv->srtpdec);
3782
3783   if (priv->mcast_addr_v4)
3784     gst_rtsp_address_free (priv->mcast_addr_v4);
3785   priv->mcast_addr_v4 = NULL;
3786   if (priv->mcast_addr_v6)
3787     gst_rtsp_address_free (priv->mcast_addr_v6);
3788   priv->mcast_addr_v6 = NULL;
3789   if (priv->server_addr_v4)
3790     gst_rtsp_address_free (priv->server_addr_v4);
3791   priv->server_addr_v4 = NULL;
3792   if (priv->server_addr_v6)
3793     gst_rtsp_address_free (priv->server_addr_v6);
3794   priv->server_addr_v6 = NULL;
3795
3796   g_mutex_unlock (&priv->lock);
3797
3798   return TRUE;
3799
3800 was_not_joined:
3801   {
3802     g_mutex_unlock (&priv->lock);
3803     return TRUE;
3804   }
3805 transports_not_removed:
3806   {
3807     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3808     g_mutex_unlock (&priv->lock);
3809     return FALSE;
3810   }
3811 wrong_bin:
3812   {
3813     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3814     g_mutex_unlock (&priv->lock);
3815     return FALSE;
3816   }
3817 }
3818
3819 /**
3820  * gst_rtsp_stream_get_joined_bin:
3821  * @stream: a #GstRTSPStream
3822  *
3823  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3824  *
3825  * Return: (transfer full) (nullable): the joined bin or NULL.
3826  */
3827 GstBin *
3828 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3829 {
3830   GstRTSPStreamPrivate *priv;
3831   GstBin *bin = NULL;
3832
3833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3834
3835   priv = stream->priv;
3836
3837   g_mutex_lock (&priv->lock);
3838   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3839   g_mutex_unlock (&priv->lock);
3840
3841   return bin;
3842 }
3843
3844 /**
3845  * gst_rtsp_stream_get_rtpinfo:
3846  * @stream: a #GstRTSPStream
3847  * @rtptime: (allow-none) (out caller-allocates): result RTP timestamp
3848  * @seq: (allow-none) (out caller-allocates): result RTP seqnum
3849  * @clock_rate: (allow-none) (out caller-allocates): the clock rate
3850  * @running_time: (out caller-allocates): result running-time
3851  *
3852  * Retrieve the current rtptime, seq and running-time. This is used to
3853  * construct a RTPInfo reply header.
3854  *
3855  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3856  */
3857 gboolean
3858 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3859     guint * rtptime, guint * seq, guint * clock_rate,
3860     GstClockTime * running_time)
3861 {
3862   GstRTSPStreamPrivate *priv;
3863   GstStructure *stats;
3864   GObjectClass *payobjclass;
3865
3866   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3867
3868   priv = stream->priv;
3869
3870   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3871
3872   g_mutex_lock (&priv->lock);
3873
3874   /* First try to extract the information from the last buffer on the sinks.
3875    * This will have a more accurate sequence number and timestamp, as between
3876    * the payloader and the sink there can be some queues
3877    */
3878   if (priv->udpsink[0] || priv->appsink[0]) {
3879     GstSample *last_sample;
3880
3881     if (priv->udpsink[0])
3882       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3883     else
3884       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3885
3886     if (last_sample) {
3887       GstCaps *caps;
3888       GstBuffer *buffer;
3889       GstSegment *segment;
3890       GstStructure *s;
3891       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3892
3893       caps = gst_sample_get_caps (last_sample);
3894       buffer = gst_sample_get_buffer (last_sample);
3895       segment = gst_sample_get_segment (last_sample);
3896       s = gst_caps_get_structure (caps, 0);
3897
3898       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3899         guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3900         guint ssrc_stream = 0;
3901         if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3902             gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3903             ssrc_buf != ssrc_stream) {
3904           /* Skip buffers from auxiliary streams. */
3905           GST_DEBUG_OBJECT (stream,
3906               "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3907
3908           gst_rtp_buffer_unmap (&rtp_buffer);
3909           gst_sample_unref (last_sample);
3910           goto stats;
3911         }
3912
3913         if (seq) {
3914           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3915         }
3916
3917         if (rtptime) {
3918           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3919         }
3920
3921         gst_rtp_buffer_unmap (&rtp_buffer);
3922
3923         if (running_time) {
3924           *running_time =
3925               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3926               GST_BUFFER_TIMESTAMP (buffer));
3927         }
3928
3929         if (clock_rate) {
3930           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3931
3932           if (*clock_rate == 0 && running_time)
3933             *running_time = GST_CLOCK_TIME_NONE;
3934         }
3935         gst_sample_unref (last_sample);
3936
3937         goto done;
3938       } else {
3939         gst_sample_unref (last_sample);
3940       }
3941     }
3942   }
3943
3944 stats:
3945   if (g_object_class_find_property (payobjclass, "stats")) {
3946     g_object_get (priv->payloader, "stats", &stats, NULL);
3947     if (stats == NULL)
3948       goto no_stats;
3949
3950     if (seq)
3951       gst_structure_get_uint (stats, "seqnum", seq);
3952
3953     if (rtptime)
3954       gst_structure_get_uint (stats, "timestamp", rtptime);
3955
3956     if (running_time)
3957       gst_structure_get_clock_time (stats, "running-time", running_time);
3958
3959     if (clock_rate) {
3960       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3961       if (*clock_rate == 0 && running_time)
3962         *running_time = GST_CLOCK_TIME_NONE;
3963     }
3964     gst_structure_free (stats);
3965   } else {
3966     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3967         !g_object_class_find_property (payobjclass, "timestamp"))
3968       goto no_stats;
3969
3970     if (seq)
3971       g_object_get (priv->payloader, "seqnum", seq, NULL);
3972
3973     if (rtptime)
3974       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3975
3976     if (running_time)
3977       *running_time = GST_CLOCK_TIME_NONE;
3978   }
3979
3980 done:
3981   g_mutex_unlock (&priv->lock);
3982
3983   return TRUE;
3984
3985   /* ERRORS */
3986 no_stats:
3987   {
3988     GST_WARNING ("Could not get payloader stats");
3989     g_mutex_unlock (&priv->lock);
3990     return FALSE;
3991   }
3992 }
3993
3994 /**
3995  * gst_rtsp_stream_get_caps:
3996  * @stream: a #GstRTSPStream
3997  *
3998  * Retrieve the current caps of @stream.
3999  *
4000  * Returns: (transfer full) (nullable): the #GstCaps of @stream.
4001  * use gst_caps_unref() after usage.
4002  */
4003 GstCaps *
4004 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
4005 {
4006   GstRTSPStreamPrivate *priv;
4007   GstCaps *result;
4008
4009   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4010
4011   priv = stream->priv;
4012
4013   g_mutex_lock (&priv->lock);
4014   if ((result = priv->caps))
4015     gst_caps_ref (result);
4016   g_mutex_unlock (&priv->lock);
4017
4018   return result;
4019 }
4020
4021 /**
4022  * gst_rtsp_stream_recv_rtp:
4023  * @stream: a #GstRTSPStream
4024  * @buffer: (transfer full): a #GstBuffer
4025  *
4026  * Handle an RTP buffer for the stream. This method is usually called when a
4027  * message has been received from a client using the TCP transport.
4028  *
4029  * This function takes ownership of @buffer.
4030  *
4031  * Returns: a GstFlowReturn.
4032  */
4033 GstFlowReturn
4034 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
4035 {
4036   GstRTSPStreamPrivate *priv;
4037   GstFlowReturn ret;
4038   GstElement *element;
4039
4040   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4041   priv = stream->priv;
4042   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4043   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4044
4045   g_mutex_lock (&priv->lock);
4046   if (priv->appsrc[0])
4047     element = gst_object_ref (priv->appsrc[0]);
4048   else
4049     element = NULL;
4050   g_mutex_unlock (&priv->lock);
4051
4052   if (element) {
4053     if (priv->appsrc_base_time[0] == -1) {
4054       /* Take current running_time. This timestamp will be put on
4055        * the first buffer of each stream because we are a live source and so we
4056        * timestamp with the running_time. When we are dealing with TCP, we also
4057        * only timestamp the first buffer (using the DISCONT flag) because a server
4058        * typically bursts data, for which we don't want to compensate by speeding
4059        * up the media. The other timestamps will be interpollated from this one
4060        * using the RTP timestamps. */
4061       GST_OBJECT_LOCK (element);
4062       if (GST_ELEMENT_CLOCK (element)) {
4063         GstClockTime now;
4064         GstClockTime base_time;
4065
4066         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4067         base_time = GST_ELEMENT_CAST (element)->base_time;
4068
4069         priv->appsrc_base_time[0] = now - base_time;
4070         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
4071         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4072             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4073             GST_TIME_ARGS (base_time));
4074       }
4075       GST_OBJECT_UNLOCK (element);
4076     }
4077
4078     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4079     gst_object_unref (element);
4080   } else {
4081     ret = GST_FLOW_OK;
4082   }
4083   return ret;
4084 }
4085
4086 /**
4087  * gst_rtsp_stream_recv_rtcp:
4088  * @stream: a #GstRTSPStream
4089  * @buffer: (transfer full): a #GstBuffer
4090  *
4091  * Handle an RTCP buffer for the stream. This method is usually called when a
4092  * message has been received from a client using the TCP transport.
4093  *
4094  * This function takes ownership of @buffer.
4095  *
4096  * Returns: a GstFlowReturn.
4097  */
4098 GstFlowReturn
4099 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
4100 {
4101   GstRTSPStreamPrivate *priv;
4102   GstFlowReturn ret;
4103   GstElement *element;
4104
4105   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
4106   priv = stream->priv;
4107   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
4108
4109   if (priv->joined_bin == NULL) {
4110     gst_buffer_unref (buffer);
4111     return GST_FLOW_NOT_LINKED;
4112   }
4113   g_mutex_lock (&priv->lock);
4114   if (priv->appsrc[1])
4115     element = gst_object_ref (priv->appsrc[1]);
4116   else
4117     element = NULL;
4118   g_mutex_unlock (&priv->lock);
4119
4120   if (element) {
4121     if (priv->appsrc_base_time[1] == -1) {
4122       /* Take current running_time. This timestamp will be put on
4123        * the first buffer of each stream because we are a live source and so we
4124        * timestamp with the running_time. When we are dealing with TCP, we also
4125        * only timestamp the first buffer (using the DISCONT flag) because a server
4126        * typically bursts data, for which we don't want to compensate by speeding
4127        * up the media. The other timestamps will be interpollated from this one
4128        * using the RTP timestamps. */
4129       GST_OBJECT_LOCK (element);
4130       if (GST_ELEMENT_CLOCK (element)) {
4131         GstClockTime now;
4132         GstClockTime base_time;
4133
4134         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
4135         base_time = GST_ELEMENT_CAST (element)->base_time;
4136
4137         priv->appsrc_base_time[1] = now - base_time;
4138         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
4139         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
4140             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
4141             GST_TIME_ARGS (base_time));
4142       }
4143       GST_OBJECT_UNLOCK (element);
4144     }
4145
4146     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
4147     gst_object_unref (element);
4148   } else {
4149     ret = GST_FLOW_OK;
4150     gst_buffer_unref (buffer);
4151   }
4152   return ret;
4153 }
4154
4155 /* must be called with lock */
4156 static inline void
4157 add_client (GstElement * rtp_sink, GstElement * rtcp_sink, const gchar * host,
4158     gint rtp_port, gint rtcp_port)
4159 {
4160   if (rtp_sink != NULL)
4161     g_signal_emit_by_name (rtp_sink, "add", host, rtp_port, NULL);
4162   if (rtcp_sink != NULL)
4163     g_signal_emit_by_name (rtcp_sink, "add", host, rtcp_port, NULL);
4164 }
4165
4166 /* must be called with lock */
4167 static void
4168 remove_client (GstElement * rtp_sink, GstElement * rtcp_sink,
4169     const gchar * host, gint rtp_port, gint rtcp_port)
4170 {
4171   if (rtp_sink != NULL)
4172     g_signal_emit_by_name (rtp_sink, "remove", host, rtp_port, NULL);
4173   if (rtcp_sink != NULL)
4174     g_signal_emit_by_name (rtcp_sink, "remove", host, rtcp_port, NULL);
4175 }
4176
4177 /* must be called with lock */
4178 static gboolean
4179 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
4180     gboolean add)
4181 {
4182   GstRTSPStreamPrivate *priv = stream->priv;
4183   const GstRTSPTransport *tr;
4184   gchar *dest;
4185   gint min, max;
4186
4187   tr = gst_rtsp_stream_transport_get_transport (trans);
4188   dest = tr->destination;
4189
4190   switch (tr->lower_transport) {
4191     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
4192     {
4193       min = tr->port.min;
4194       max = tr->port.max;
4195
4196       if (add) {
4197         GST_INFO ("adding %s:%d-%d", dest, min, max);
4198         if (!check_mcast_client_addr (stream, tr))
4199           goto mcast_error;
4200         add_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest, min,
4201             max);
4202
4203         if (tr->ttl > 0) {
4204           GST_INFO ("setting ttl-mc %d", tr->ttl);
4205           if (priv->mcast_udpsink[0])
4206             g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "ttl-mc", tr->ttl,
4207                 NULL);
4208           if (priv->mcast_udpsink[1])
4209             g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "ttl-mc", tr->ttl,
4210                 NULL);
4211         }
4212         priv->transports = g_list_prepend (priv->transports, trans);
4213       } else {
4214         GST_INFO ("removing %s:%d-%d", dest, min, max);
4215         if (!remove_mcast_client_addr (stream, dest, min, max))
4216           GST_WARNING_OBJECT (stream,
4217               "Failed to remove multicast address: %s:%d-%d", dest, min, max);
4218         remove_client (priv->mcast_udpsink[0], priv->mcast_udpsink[1], dest,
4219             min, max);
4220         priv->transports = g_list_remove (priv->transports, trans);
4221       }
4222       break;
4223     }
4224     case GST_RTSP_LOWER_TRANS_UDP:
4225     {
4226       if (priv->client_side) {
4227         /* In client side mode the 'destination' is the RTSP server, so send
4228          * to those ports */
4229         min = tr->server_port.min;
4230         max = tr->server_port.max;
4231       } else {
4232         min = tr->client_port.min;
4233         max = tr->client_port.max;
4234       }
4235
4236       if (add) {
4237         GST_INFO ("adding %s:%d-%d", dest, min, max);
4238         add_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4239         priv->transports = g_list_prepend (priv->transports, trans);
4240       } else {
4241         GST_INFO ("removing %s:%d-%d", dest, min, max);
4242         remove_client (priv->udpsink[0], priv->udpsink[1], dest, min, max);
4243         priv->transports = g_list_remove (priv->transports, trans);
4244       }
4245       priv->transports_cookie++;
4246       break;
4247     }
4248     case GST_RTSP_LOWER_TRANS_TCP:
4249       if (add) {
4250         GST_INFO ("adding TCP %s", tr->destination);
4251         priv->transports = g_list_prepend (priv->transports, trans);
4252         priv->n_tcp_transports++;
4253       } else {
4254         GST_INFO ("removing TCP %s", tr->destination);
4255         priv->transports = g_list_remove (priv->transports, trans);
4256         priv->n_tcp_transports--;
4257       }
4258       priv->transports_cookie++;
4259       break;
4260     default:
4261       goto unknown_transport;
4262   }
4263   return TRUE;
4264
4265   /* ERRORS */
4266 unknown_transport:
4267   {
4268     GST_INFO ("Unknown transport %d", tr->lower_transport);
4269     return FALSE;
4270   }
4271 mcast_error:
4272   {
4273     return FALSE;
4274   }
4275 }
4276
4277 static void
4278 on_message_sent (gpointer user_data)
4279 {
4280   GstRTSPStream *stream = user_data;
4281   GstRTSPStreamPrivate *priv = stream->priv;
4282   gint idx = -1;
4283
4284   GST_DEBUG_OBJECT (stream, "message send complete");
4285
4286   g_mutex_lock (&priv->lock);
4287
4288   g_assert (priv->n_outstanding >= 0);
4289
4290   if (priv->n_outstanding == 0)
4291     goto no_outstanding;
4292
4293   priv->n_outstanding--;
4294   if (priv->n_outstanding == 0) {
4295     gint i;
4296
4297     /* iterate from 1 and down, so we prioritize RTCP over RTP */
4298     for (i = 1; i >= 0; i--) {
4299       if (priv->have_buffer[i]) {
4300         /* send message */
4301         idx = i;
4302         break;
4303       }
4304     }
4305   }
4306
4307   if (idx != -1)
4308     send_tcp_message (stream, idx);
4309
4310   g_mutex_unlock (&priv->lock);
4311
4312   return;
4313
4314   /* ERRORS */
4315 no_outstanding:
4316   {
4317     GST_INFO ("no outstanding messages");
4318     g_mutex_unlock (&priv->lock);
4319     return;
4320   }
4321 }
4322
4323 /**
4324  * gst_rtsp_stream_add_transport:
4325  * @stream: a #GstRTSPStream
4326  * @trans: (transfer none): a #GstRTSPStreamTransport
4327  *
4328  * Add the transport in @trans to @stream. The media of @stream will
4329  * then also be send to the values configured in @trans.
4330  *
4331  * @stream must be joined to a bin.
4332  *
4333  * @trans must contain a valid #GstRTSPTransport.
4334  *
4335  * Returns: %TRUE if @trans was added
4336  */
4337 gboolean
4338 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
4339     GstRTSPStreamTransport * trans)
4340 {
4341   GstRTSPStreamPrivate *priv;
4342   gboolean res;
4343
4344   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4345   priv = stream->priv;
4346   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4347   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4348
4349   g_mutex_lock (&priv->lock);
4350   res = update_transport (stream, trans, TRUE);
4351   if (res)
4352     gst_rtsp_stream_transport_set_message_sent (trans, on_message_sent, stream,
4353         NULL);
4354   g_mutex_unlock (&priv->lock);
4355
4356   return res;
4357 }
4358
4359 /**
4360  * gst_rtsp_stream_remove_transport:
4361  * @stream: a #GstRTSPStream
4362  * @trans: (transfer none): a #GstRTSPStreamTransport
4363  *
4364  * Remove the transport in @trans from @stream. The media of @stream will
4365  * not be sent to the values configured in @trans.
4366  *
4367  * @stream must be joined to a bin.
4368  *
4369  * @trans must contain a valid #GstRTSPTransport.
4370  *
4371  * Returns: %TRUE if @trans was removed
4372  */
4373 gboolean
4374 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
4375     GstRTSPStreamTransport * trans)
4376 {
4377   GstRTSPStreamPrivate *priv;
4378   gboolean res;
4379
4380   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4381   priv = stream->priv;
4382   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
4383   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
4384
4385   g_mutex_lock (&priv->lock);
4386   res = update_transport (stream, trans, FALSE);
4387   g_mutex_unlock (&priv->lock);
4388
4389   return res;
4390 }
4391
4392 /**
4393  * gst_rtsp_stream_update_crypto:
4394  * @stream: a #GstRTSPStream
4395  * @ssrc: the SSRC
4396  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
4397  *
4398  * Update the new crypto information for @ssrc in @stream. If information
4399  * for @ssrc did not exist, it will be added. If information
4400  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
4401  * be removed from @stream.
4402  *
4403  * Returns: %TRUE if @crypto could be updated
4404  */
4405 gboolean
4406 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
4407     guint ssrc, GstCaps * crypto)
4408 {
4409   GstRTSPStreamPrivate *priv;
4410
4411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4412   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
4413
4414   priv = stream->priv;
4415
4416   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
4417
4418   g_mutex_lock (&priv->lock);
4419   if (crypto)
4420     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
4421         gst_caps_ref (crypto));
4422   else
4423     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
4424   g_mutex_unlock (&priv->lock);
4425
4426   return TRUE;
4427 }
4428
4429 /**
4430  * gst_rtsp_stream_get_rtp_socket:
4431  * @stream: a #GstRTSPStream
4432  * @family: the socket family
4433  *
4434  * Get the RTP socket from @stream for a @family.
4435  *
4436  * @stream must be joined to a bin.
4437  *
4438  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
4439  * socket could be allocated for @family. Unref after usage
4440  */
4441 GSocket *
4442 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
4443 {
4444   GstRTSPStreamPrivate *priv = stream->priv;
4445   GSocket *socket;
4446
4447   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4448   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4449       family == G_SOCKET_FAMILY_IPV6, NULL);
4450
4451   g_mutex_lock (&priv->lock);
4452   if (family == G_SOCKET_FAMILY_IPV6)
4453     socket = priv->socket_v6[0];
4454   else
4455     socket = priv->socket_v4[0];
4456
4457   if (socket != NULL)
4458     socket = g_object_ref (socket);
4459   g_mutex_unlock (&priv->lock);
4460
4461   return socket;
4462 }
4463
4464 /**
4465  * gst_rtsp_stream_get_rtcp_socket:
4466  * @stream: a #GstRTSPStream
4467  * @family: the socket family
4468  *
4469  * Get the RTCP socket from @stream for a @family.
4470  *
4471  * @stream must be joined to a bin.
4472  *
4473  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
4474  * socket could be allocated for @family. Unref after usage
4475  */
4476 GSocket *
4477 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
4478 {
4479   GstRTSPStreamPrivate *priv = stream->priv;
4480   GSocket *socket;
4481
4482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4483   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4484       family == G_SOCKET_FAMILY_IPV6, NULL);
4485
4486   g_mutex_lock (&priv->lock);
4487   if (family == G_SOCKET_FAMILY_IPV6)
4488     socket = priv->socket_v6[1];
4489   else
4490     socket = priv->socket_v4[1];
4491
4492   if (socket != NULL)
4493     socket = g_object_ref (socket);
4494   g_mutex_unlock (&priv->lock);
4495
4496   return socket;
4497 }
4498
4499 /**
4500  * gst_rtsp_stream_get_rtp_multicast_socket:
4501  * @stream: a #GstRTSPStream
4502  * @family: the socket family
4503  *
4504  * Get the multicast RTP socket from @stream for a @family.
4505  *
4506  * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
4507  * socket could be allocated for @family. Unref after usage
4508  */
4509 GSocket *
4510 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream,
4511     GSocketFamily family)
4512 {
4513   GstRTSPStreamPrivate *priv = stream->priv;
4514   GSocket *socket;
4515
4516   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4517   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4518       family == G_SOCKET_FAMILY_IPV6, NULL);
4519
4520   g_mutex_lock (&priv->lock);
4521   if (family == G_SOCKET_FAMILY_IPV6)
4522     socket = priv->mcast_socket_v6[0];
4523   else
4524     socket = priv->mcast_socket_v4[0];
4525
4526   if (socket != NULL)
4527     socket = g_object_ref (socket);
4528   g_mutex_unlock (&priv->lock);
4529
4530   return socket;
4531 }
4532
4533 /**
4534  * gst_rtsp_stream_get_rtcp_multicast_socket:
4535  * @stream: a #GstRTSPStream
4536  * @family: the socket family
4537  *
4538  * Get the multicast RTCP socket from @stream for a @family.
4539  *
4540  * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
4541  * socket could be allocated for @family. Unref after usage
4542  */
4543 GSocket *
4544 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream,
4545     GSocketFamily family)
4546 {
4547   GstRTSPStreamPrivate *priv = stream->priv;
4548   GSocket *socket;
4549
4550   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4551   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
4552       family == G_SOCKET_FAMILY_IPV6, NULL);
4553
4554   g_mutex_lock (&priv->lock);
4555   if (family == G_SOCKET_FAMILY_IPV6)
4556     socket = priv->mcast_socket_v6[1];
4557   else
4558     socket = priv->mcast_socket_v4[1];
4559
4560   if (socket != NULL)
4561     socket = g_object_ref (socket);
4562   g_mutex_unlock (&priv->lock);
4563
4564   return socket;
4565 }
4566
4567 /**
4568  * gst_rtsp_stream_add_multicast_client_address:
4569  * @stream: a #GstRTSPStream
4570  * @destination: (transfer none): a multicast address to add
4571  * @rtp_port: RTP port
4572  * @rtcp_port: RTCP port
4573  * @family: socket family
4574  *
4575  * Add multicast client address to stream. At this point, the sockets that
4576  * will stream RTP and RTCP data to @destination are supposed to be
4577  * allocated.
4578  *
4579  * Returns: %TRUE if @destination can be addedd and handled by @stream.
4580  */
4581 gboolean
4582 gst_rtsp_stream_add_multicast_client_address (GstRTSPStream * stream,
4583     const gchar * destination, guint rtp_port, guint rtcp_port,
4584     GSocketFamily family)
4585 {
4586   GstRTSPStreamPrivate *priv;
4587
4588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4589   g_return_val_if_fail (destination != NULL, FALSE);
4590
4591   priv = stream->priv;
4592   g_mutex_lock (&priv->lock);
4593   if ((family == G_SOCKET_FAMILY_IPV4) && (priv->mcast_socket_v4[0] == NULL))
4594     goto socket_error;
4595   else if ((family == G_SOCKET_FAMILY_IPV6) &&
4596       (priv->mcast_socket_v6[0] == NULL))
4597     goto socket_error;
4598
4599   if (!add_mcast_client_addr (stream, destination, rtp_port, rtcp_port))
4600     goto add_addr_error;
4601   g_mutex_unlock (&priv->lock);
4602
4603   return TRUE;
4604
4605 socket_error:
4606   {
4607     GST_WARNING_OBJECT (stream,
4608         "Failed to add multicast address: no udp socket");
4609     g_mutex_unlock (&priv->lock);
4610     return FALSE;
4611   }
4612 add_addr_error:
4613   {
4614     GST_WARNING_OBJECT (stream,
4615         "Failed to add multicast address: invalid address");
4616     g_mutex_unlock (&priv->lock);
4617     return FALSE;
4618   }
4619 }
4620
4621 /**
4622  * gst_rtsp_stream_get_multicast_client_addresses
4623  * @stream: a #GstRTSPStream
4624  *
4625  * Get all multicast client addresses that RTP data will be sent to
4626  *
4627  * Returns: A comma separated list of host:port pairs with destinations
4628  */
4629 gchar *
4630 gst_rtsp_stream_get_multicast_client_addresses (GstRTSPStream * stream)
4631 {
4632   GstRTSPStreamPrivate *priv;
4633   GString *str;
4634   GList *clients;
4635
4636   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4637
4638   priv = stream->priv;
4639   str = g_string_new ("");
4640
4641   g_mutex_lock (&priv->lock);
4642   clients = priv->mcast_clients;
4643   while (clients != NULL) {
4644     UdpClientAddrInfo *client;
4645
4646     client = (UdpClientAddrInfo *) clients->data;
4647     clients = g_list_next (clients);
4648     g_string_append_printf (str, "%s:%d%s", client->address, client->rtp_port,
4649         (clients != NULL ? "," : ""));
4650   }
4651   g_mutex_unlock (&priv->lock);
4652
4653   return g_string_free (str, FALSE);
4654 }
4655
4656 /**
4657  * gst_rtsp_stream_set_seqnum:
4658  * @stream: a #GstRTSPStream
4659  * @seqnum: a new sequence number
4660  *
4661  * Configure the sequence number in the payloader of @stream to @seqnum.
4662  */
4663 void
4664 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4665 {
4666   GstRTSPStreamPrivate *priv;
4667
4668   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4669
4670   priv = stream->priv;
4671
4672   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4673 }
4674
4675 /**
4676  * gst_rtsp_stream_get_seqnum:
4677  * @stream: a #GstRTSPStream
4678  *
4679  * Get the configured sequence number in the payloader of @stream.
4680  *
4681  * Returns: the sequence number of the payloader.
4682  */
4683 guint16
4684 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4685 {
4686   GstRTSPStreamPrivate *priv;
4687   guint seqnum;
4688
4689   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4690
4691   priv = stream->priv;
4692
4693   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4694
4695   return seqnum;
4696 }
4697
4698 /**
4699  * gst_rtsp_stream_transport_filter:
4700  * @stream: a #GstRTSPStream
4701  * @func: (scope call) (allow-none): a callback
4702  * @user_data: (closure): user data passed to @func
4703  *
4704  * Call @func for each transport managed by @stream. The result value of @func
4705  * determines what happens to the transport. @func will be called with @stream
4706  * locked so no further actions on @stream can be performed from @func.
4707  *
4708  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4709  * @stream.
4710  *
4711  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4712  *
4713  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4714  * will also be added with an additional ref to the result #GList of this
4715  * function..
4716  *
4717  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4718  *
4719  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4720  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4721  * element in the #GList should be unreffed before the list is freed.
4722  */
4723 GList *
4724 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4725     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4726 {
4727   GstRTSPStreamPrivate *priv;
4728   GList *result, *walk, *next;
4729   GHashTable *visited = NULL;
4730   guint cookie;
4731
4732   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4733
4734   priv = stream->priv;
4735
4736   result = NULL;
4737   if (func)
4738     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4739
4740   g_mutex_lock (&priv->lock);
4741 restart:
4742   cookie = priv->transports_cookie;
4743   for (walk = priv->transports; walk; walk = next) {
4744     GstRTSPStreamTransport *trans = walk->data;
4745     GstRTSPFilterResult res;
4746     gboolean changed;
4747
4748     next = g_list_next (walk);
4749
4750     if (func) {
4751       /* only visit each transport once */
4752       if (g_hash_table_contains (visited, trans))
4753         continue;
4754
4755       g_hash_table_add (visited, g_object_ref (trans));
4756       g_mutex_unlock (&priv->lock);
4757
4758       res = func (stream, trans, user_data);
4759
4760       g_mutex_lock (&priv->lock);
4761     } else
4762       res = GST_RTSP_FILTER_REF;
4763
4764     changed = (cookie != priv->transports_cookie);
4765
4766     switch (res) {
4767       case GST_RTSP_FILTER_REMOVE:
4768         update_transport (stream, trans, FALSE);
4769         break;
4770       case GST_RTSP_FILTER_REF:
4771         result = g_list_prepend (result, g_object_ref (trans));
4772         break;
4773       case GST_RTSP_FILTER_KEEP:
4774       default:
4775         break;
4776     }
4777     if (changed)
4778       goto restart;
4779   }
4780   g_mutex_unlock (&priv->lock);
4781
4782   if (func)
4783     g_hash_table_unref (visited);
4784
4785   return result;
4786 }
4787
4788 static GstPadProbeReturn
4789 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4790 {
4791   GstRTSPStreamPrivate *priv;
4792   GstRTSPStream *stream;
4793   GstBuffer *buffer = NULL;
4794
4795   stream = user_data;
4796   priv = stream->priv;
4797
4798   GST_DEBUG_OBJECT (pad, "now blocking");
4799
4800   g_mutex_lock (&priv->lock);
4801   priv->blocking = TRUE;
4802
4803   if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4804     buffer = gst_pad_probe_info_get_buffer (info);
4805   } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4806     GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4807     buffer = gst_buffer_list_get (list, 0);
4808   } else {
4809     g_assert_not_reached ();
4810   }
4811
4812   g_assert (buffer);
4813   priv->position = GST_BUFFER_TIMESTAMP (buffer);
4814   GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4815       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4816   g_mutex_unlock (&priv->lock);
4817
4818   gst_element_post_message (priv->payloader,
4819       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4820           gst_structure_new_empty ("GstRTSPStreamBlocking")));
4821
4822   return GST_PAD_PROBE_OK;
4823 }
4824
4825 static void
4826 set_blocked (GstRTSPStream * stream, gboolean blocked)
4827 {
4828   GstRTSPStreamPrivate *priv;
4829   int i;
4830
4831   GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4832
4833   priv = stream->priv;
4834
4835   if (blocked) {
4836     for (i = 0; i < 2; i++) {
4837       if (priv->blocked_id[i] != 0)
4838         continue;
4839       if (priv->send_src[i]) {
4840         priv->blocking = FALSE;
4841         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4842             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4843             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4844             g_object_ref (stream), g_object_unref);
4845       }
4846     }
4847   } else {
4848     for (i = 0; i < 2; i++) {
4849       if (priv->blocked_id[i] != 0) {
4850         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4851         priv->blocked_id[i] = 0;
4852       }
4853     }
4854     priv->blocking = FALSE;
4855   }
4856 }
4857
4858 /**
4859  * gst_rtsp_stream_set_blocked:
4860  * @stream: a #GstRTSPStream
4861  * @blocked: boolean indicating we should block or unblock
4862  *
4863  * Blocks or unblocks the dataflow on @stream.
4864  *
4865  * Returns: %TRUE on success
4866  */
4867 gboolean
4868 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4869 {
4870   GstRTSPStreamPrivate *priv;
4871
4872   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4873
4874   priv = stream->priv;
4875   g_mutex_lock (&priv->lock);
4876   set_blocked (stream, blocked);
4877   g_mutex_unlock (&priv->lock);
4878
4879   return TRUE;
4880 }
4881
4882 /**
4883  * gst_rtsp_stream_ublock_linked:
4884  * @stream: a #GstRTSPStream
4885  *
4886  * Unblocks the dataflow on @stream if it is linked.
4887  *
4888  * Returns: %TRUE on success
4889  */
4890 gboolean
4891 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4892 {
4893   GstRTSPStreamPrivate *priv;
4894
4895   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4896
4897   priv = stream->priv;
4898   g_mutex_lock (&priv->lock);
4899   if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4900     set_blocked (stream, FALSE);
4901   g_mutex_unlock (&priv->lock);
4902
4903   return TRUE;
4904 }
4905
4906 /**
4907  * gst_rtsp_stream_is_blocking:
4908  * @stream: a #GstRTSPStream
4909  *
4910  * Check if @stream is blocking on a #GstBuffer.
4911  *
4912  * Returns: %TRUE if @stream is blocking
4913  */
4914 gboolean
4915 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4916 {
4917   GstRTSPStreamPrivate *priv;
4918   gboolean result;
4919
4920   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4921
4922   priv = stream->priv;
4923
4924   g_mutex_lock (&priv->lock);
4925   result = priv->blocking;
4926   g_mutex_unlock (&priv->lock);
4927
4928   return result;
4929 }
4930
4931 /**
4932  * gst_rtsp_stream_query_position:
4933  * @stream: a #GstRTSPStream
4934  * @position: (out): current position of a #GstRTSPStream
4935  *
4936  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4937  * the RTP parts of the pipeline and not the RTCP parts.
4938  *
4939  * Returns: %TRUE if the position could be queried
4940  */
4941 gboolean
4942 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4943 {
4944   GstRTSPStreamPrivate *priv;
4945   GstElement *sink;
4946   GstPad *pad = NULL;
4947
4948   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4949
4950   /* query position: if no sinks have been added yet,
4951    * we obtain the position from the pad otherwise we query the sinks */
4952
4953   priv = stream->priv;
4954
4955   g_mutex_lock (&priv->lock);
4956   /* depending on the transport type, it should query corresponding sink */
4957   if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
4958     sink = priv->udpsink[0];
4959   else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4960     sink = priv->mcast_udpsink[0];
4961   else
4962     sink = priv->appsink[0];
4963
4964   if (sink) {
4965     gst_object_ref (sink);
4966   } else if (priv->send_src[0]) {
4967     pad = gst_object_ref (priv->send_src[0]);
4968   } else {
4969     g_mutex_unlock (&priv->lock);
4970     GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4971     return FALSE;
4972   }
4973   g_mutex_unlock (&priv->lock);
4974
4975   if (sink) {
4976     if (!gst_element_query_position (sink, GST_FORMAT_TIME, position)) {
4977       GST_WARNING_OBJECT (stream,
4978           "Couldn't obtain postion: position query failed");
4979       gst_object_unref (sink);
4980       return FALSE;
4981     }
4982     gst_object_unref (sink);
4983   } else if (pad) {
4984     GstEvent *event;
4985     const GstSegment *segment;
4986
4987     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4988     if (!event) {
4989       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4990       gst_object_unref (pad);
4991       return FALSE;
4992     }
4993
4994     gst_event_parse_segment (event, &segment);
4995     if (segment->format != GST_FORMAT_TIME) {
4996       *position = -1;
4997     } else {
4998       g_mutex_lock (&priv->lock);
4999       *position = priv->position;
5000       g_mutex_unlock (&priv->lock);
5001       *position =
5002           gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
5003     }
5004     gst_event_unref (event);
5005     gst_object_unref (pad);
5006   }
5007
5008   return TRUE;
5009 }
5010
5011 /**
5012  * gst_rtsp_stream_query_stop:
5013  * @stream: a #GstRTSPStream
5014  * @stop: (out): current stop of a #GstRTSPStream
5015  *
5016  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
5017  * the RTP parts of the pipeline and not the RTCP parts.
5018  *
5019  * Returns: %TRUE if the stop could be queried
5020  */
5021 gboolean
5022 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
5023 {
5024   GstRTSPStreamPrivate *priv;
5025   GstElement *sink;
5026   GstPad *pad = NULL;
5027
5028   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5029
5030   /* query stop position: if no sinks have been added yet,
5031    * we obtain the stop position from the pad otherwise we query the sinks */
5032
5033   priv = stream->priv;
5034
5035   g_mutex_lock (&priv->lock);
5036   /* depending on the transport type, it should query corresponding sink */
5037   if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP)
5038     sink = priv->udpsink[0];
5039   else if (priv->configured_protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
5040     sink = priv->mcast_udpsink[0];
5041   else
5042     sink = priv->appsink[0];
5043
5044   if (sink) {
5045     gst_object_ref (sink);
5046   } else if (priv->send_src[0]) {
5047     pad = gst_object_ref (priv->send_src[0]);
5048   } else {
5049     g_mutex_unlock (&priv->lock);
5050     GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
5051     return FALSE;
5052   }
5053   g_mutex_unlock (&priv->lock);
5054
5055   if (sink) {
5056     GstQuery *query;
5057     GstFormat format;
5058
5059     query = gst_query_new_segment (GST_FORMAT_TIME);
5060     if (!gst_element_query (sink, query)) {
5061       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
5062       gst_query_unref (query);
5063       gst_object_unref (sink);
5064       return FALSE;
5065     }
5066     gst_query_parse_segment (query, NULL, &format, NULL, stop);
5067     if (format != GST_FORMAT_TIME)
5068       *stop = -1;
5069     gst_query_unref (query);
5070     gst_object_unref (sink);
5071   } else if (pad) {
5072     GstEvent *event;
5073     const GstSegment *segment;
5074
5075     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
5076     if (!event) {
5077       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
5078       gst_object_unref (pad);
5079       return FALSE;
5080     }
5081     gst_event_parse_segment (event, &segment);
5082     if (segment->format != GST_FORMAT_TIME) {
5083       *stop = -1;
5084     } else {
5085       *stop = segment->stop;
5086       if (*stop == -1)
5087         *stop = segment->duration;
5088       else
5089         *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
5090     }
5091     gst_event_unref (event);
5092     gst_object_unref (pad);
5093   }
5094
5095   return TRUE;
5096 }
5097
5098 /**
5099  * gst_rtsp_stream_seekable:
5100  * @stream: a #GstRTSPStream
5101  *
5102  * Checks whether the individual @stream is seekable.
5103  *
5104  * Returns: %TRUE if @stream is seekable, else %FALSE.
5105  */
5106 gboolean
5107 gst_rtsp_stream_seekable (GstRTSPStream * stream)
5108 {
5109   GstRTSPStreamPrivate *priv;
5110   GstPad *pad = NULL;
5111   GstQuery *query = NULL;
5112   gboolean seekable = FALSE;
5113
5114   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5115
5116   /* query stop position: if no sinks have been added yet,
5117    * we obtain the stop position from the pad otherwise we query the sinks */
5118
5119   priv = stream->priv;
5120
5121   g_mutex_lock (&priv->lock);
5122   /* depending on the transport type, it should query corresponding sink */
5123   if (priv->srcpad) {
5124     pad = gst_object_ref (priv->srcpad);
5125   } else {
5126     g_mutex_unlock (&priv->lock);
5127     GST_WARNING_OBJECT (stream, "Pad not available, can't query seekability");
5128     goto beach;
5129   }
5130   g_mutex_unlock (&priv->lock);
5131
5132   query = gst_query_new_seeking (GST_FORMAT_TIME);
5133   if (!gst_pad_query (pad, query)) {
5134     GST_WARNING_OBJECT (stream, "seeking query failed");
5135     goto beach;
5136   }
5137   gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
5138
5139 beach:
5140   if (pad)
5141     gst_object_unref (pad);
5142   if (query)
5143     gst_query_unref (query);
5144
5145   GST_DEBUG_OBJECT (stream, "Returning %d", seekable);
5146
5147   return seekable;
5148 }
5149
5150 /**
5151  * gst_rtsp_stream_complete_stream:
5152  * @stream: a #GstRTSPStream
5153  * @transport: a #GstRTSPTransport
5154  *
5155  * Add a receiver and sender part to the pipeline based on the transport from
5156  * SETUP.
5157  *
5158  * Returns: %TRUE if the stream has been sucessfully updated.
5159  */
5160 gboolean
5161 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
5162     const GstRTSPTransport * transport)
5163 {
5164   GstRTSPStreamPrivate *priv;
5165
5166   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5167
5168   priv = stream->priv;
5169   GST_DEBUG_OBJECT (stream, "complete stream");
5170
5171   g_mutex_lock (&priv->lock);
5172
5173   if (!(priv->allowed_protocols & transport->lower_transport))
5174     goto unallowed_transport;
5175
5176   if (!create_receiver_part (stream, transport))
5177     goto create_receiver_error;
5178
5179   /* in the RECORD case, we only add RTCP sender part */
5180   if (!create_sender_part (stream, transport))
5181     goto create_sender_error;
5182
5183   priv->configured_protocols |= transport->lower_transport;
5184
5185   priv->is_complete = TRUE;
5186   g_mutex_unlock (&priv->lock);
5187
5188   GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
5189   return TRUE;
5190
5191 create_receiver_error:
5192 create_sender_error:
5193 unallowed_transport:
5194   {
5195     g_mutex_unlock (&priv->lock);
5196     return FALSE;
5197   }
5198 }
5199
5200 /**
5201  * gst_rtsp_stream_is_complete:
5202  * @stream: a #GstRTSPStream
5203  *
5204  * Checks whether the stream is complete, contains the receiver and the sender
5205  * parts. As the stream contains sink(s) element(s), it's possible to perform
5206  * seek operations on it.
5207  *
5208  * Returns: %TRUE if the stream contains at least one sink element.
5209  */
5210 gboolean
5211 gst_rtsp_stream_is_complete (GstRTSPStream * stream)
5212 {
5213   GstRTSPStreamPrivate *priv;
5214   gboolean ret = FALSE;
5215
5216   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5217
5218   priv = stream->priv;
5219   g_mutex_lock (&priv->lock);
5220   ret = priv->is_complete;
5221   g_mutex_unlock (&priv->lock);
5222
5223   return ret;
5224 }
5225
5226 /**
5227  * gst_rtsp_stream_is_sender:
5228  * @stream: a #GstRTSPStream
5229  *
5230  * Checks whether the stream is a sender.
5231  *
5232  * Returns: %TRUE if the stream is a sender and %FALSE otherwise.
5233  */
5234 gboolean
5235 gst_rtsp_stream_is_sender (GstRTSPStream * stream)
5236 {
5237   GstRTSPStreamPrivate *priv;
5238   gboolean ret = FALSE;
5239
5240   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5241
5242   priv = stream->priv;
5243   g_mutex_lock (&priv->lock);
5244   ret = (priv->srcpad != NULL);
5245   g_mutex_unlock (&priv->lock);
5246
5247   return ret;
5248 }
5249
5250 /**
5251  * gst_rtsp_stream_is_receiver:
5252  * @stream: a #GstRTSPStream
5253  *
5254  * Checks whether the stream is a receiver.
5255  *
5256  * Returns: %TRUE if the stream is a receiver and %FALSE otherwise.
5257  */
5258 gboolean
5259 gst_rtsp_stream_is_receiver (GstRTSPStream * stream)
5260 {
5261   GstRTSPStreamPrivate *priv;
5262   gboolean ret = FALSE;
5263
5264   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
5265
5266   priv = stream->priv;
5267   g_mutex_lock (&priv->lock);
5268   ret = (priv->sinkpad != NULL);
5269   g_mutex_unlock (&priv->lock);
5270
5271   return ret;
5272 }
5273
5274 #define AES_128_KEY_LEN 16
5275 #define AES_256_KEY_LEN 32
5276
5277 #define HMAC_32_KEY_LEN 4
5278 #define HMAC_80_KEY_LEN 10
5279
5280 static gboolean
5281 mikey_apply_policy (GstCaps * caps, GstMIKEYMessage * msg, guint8 policy)
5282 {
5283   const gchar *srtp_cipher;
5284   const gchar *srtp_auth;
5285   const GstMIKEYPayload *sp;
5286   guint i;
5287
5288   /* loop over Security policy until we find one containing policy */
5289   for (i = 0;; i++) {
5290     if ((sp = gst_mikey_message_find_payload (msg, GST_MIKEY_PT_SP, i)) == NULL)
5291       break;
5292
5293     if (((GstMIKEYPayloadSP *) sp)->policy == policy)
5294       break;
5295   }
5296
5297   /* the default ciphers */
5298   srtp_cipher = "aes-128-icm";
5299   srtp_auth = "hmac-sha1-80";
5300
5301   /* now override the defaults with what is in the Security Policy */
5302   if (sp != NULL) {
5303     guint len;
5304
5305     /* collect all the params and go over them */
5306     len = gst_mikey_payload_sp_get_n_params (sp);
5307     for (i = 0; i < len; i++) {
5308       const GstMIKEYPayloadSPParam *param =
5309           gst_mikey_payload_sp_get_param (sp, i);
5310
5311       switch (param->type) {
5312         case GST_MIKEY_SP_SRTP_ENC_ALG:
5313           switch (param->val[0]) {
5314             case 0:
5315               srtp_cipher = "null";
5316               break;
5317             case 2:
5318             case 1:
5319               srtp_cipher = "aes-128-icm";
5320               break;
5321             default:
5322               break;
5323           }
5324           break;
5325         case GST_MIKEY_SP_SRTP_ENC_KEY_LEN:
5326           switch (param->val[0]) {
5327             case AES_128_KEY_LEN:
5328               srtp_cipher = "aes-128-icm";
5329               break;
5330             case AES_256_KEY_LEN:
5331               srtp_cipher = "aes-256-icm";
5332               break;
5333             default:
5334               break;
5335           }
5336           break;
5337         case GST_MIKEY_SP_SRTP_AUTH_ALG:
5338           switch (param->val[0]) {
5339             case 0:
5340               srtp_auth = "null";
5341               break;
5342             case 2:
5343             case 1:
5344               srtp_auth = "hmac-sha1-80";
5345               break;
5346             default:
5347               break;
5348           }
5349           break;
5350         case GST_MIKEY_SP_SRTP_AUTH_KEY_LEN:
5351           switch (param->val[0]) {
5352             case HMAC_32_KEY_LEN:
5353               srtp_auth = "hmac-sha1-32";
5354               break;
5355             case HMAC_80_KEY_LEN:
5356               srtp_auth = "hmac-sha1-80";
5357               break;
5358             default:
5359               break;
5360           }
5361           break;
5362         case GST_MIKEY_SP_SRTP_SRTP_ENC:
5363           break;
5364         case GST_MIKEY_SP_SRTP_SRTCP_ENC:
5365           break;
5366         default:
5367           break;
5368       }
5369     }
5370   }
5371   /* now configure the SRTP parameters */
5372   gst_caps_set_simple (caps,
5373       "srtp-cipher", G_TYPE_STRING, srtp_cipher,
5374       "srtp-auth", G_TYPE_STRING, srtp_auth,
5375       "srtcp-cipher", G_TYPE_STRING, srtp_cipher,
5376       "srtcp-auth", G_TYPE_STRING, srtp_auth, NULL);
5377
5378   return TRUE;
5379 }
5380
5381 static gboolean
5382 handle_mikey_data (GstRTSPStream * stream, guint8 * data, gsize size)
5383 {
5384   GstMIKEYMessage *msg;
5385   guint i, n_cs;
5386   GstCaps *caps = NULL;
5387   GstMIKEYPayloadKEMAC *kemac;
5388   const GstMIKEYPayloadKeyData *pkd;
5389   GstBuffer *key;
5390
5391   /* the MIKEY message contains a CSB or crypto session bundle. It is a
5392    * set of Crypto Sessions protected with the same master key.
5393    * In the context of SRTP, an RTP and its RTCP stream is part of a
5394    * crypto session */
5395   if ((msg = gst_mikey_message_new_from_data (data, size, NULL, NULL)) == NULL)
5396     goto parse_failed;
5397
5398   /* we can only handle SRTP crypto sessions for now */
5399   if (msg->map_type != GST_MIKEY_MAP_TYPE_SRTP)
5400     goto invalid_map_type;
5401
5402   /* get the number of crypto sessions. This maps SSRC to its
5403    * security parameters */
5404   n_cs = gst_mikey_message_get_n_cs (msg);
5405   if (n_cs == 0)
5406     goto no_crypto_sessions;
5407
5408   /* we also need keys */
5409   if (!(kemac = (GstMIKEYPayloadKEMAC *) gst_mikey_message_find_payload
5410           (msg, GST_MIKEY_PT_KEMAC, 0)))
5411     goto no_keys;
5412
5413   /* we don't support encrypted keys */
5414   if (kemac->enc_alg != GST_MIKEY_ENC_NULL
5415       || kemac->mac_alg != GST_MIKEY_MAC_NULL)
5416     goto unsupported_encryption;
5417
5418   /* get Key data sub-payload */
5419   pkd = (const GstMIKEYPayloadKeyData *)
5420       gst_mikey_payload_kemac_get_sub (&kemac->pt, 0);
5421
5422   key =
5423       gst_buffer_new_wrapped (g_memdup (pkd->key_data, pkd->key_len),
5424       pkd->key_len);
5425
5426   /* go over all crypto sessions and create the security policy for each
5427    * SSRC */
5428   for (i = 0; i < n_cs; i++) {
5429     const GstMIKEYMapSRTP *map = gst_mikey_message_get_cs_srtp (msg, i);
5430
5431     caps = gst_caps_new_simple ("application/x-srtp",
5432         "ssrc", G_TYPE_UINT, map->ssrc,
5433         "roc", G_TYPE_UINT, map->roc, "srtp-key", GST_TYPE_BUFFER, key, NULL);
5434     mikey_apply_policy (caps, msg, map->policy);
5435
5436     gst_rtsp_stream_update_crypto (stream, map->ssrc, caps);
5437     gst_caps_unref (caps);
5438   }
5439   gst_mikey_message_unref (msg);
5440   gst_buffer_unref (key);
5441
5442   return TRUE;
5443
5444   /* ERRORS */
5445 parse_failed:
5446   {
5447     GST_DEBUG_OBJECT (stream, "failed to parse MIKEY message");
5448     return FALSE;
5449   }
5450 invalid_map_type:
5451   {
5452     GST_DEBUG_OBJECT (stream, "invalid map type %d", msg->map_type);
5453     goto cleanup_message;
5454   }
5455 no_crypto_sessions:
5456   {
5457     GST_DEBUG_OBJECT (stream, "no crypto sessions");
5458     goto cleanup_message;
5459   }
5460 no_keys:
5461   {
5462     GST_DEBUG_OBJECT (stream, "no keys found");
5463     goto cleanup_message;
5464   }
5465 unsupported_encryption:
5466   {
5467     GST_DEBUG_OBJECT (stream, "unsupported key encryption");
5468     goto cleanup_message;
5469   }
5470 cleanup_message:
5471   {
5472     gst_mikey_message_unref (msg);
5473     return FALSE;
5474   }
5475 }
5476
5477 #define IS_STRIP_CHAR(c) (g_ascii_isspace ((guchar)(c)) || ((c) == '\"'))
5478
5479 static void
5480 strip_chars (gchar * str)
5481 {
5482   gchar *s;
5483   gsize len;
5484
5485   len = strlen (str);
5486   while (len--) {
5487     if (!IS_STRIP_CHAR (str[len]))
5488       break;
5489     str[len] = '\0';
5490   }
5491   for (s = str; *s && IS_STRIP_CHAR (*s); s++);
5492   memmove (str, s, len + 1);
5493 }
5494
5495 /**
5496  * gst_rtsp_stream_handle_keymgmt:
5497  * @stream: a #GstRTSPStream
5498  * @keymgmt: a keymgmt header
5499  *
5500  * Parse and handle a KeyMgmt header.
5501  *
5502  * Since: 1.16
5503  */
5504 /* KeyMgmt = "KeyMgmt" ":" key-mgmt-spec 0*("," key-mgmt-spec)
5505  * key-mgmt-spec = "prot" "=" KMPID ";" ["uri" "=" %x22 URI %x22 ";"]
5506  */
5507 gboolean
5508 gst_rtsp_stream_handle_keymgmt (GstRTSPStream * stream, const gchar * keymgmt)
5509 {
5510   gchar **specs;
5511   gint i, j;
5512
5513   specs = g_strsplit (keymgmt, ",", 0);
5514   for (i = 0; specs[i]; i++) {
5515     gchar **split;
5516
5517     split = g_strsplit (specs[i], ";", 0);
5518     for (j = 0; split[j]; j++) {
5519       g_strstrip (split[j]);
5520       if (g_str_has_prefix (split[j], "prot=")) {
5521         g_strstrip (split[j] + 5);
5522         if (!g_str_equal (split[j] + 5, "mikey"))
5523           break;
5524         GST_DEBUG ("found mikey");
5525       } else if (g_str_has_prefix (split[j], "uri=")) {
5526         strip_chars (split[j] + 4);
5527         GST_DEBUG ("found uri '%s'", split[j] + 4);
5528       } else if (g_str_has_prefix (split[j], "data=")) {
5529         guchar *data;
5530         gsize size;
5531         strip_chars (split[j] + 5);
5532         GST_DEBUG ("found data '%s'", split[j] + 5);
5533         data = g_base64_decode_inplace (split[j] + 5, &size);
5534         handle_mikey_data (stream, data, size);
5535       }
5536     }
5537     g_strfreev (split);
5538   }
5539   g_strfreev (specs);
5540   return TRUE;
5541 }
5542
5543
5544 /**
5545  * gst_rtsp_stream_get_ulpfec_pt:
5546  *
5547  * Returns: the payload type used for ULPFEC protection packets
5548  *
5549  * Since: 1.16
5550  */
5551 guint
5552 gst_rtsp_stream_get_ulpfec_pt (GstRTSPStream * stream)
5553 {
5554   guint res;
5555
5556   g_mutex_lock (&stream->priv->lock);
5557   res = stream->priv->ulpfec_pt;
5558   g_mutex_unlock (&stream->priv->lock);
5559
5560   return res;
5561 }
5562
5563 /**
5564  * gst_rtsp_stream_set_ulpfec_pt:
5565  *
5566  * Set the payload type to be used for ULPFEC protection packets
5567  *
5568  * Since: 1.16
5569  */
5570 void
5571 gst_rtsp_stream_set_ulpfec_pt (GstRTSPStream * stream, guint pt)
5572 {
5573   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5574
5575   g_mutex_lock (&stream->priv->lock);
5576   stream->priv->ulpfec_pt = pt;
5577   if (stream->priv->ulpfec_encoder) {
5578     g_object_set (stream->priv->ulpfec_encoder, "pt", pt, NULL);
5579   }
5580   g_mutex_unlock (&stream->priv->lock);
5581 }
5582
5583 /**
5584  * gst_rtsp_stream_request_ulpfec_decoder:
5585  *
5586  * Creating a rtpulpfecdec element
5587  *
5588  * Returns: (transfer full) (nullable): a #GstElement.
5589  *
5590  * Since: 1.16
5591  */
5592 GstElement *
5593 gst_rtsp_stream_request_ulpfec_decoder (GstRTSPStream * stream,
5594     GstElement * rtpbin, guint sessid)
5595 {
5596   GObject *internal_storage = NULL;
5597
5598   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5599   stream->priv->ulpfec_decoder =
5600       gst_object_ref (gst_element_factory_make ("rtpulpfecdec", NULL));
5601
5602   g_signal_emit_by_name (G_OBJECT (rtpbin), "get-internal-storage", sessid,
5603       &internal_storage);
5604   g_object_set (stream->priv->ulpfec_decoder, "storage", internal_storage,
5605       NULL);
5606   g_object_unref (internal_storage);
5607   update_ulpfec_decoder_pt (stream);
5608
5609   return stream->priv->ulpfec_decoder;
5610 }
5611
5612 /**
5613  * gst_rtsp_stream_request_ulpfec_encoder:
5614  *
5615  * Creating a rtpulpfecenc element
5616  *
5617  * Returns: (transfer full) (nullable): a #GstElement.
5618  *
5619  * Since: 1.16
5620  */
5621 GstElement *
5622 gst_rtsp_stream_request_ulpfec_encoder (GstRTSPStream * stream, guint sessid)
5623 {
5624   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
5625
5626   if (!stream->priv->ulpfec_percentage)
5627     return NULL;
5628
5629   stream->priv->ulpfec_encoder =
5630       gst_object_ref (gst_element_factory_make ("rtpulpfecenc", NULL));
5631
5632   g_object_set (stream->priv->ulpfec_encoder, "pt", stream->priv->ulpfec_pt,
5633       "percentage", stream->priv->ulpfec_percentage, NULL);
5634
5635   return stream->priv->ulpfec_encoder;
5636 }
5637
5638 /**
5639  * gst_rtsp_stream_set_ulpfec_percentage:
5640  *
5641  * Sets the amount of redundancy to apply when creating ULPFEC
5642  * protection packets.
5643  *
5644  * Since: 1.16
5645  */
5646 void
5647 gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream * stream, guint percentage)
5648 {
5649   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
5650
5651   g_mutex_lock (&stream->priv->lock);
5652   stream->priv->ulpfec_percentage = percentage;
5653   if (stream->priv->ulpfec_encoder) {
5654     g_object_set (stream->priv->ulpfec_encoder, "percentage", percentage, NULL);
5655   }
5656   g_mutex_unlock (&stream->priv->lock);
5657 }
5658
5659 /**
5660  * gst_rtsp_stream_get_ulpfec_percentage:
5661  *
5662  * Returns: the amount of redundancy applied when creating ULPFEC
5663  * protection packets.
5664  *
5665  * Since: 1.16
5666  */
5667 guint
5668 gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
5669 {
5670   guint res;
5671
5672   g_mutex_lock (&stream->priv->lock);
5673   res = stream->priv->ulpfec_percentage;
5674   g_mutex_unlock (&stream->priv->lock);
5675
5676   return res;
5677 }