rtsp-stream: Compare IP address strings case insensitive
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPAddress *server_addr_v4;
128   GstRTSPAddress *server_addr_v6;
129
130   /* multicast addresses */
131   GstRTSPAddress *mcast_addr_v4;
132   GstRTSPAddress *mcast_addr_v6;
133
134   gchar *multicast_iface;
135
136   /* the caps of the stream */
137   gulong caps_sig;
138   GstCaps *caps;
139
140   /* transports we stream to */
141   guint n_active;
142   GList *transports;
143   guint transports_cookie;
144   GList *tr_cache_rtp;
145   GList *tr_cache_rtcp;
146   guint tr_cache_cookie_rtp;
147   guint tr_cache_cookie_rtcp;
148
149   gint dscp_qos;
150
151   /* stream blocking */
152   gulong blocked_id;
153   gboolean blocking;
154
155   /* pt->caps map for RECORD streams */
156   GHashTable *ptmap;
157
158   GstRTSPPublishClockMode publish_clock_mode;
159 };
160
161 #define DEFAULT_CONTROL         NULL
162 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
163 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
164                                         GST_RTSP_LOWER_TRANS_TCP
165
166 enum
167 {
168   PROP_0,
169   PROP_CONTROL,
170   PROP_PROFILES,
171   PROP_PROTOCOLS,
172   PROP_LAST
173 };
174
175 enum
176 {
177   SIGNAL_NEW_RTP_ENCODER,
178   SIGNAL_NEW_RTCP_ENCODER,
179   SIGNAL_LAST
180 };
181
182 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
183 #define GST_CAT_DEFAULT rtsp_stream_debug
184
185 static GQuark ssrc_stream_map_key;
186
187 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
188     GValue * value, GParamSpec * pspec);
189 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
190     const GValue * value, GParamSpec * pspec);
191
192 static void gst_rtsp_stream_finalize (GObject * obj);
193
194 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
195
196 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
197
198 static void
199 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
200 {
201   GObjectClass *gobject_class;
202
203   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
204
205   gobject_class = G_OBJECT_CLASS (klass);
206
207   gobject_class->get_property = gst_rtsp_stream_get_property;
208   gobject_class->set_property = gst_rtsp_stream_set_property;
209   gobject_class->finalize = gst_rtsp_stream_finalize;
210
211   g_object_class_install_property (gobject_class, PROP_CONTROL,
212       g_param_spec_string ("control", "Control",
213           "The control string for this stream", DEFAULT_CONTROL,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215
216   g_object_class_install_property (gobject_class, PROP_PROFILES,
217       g_param_spec_flags ("profiles", "Profiles",
218           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
219           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
222       g_param_spec_flags ("protocols", "Protocols",
223           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
224           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
227       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
228       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
229       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
230
231   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
232       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
234       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
235
236   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
237
238   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
239 }
240
241 static void
242 gst_rtsp_stream_init (GstRTSPStream * stream)
243 {
244   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
245
246   GST_DEBUG ("new stream %p", stream);
247
248   stream->priv = priv;
249
250   priv->dscp_qos = -1;
251   priv->control = g_strdup (DEFAULT_CONTROL);
252   priv->profiles = DEFAULT_PROFILES;
253   priv->protocols = DEFAULT_PROTOCOLS;
254   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
255
256   g_mutex_init (&priv->lock);
257
258   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
259       NULL, (GDestroyNotify) gst_caps_unref);
260   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
261       (GDestroyNotify) gst_caps_unref);
262 }
263
264 static void
265 gst_rtsp_stream_finalize (GObject * obj)
266 {
267   GstRTSPStream *stream;
268   GstRTSPStreamPrivate *priv;
269
270   stream = GST_RTSP_STREAM (obj);
271   priv = stream->priv;
272
273   GST_DEBUG ("finalize stream %p", stream);
274
275   /* we really need to be unjoined now */
276   g_return_if_fail (priv->joined_bin == NULL);
277
278   if (priv->mcast_addr_v4)
279     gst_rtsp_address_free (priv->mcast_addr_v4);
280   if (priv->mcast_addr_v6)
281     gst_rtsp_address_free (priv->mcast_addr_v6);
282   if (priv->server_addr_v4)
283     gst_rtsp_address_free (priv->server_addr_v4);
284   if (priv->server_addr_v6)
285     gst_rtsp_address_free (priv->server_addr_v6);
286   if (priv->pool)
287     g_object_unref (priv->pool);
288   if (priv->rtxsend)
289     g_object_unref (priv->rtxsend);
290
291   g_free (priv->multicast_iface);
292
293   gst_object_unref (priv->payloader);
294   if (priv->srcpad)
295     gst_object_unref (priv->srcpad);
296   if (priv->sinkpad)
297     gst_object_unref (priv->sinkpad);
298   g_free (priv->control);
299   g_mutex_clear (&priv->lock);
300
301   g_hash_table_unref (priv->keys);
302   g_hash_table_destroy (priv->ptmap);
303
304   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
305 }
306
307 static void
308 gst_rtsp_stream_get_property (GObject * object, guint propid,
309     GValue * value, GParamSpec * pspec)
310 {
311   GstRTSPStream *stream = GST_RTSP_STREAM (object);
312
313   switch (propid) {
314     case PROP_CONTROL:
315       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
316       break;
317     case PROP_PROFILES:
318       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
319       break;
320     case PROP_PROTOCOLS:
321       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
325   }
326 }
327
328 static void
329 gst_rtsp_stream_set_property (GObject * object, guint propid,
330     const GValue * value, GParamSpec * pspec)
331 {
332   GstRTSPStream *stream = GST_RTSP_STREAM (object);
333
334   switch (propid) {
335     case PROP_CONTROL:
336       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
337       break;
338     case PROP_PROFILES:
339       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
340       break;
341     case PROP_PROTOCOLS:
342       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
343       break;
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
346   }
347 }
348
349 /**
350  * gst_rtsp_stream_new:
351  * @idx: an index
352  * @pad: a #GstPad
353  * @payloader: a #GstElement
354  *
355  * Create a new media stream with index @idx that handles RTP data on
356  * @pad and has a payloader element @payloader if @pad is a source pad
357  * or a depayloader element @payloader if @pad is a sink pad.
358  *
359  * Returns: (transfer full): a new #GstRTSPStream
360  */
361 GstRTSPStream *
362 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
363 {
364   GstRTSPStreamPrivate *priv;
365   GstRTSPStream *stream;
366
367   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
368   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
369
370   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
371   priv = stream->priv;
372   priv->idx = idx;
373   priv->payloader = gst_object_ref (payloader);
374   if (GST_PAD_IS_SRC (pad))
375     priv->srcpad = gst_object_ref (pad);
376   else
377     priv->sinkpad = gst_object_ref (pad);
378
379   return stream;
380 }
381
382 /**
383  * gst_rtsp_stream_get_index:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the stream index.
387  *
388  * Return: the stream index.
389  */
390 guint
391 gst_rtsp_stream_get_index (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
394
395   return stream->priv->idx;
396 }
397
398 /**
399  * gst_rtsp_stream_get_pt:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the stream payload type.
403  *
404  * Return: the stream payload type.
405  */
406 guint
407 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   guint pt;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
413
414   priv = stream->priv;
415
416   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
417
418   return pt;
419 }
420
421 /**
422  * gst_rtsp_stream_get_srcpad:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the srcpad associated with @stream.
426  *
427  * Returns: (transfer full): the srcpad. Unref after usage.
428  */
429 GstPad *
430 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
431 {
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
433
434   if (!stream->priv->srcpad)
435     return NULL;
436
437   return gst_object_ref (stream->priv->srcpad);
438 }
439
440 /**
441  * gst_rtsp_stream_get_sinkpad:
442  * @stream: a #GstRTSPStream
443  *
444  * Get the sinkpad associated with @stream.
445  *
446  * Returns: (transfer full): the sinkpad. Unref after usage.
447  */
448 GstPad *
449 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
450 {
451   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
452
453   if (!stream->priv->sinkpad)
454     return NULL;
455
456   return gst_object_ref (stream->priv->sinkpad);
457 }
458
459 /**
460  * gst_rtsp_stream_get_control:
461  * @stream: a #GstRTSPStream
462  *
463  * Get the control string to identify this stream.
464  *
465  * Returns: (transfer full): the control string. g_free() after usage.
466  */
467 gchar *
468 gst_rtsp_stream_get_control (GstRTSPStream * stream)
469 {
470   GstRTSPStreamPrivate *priv;
471   gchar *result;
472
473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
474
475   priv = stream->priv;
476
477   g_mutex_lock (&priv->lock);
478   if ((result = g_strdup (priv->control)) == NULL)
479     result = g_strdup_printf ("stream=%u", priv->idx);
480   g_mutex_unlock (&priv->lock);
481
482   return result;
483 }
484
485 /**
486  * gst_rtsp_stream_set_control:
487  * @stream: a #GstRTSPStream
488  * @control: a control string
489  *
490  * Set the control string in @stream.
491  */
492 void
493 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
494 {
495   GstRTSPStreamPrivate *priv;
496
497   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
498
499   priv = stream->priv;
500
501   g_mutex_lock (&priv->lock);
502   g_free (priv->control);
503   priv->control = g_strdup (control);
504   g_mutex_unlock (&priv->lock);
505 }
506
507 /**
508  * gst_rtsp_stream_has_control:
509  * @stream: a #GstRTSPStream
510  * @control: a control string
511  *
512  * Check if @stream has the control string @control.
513  *
514  * Returns: %TRUE is @stream has @control as the control string
515  */
516 gboolean
517 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
518 {
519   GstRTSPStreamPrivate *priv;
520   gboolean res;
521
522   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
523
524   priv = stream->priv;
525
526   g_mutex_lock (&priv->lock);
527   if (priv->control)
528     res = (g_strcmp0 (priv->control, control) == 0);
529   else {
530     guint streamid;
531
532     if (sscanf (control, "stream=%u", &streamid) > 0)
533       res = (streamid == priv->idx);
534     else
535       res = FALSE;
536   }
537   g_mutex_unlock (&priv->lock);
538
539   return res;
540 }
541
542 /**
543  * gst_rtsp_stream_set_mtu:
544  * @stream: a #GstRTSPStream
545  * @mtu: a new MTU
546  *
547  * Configure the mtu in the payloader of @stream to @mtu.
548  */
549 void
550 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
551 {
552   GstRTSPStreamPrivate *priv;
553
554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
555
556   priv = stream->priv;
557
558   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
559
560   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
561 }
562
563 /**
564  * gst_rtsp_stream_get_mtu:
565  * @stream: a #GstRTSPStream
566  *
567  * Get the configured MTU in the payloader of @stream.
568  *
569  * Returns: the MTU of the payloader.
570  */
571 guint
572 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
573 {
574   GstRTSPStreamPrivate *priv;
575   guint mtu;
576
577   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
578
579   priv = stream->priv;
580
581   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
582
583   return mtu;
584 }
585
586 /* Update the dscp qos property on the udp sinks */
587 static void
588 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
589 {
590   GstRTSPStreamPrivate *priv;
591
592   priv = stream->priv;
593
594   if (udpsink[0]) {
595     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
596   }
597
598   if (udpsink[1]) {
599     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
600   }
601 }
602
603 /**
604  * gst_rtsp_stream_set_dscp_qos:
605  * @stream: a #GstRTSPStream
606  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
607  *
608  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
609  */
610 void
611 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
620
621   if (dscp_qos < -1 || dscp_qos > 63) {
622     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
623     return;
624   }
625
626   priv->dscp_qos = dscp_qos;
627
628   update_dscp_qos (stream, priv->udpsink);
629 }
630
631 /**
632  * gst_rtsp_stream_get_dscp_qos:
633  * @stream: a #GstRTSPStream
634  *
635  * Get the configured DSCP QoS in of the outgoing sockets.
636  *
637  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
638  */
639 gint
640 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
641 {
642   GstRTSPStreamPrivate *priv;
643
644   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
645
646   priv = stream->priv;
647
648   return priv->dscp_qos;
649 }
650
651 /**
652  * gst_rtsp_stream_is_transport_supported:
653  * @stream: a #GstRTSPStream
654  * @transport: (transfer none): a #GstRTSPTransport
655  *
656  * Check if @transport can be handled by stream
657  *
658  * Returns: %TRUE if @transport can be handled by @stream.
659  */
660 gboolean
661 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
662     GstRTSPTransport * transport)
663 {
664   GstRTSPStreamPrivate *priv;
665
666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
667
668   priv = stream->priv;
669
670   g_mutex_lock (&priv->lock);
671   if (transport->trans != GST_RTSP_TRANS_RTP)
672     goto unsupported_transmode;
673
674   if (!(transport->profile & priv->profiles))
675     goto unsupported_profile;
676
677   if (!(transport->lower_transport & priv->protocols))
678     goto unsupported_ltrans;
679
680   g_mutex_unlock (&priv->lock);
681
682   return TRUE;
683
684   /* ERRORS */
685 unsupported_transmode:
686   {
687     GST_DEBUG ("unsupported transport mode %d", transport->trans);
688     g_mutex_unlock (&priv->lock);
689     return FALSE;
690   }
691 unsupported_profile:
692   {
693     GST_DEBUG ("unsupported profile %d", transport->profile);
694     g_mutex_unlock (&priv->lock);
695     return FALSE;
696   }
697 unsupported_ltrans:
698   {
699     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
700     g_mutex_unlock (&priv->lock);
701     return FALSE;
702   }
703 }
704
705 /**
706  * gst_rtsp_stream_set_profiles:
707  * @stream: a #GstRTSPStream
708  * @profiles: the new profiles
709  *
710  * Configure the allowed profiles for @stream.
711  */
712 void
713 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
714 {
715   GstRTSPStreamPrivate *priv;
716
717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
718
719   priv = stream->priv;
720
721   g_mutex_lock (&priv->lock);
722   priv->profiles = profiles;
723   g_mutex_unlock (&priv->lock);
724 }
725
726 /**
727  * gst_rtsp_stream_get_profiles:
728  * @stream: a #GstRTSPStream
729  *
730  * Get the allowed profiles of @stream.
731  *
732  * Returns: a #GstRTSPProfile
733  */
734 GstRTSPProfile
735 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
736 {
737   GstRTSPStreamPrivate *priv;
738   GstRTSPProfile res;
739
740   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
741
742   priv = stream->priv;
743
744   g_mutex_lock (&priv->lock);
745   res = priv->profiles;
746   g_mutex_unlock (&priv->lock);
747
748   return res;
749 }
750
751 /**
752  * gst_rtsp_stream_set_protocols:
753  * @stream: a #GstRTSPStream
754  * @protocols: the new flags
755  *
756  * Configure the allowed lower transport for @stream.
757  */
758 void
759 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
760     GstRTSPLowerTrans protocols)
761 {
762   GstRTSPStreamPrivate *priv;
763
764   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
765
766   priv = stream->priv;
767
768   g_mutex_lock (&priv->lock);
769   priv->protocols = protocols;
770   g_mutex_unlock (&priv->lock);
771 }
772
773 /**
774  * gst_rtsp_stream_get_protocols:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the allowed protocols of @stream.
778  *
779  * Returns: a #GstRTSPLowerTrans
780  */
781 GstRTSPLowerTrans
782 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
783 {
784   GstRTSPStreamPrivate *priv;
785   GstRTSPLowerTrans res;
786
787   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
788       GST_RTSP_LOWER_TRANS_UNKNOWN);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   res = priv->protocols;
794   g_mutex_unlock (&priv->lock);
795
796   return res;
797 }
798
799 /**
800  * gst_rtsp_stream_set_address_pool:
801  * @stream: a #GstRTSPStream
802  * @pool: (transfer none): a #GstRTSPAddressPool
803  *
804  * configure @pool to be used as the address pool of @stream.
805  */
806 void
807 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
808     GstRTSPAddressPool * pool)
809 {
810   GstRTSPStreamPrivate *priv;
811   GstRTSPAddressPool *old;
812
813   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
814
815   priv = stream->priv;
816
817   GST_LOG_OBJECT (stream, "set address pool %p", pool);
818
819   g_mutex_lock (&priv->lock);
820   if ((old = priv->pool) != pool)
821     priv->pool = pool ? g_object_ref (pool) : NULL;
822   else
823     old = NULL;
824   g_mutex_unlock (&priv->lock);
825
826   if (old)
827     g_object_unref (old);
828 }
829
830 /**
831  * gst_rtsp_stream_get_address_pool:
832  * @stream: a #GstRTSPStream
833  *
834  * Get the #GstRTSPAddressPool used as the address pool of @stream.
835  *
836  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
837  * usage.
838  */
839 GstRTSPAddressPool *
840 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
841 {
842   GstRTSPStreamPrivate *priv;
843   GstRTSPAddressPool *result;
844
845   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
846
847   priv = stream->priv;
848
849   g_mutex_lock (&priv->lock);
850   if ((result = priv->pool))
851     g_object_ref (result);
852   g_mutex_unlock (&priv->lock);
853
854   return result;
855 }
856
857 /**
858  * gst_rtsp_stream_set_multicast_iface:
859  * @stream: a #GstRTSPStream
860  * @multicast_iface: (transfer none): a multicast interface
861  *
862  * configure @multicast_iface to be used for @stream.
863  */
864 void
865 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
866     const gchar * multicast_iface)
867 {
868   GstRTSPStreamPrivate *priv;
869   gchar *old;
870
871   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
872
873   priv = stream->priv;
874
875   GST_LOG_OBJECT (stream, "set multicast iface %s",
876       GST_STR_NULL (multicast_iface));
877
878   g_mutex_lock (&priv->lock);
879   if ((old = priv->multicast_iface) != multicast_iface)
880     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
881   else
882     old = NULL;
883   g_mutex_unlock (&priv->lock);
884
885   if (old)
886     g_free (old);
887 }
888
889 /**
890  * gst_rtsp_stream_get_multicast_iface:
891  * @stream: a #GstRTSPStream
892  *
893  * Get the multicast interface used for @stream.
894  *
895  * Returns: (transfer full): the multicast interface for @stream. g_free() after
896  * usage.
897  */
898 gchar *
899 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
900 {
901   GstRTSPStreamPrivate *priv;
902   gchar *result;
903
904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
905
906   priv = stream->priv;
907
908   g_mutex_lock (&priv->lock);
909   if ((result = priv->multicast_iface))
910     result = g_strdup (result);
911   g_mutex_unlock (&priv->lock);
912
913   return result;
914 }
915
916
917 static GstRTSPAddress *
918 gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream,
919     GSocketFamily family)
920 {
921   GstRTSPStreamPrivate *priv;
922   GstRTSPAddress *result;
923   GstRTSPAddress **addrp;
924   GstRTSPAddressFlags flags;
925
926   priv = stream->priv;
927
928   if (family == G_SOCKET_FAMILY_IPV6) {
929     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
930     addrp = &priv->mcast_addr_v6;
931   } else {
932     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
933     addrp = &priv->mcast_addr_v4;
934   }
935
936   if (*addrp == NULL) {
937     if (priv->pool == NULL)
938       goto no_pool;
939
940     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
941
942     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
943     if (*addrp == NULL)
944       goto no_address;
945
946     /* FIXME: Also reserve the same port with unicast ANY address, since that's
947      * where we are going to bind our socket. Probably loop until we find a port
948      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
949      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
950      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
951   }
952   result = gst_rtsp_address_copy (*addrp);
953
954   return result;
955
956   /* ERRORS */
957 no_pool:
958   {
959     GST_ERROR_OBJECT (stream, "no address pool specified");
960     return NULL;
961   }
962 no_address:
963   {
964     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
965     return NULL;
966   }
967 }
968
969 /**
970  * gst_rtsp_stream_get_multicast_address:
971  * @stream: a #GstRTSPStream
972  * @family: the #GSocketFamily
973  *
974  * Get the multicast address of @stream for @family. The original
975  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
976  * won't release the address from the pool.
977  *
978  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
979  * or %NULL when no address could be allocated. gst_rtsp_address_free()
980  * after usage.
981  */
982 GstRTSPAddress *
983 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
984     GSocketFamily family)
985 {
986   GstRTSPAddress *result;
987
988   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
989
990   g_mutex_lock (&stream->priv->lock);
991   result = gst_rtsp_stream_get_multicast_address_locked (stream, family);
992   g_mutex_unlock (&stream->priv->lock);
993
994   return result;
995 }
996
997 /**
998  * gst_rtsp_stream_reserve_address:
999  * @stream: a #GstRTSPStream
1000  * @address: an address
1001  * @port: a port
1002  * @n_ports: n_ports
1003  * @ttl: a TTL
1004  *
1005  * Reserve @address and @port as the address and port of @stream. The original
1006  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1007  * won't release the address from the pool.
1008  *
1009  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1010  * the address could be reserved. gst_rtsp_address_free() after usage.
1011  */
1012 GstRTSPAddress *
1013 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1014     const gchar * address, guint port, guint n_ports, guint ttl)
1015 {
1016   GstRTSPStreamPrivate *priv;
1017   GstRTSPAddress *result;
1018   GInetAddress *addr;
1019   GSocketFamily family;
1020   GstRTSPAddress **addrp;
1021
1022   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1023   g_return_val_if_fail (address != NULL, NULL);
1024   g_return_val_if_fail (port > 0, NULL);
1025   g_return_val_if_fail (n_ports > 0, NULL);
1026   g_return_val_if_fail (ttl > 0, NULL);
1027
1028   priv = stream->priv;
1029
1030   addr = g_inet_address_new_from_string (address);
1031   if (!addr) {
1032     GST_ERROR ("failed to get inet addr from %s", address);
1033     family = G_SOCKET_FAMILY_IPV4;
1034   } else {
1035     family = g_inet_address_get_family (addr);
1036     g_object_unref (addr);
1037   }
1038
1039   if (family == G_SOCKET_FAMILY_IPV6)
1040     addrp = &priv->mcast_addr_v6;
1041   else
1042     addrp = &priv->mcast_addr_v4;
1043
1044   g_mutex_lock (&priv->lock);
1045   if (*addrp == NULL) {
1046     GstRTSPAddressPoolResult res;
1047
1048     if (priv->pool == NULL)
1049       goto no_pool;
1050
1051     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1052         port, n_ports, ttl, addrp);
1053     if (res != GST_RTSP_ADDRESS_POOL_OK)
1054       goto no_address;
1055
1056     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1057      * where we are going to bind our socket. */
1058   } else {
1059     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1060         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1061         (*addrp)->ttl != ttl)
1062       goto different_address;
1063   }
1064   result = gst_rtsp_address_copy (*addrp);
1065   g_mutex_unlock (&priv->lock);
1066
1067   return result;
1068
1069   /* ERRORS */
1070 no_pool:
1071   {
1072     GST_ERROR_OBJECT (stream, "no address pool specified");
1073     g_mutex_unlock (&priv->lock);
1074     return NULL;
1075   }
1076 no_address:
1077   {
1078     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1079         address);
1080     g_mutex_unlock (&priv->lock);
1081     return NULL;
1082   }
1083 different_address:
1084   {
1085     GST_ERROR_OBJECT (stream,
1086         "address %s is not the same as %s that was already" " reserved",
1087         address, (*addrp)->address);
1088     g_mutex_unlock (&priv->lock);
1089     return NULL;
1090   }
1091 }
1092
1093 /* must be called with lock */
1094 static void
1095 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1096     GSocket * rtcp_socket, GSocketFamily family)
1097 {
1098   const gchar *multisink_socket;
1099
1100   if (family == G_SOCKET_FAMILY_IPV6)
1101     multisink_socket = "socket-v6";
1102   else
1103     multisink_socket = "socket";
1104
1105   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1106   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1107 }
1108
1109 static gboolean
1110 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
1111 {
1112   GstRTSPStreamPrivate *priv = stream->priv;
1113   GstElement *udpsink0, *udpsink1;
1114
1115   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1116   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink0 || !udpsink1)
1119     goto no_udp_protocol;
1120
1121   /* configure sinks */
1122
1123   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1124   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1125
1126   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1127   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1128
1129   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1130
1131   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1132   /* Needs to be async for RECORD streams, otherwise we will never go to
1133    * PLAYING because the sinks will wait for data while the udpsrc can't
1134    * provide data with timestamps in PAUSED. */
1135   if (priv->sinkpad)
1136     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1137   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1138
1139   /* join multicast group when adding clients, so we'll start receiving from it.
1140    * We cannot rely on the udpsrc to join the group since its socket is always a
1141    * local unicast one. */
1142   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1143   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1144
1145   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1147
1148   udpsink[0] = udpsink0;
1149   udpsink[1] = udpsink1;
1150
1151   /* update the dscp qos field in the sinks */
1152   update_dscp_qos (stream, udpsink);
1153
1154   return TRUE;
1155
1156   /* ERRORS */
1157 no_udp_protocol:
1158   {
1159     return FALSE;
1160   }
1161 }
1162
1163 /* must be called with lock */
1164 static gboolean
1165 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1166     GSocket * rtp_socket, GSocket * rtcp_socket)
1167 {
1168   GstStateChangeReturn ret;
1169
1170   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1171   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1172
1173   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1174     goto error;
1175
1176   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1177   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1178
1179   /* The udpsrc cannot do the join because its socket is always a local unicast
1180    * one. The udpsink sharing the same socket will do it for us. */
1181   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1182   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1183
1184   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1185   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1186
1187   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1188   if (ret == GST_STATE_CHANGE_FAILURE)
1189     goto error;
1190   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1191   if (ret == GST_STATE_CHANGE_FAILURE)
1192     goto error;
1193
1194   return TRUE;
1195
1196   /* ERRORS */
1197 error:
1198   {
1199     if (udpsrc_out[0]) {
1200       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1201       g_clear_object (&udpsrc_out[0]);
1202     }
1203     if (udpsrc_out[1]) {
1204       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1205       g_clear_object (&udpsrc_out[1]);
1206     }
1207     return FALSE;
1208   }
1209 }
1210
1211 static gboolean
1212 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1213     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1214     GstRTSPAddress ** server_addr_out, gboolean multicast)
1215 {
1216   GstRTSPStreamPrivate *priv = stream->priv;
1217   GSocket *rtp_socket = NULL;
1218   GSocket *rtcp_socket;
1219   gint tmp_rtp, tmp_rtcp;
1220   guint count;
1221   gint rtpport, rtcpport;
1222   GList *rejected_addresses = NULL;
1223   GstRTSPAddress *addr = NULL;
1224   GInetAddress *inetaddr = NULL;
1225   gchar *addr_str;
1226   GSocketAddress *rtp_sockaddr = NULL;
1227   GSocketAddress *rtcp_sockaddr = NULL;
1228   GstRTSPAddressPool *pool;
1229
1230   g_assert (!udpsrc_out[0]);
1231   g_assert (!udpsrc_out[1]);
1232   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1233       (udpsink_out[0] && udpsink_out[1]));
1234   g_assert (*server_addr_out == NULL);
1235
1236   pool = priv->pool;
1237   count = 0;
1238
1239   /* Start with random port */
1240   tmp_rtp = 0;
1241
1242   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1243       G_SOCKET_PROTOCOL_UDP, NULL);
1244   if (!rtcp_socket)
1245     goto no_udp_protocol;
1246   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1247
1248   /* try to allocate 2 UDP ports, the RTP port should be an even
1249    * number and the RTCP port should be the next (uneven) port */
1250 again:
1251
1252   if (rtp_socket == NULL) {
1253     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1254         G_SOCKET_PROTOCOL_UDP, NULL);
1255     if (!rtp_socket)
1256       goto no_udp_protocol;
1257     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1258   }
1259
1260   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1261     GstRTSPAddressFlags flags;
1262
1263     if (addr)
1264       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1265
1266     if (!pool)
1267       goto no_ports;
1268
1269     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1270     if (multicast)
1271       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1272     else
1273       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1274
1275     if (family == G_SOCKET_FAMILY_IPV6)
1276       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1277     else
1278       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1279
1280     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1281
1282     if (addr == NULL)
1283       goto no_ports;
1284
1285     tmp_rtp = addr->port;
1286
1287     g_clear_object (&inetaddr);
1288     if (multicast)
1289       inetaddr = g_inet_address_new_any (family);
1290     else
1291       inetaddr = g_inet_address_new_from_string (addr->address);
1292   } else {
1293     if (tmp_rtp != 0) {
1294       tmp_rtp += 2;
1295       if (++count > 20)
1296         goto no_ports;
1297     }
1298
1299     if (inetaddr == NULL)
1300       inetaddr = g_inet_address_new_any (family);
1301   }
1302
1303   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1304   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1305     g_object_unref (rtp_sockaddr);
1306     goto again;
1307   }
1308   g_object_unref (rtp_sockaddr);
1309
1310   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1311   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1312     g_clear_object (&rtp_sockaddr);
1313     goto socket_error;
1314   }
1315
1316   tmp_rtp =
1317       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1318   g_object_unref (rtp_sockaddr);
1319
1320   /* check if port is even */
1321   if ((tmp_rtp & 1) != 0) {
1322     /* port not even, close and allocate another */
1323     tmp_rtp++;
1324     g_clear_object (&rtp_socket);
1325     goto again;
1326   }
1327
1328   /* set port */
1329   tmp_rtcp = tmp_rtp + 1;
1330
1331   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1332   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1333     g_object_unref (rtcp_sockaddr);
1334     g_clear_object (&rtp_socket);
1335     goto again;
1336   }
1337   g_object_unref (rtcp_sockaddr);
1338
1339   if (!addr) {
1340     addr = g_slice_new0 (GstRTSPAddress);
1341     addr->address = g_inet_address_to_string (inetaddr);
1342     addr->port = tmp_rtp;
1343     addr->n_ports = 2;
1344   }
1345
1346   addr_str = addr->address;
1347   g_clear_object (&inetaddr);
1348
1349   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1350     goto no_udp_protocol;
1351   }
1352
1353   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1354   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1355
1356   /* this should not happen... */
1357   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1358     goto port_error;
1359
1360   /* This function is called twice (for v4 and v6) but we create only one pair
1361    * of udpsinks. */
1362   if (!udpsink_out[0]
1363       && !create_and_configure_udpsinks (stream, udpsink_out))
1364     goto no_udp_protocol;
1365
1366   if (multicast) {
1367     g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
1368         priv->multicast_iface, NULL);
1369     g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
1370         priv->multicast_iface, NULL);
1371
1372     g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
1373     g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
1374   }
1375
1376   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1377
1378   *server_addr_out = addr;
1379
1380   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1381
1382   g_object_unref (rtp_socket);
1383   g_object_unref (rtcp_socket);
1384
1385   return TRUE;
1386
1387   /* ERRORS */
1388 no_udp_protocol:
1389   {
1390     goto cleanup;
1391   }
1392 no_ports:
1393   {
1394     goto cleanup;
1395   }
1396 port_error:
1397   {
1398     goto cleanup;
1399   }
1400 socket_error:
1401   {
1402     goto cleanup;
1403   }
1404 cleanup:
1405   {
1406     if (inetaddr)
1407       g_object_unref (inetaddr);
1408     g_list_free_full (rejected_addresses,
1409         (GDestroyNotify) gst_rtsp_address_free);
1410     if (addr)
1411       gst_rtsp_address_free (addr);
1412     if (rtp_socket)
1413       g_object_unref (rtp_socket);
1414     if (rtcp_socket)
1415       g_object_unref (rtcp_socket);
1416     return FALSE;
1417   }
1418 }
1419
1420 /**
1421  * gst_rtsp_stream_allocate_udp_sockets:
1422  * @stream: a #GstRTSPStream
1423  * @family: protocol family
1424  * @transport_method: transport method
1425  *
1426  * Allocates RTP and RTCP ports.
1427  *
1428  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1429  * Deprecated: This function shouldn't have been made public
1430  */
1431 gboolean
1432 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1433     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1434 {
1435   g_warn_if_reached ();
1436   return FALSE;
1437 }
1438
1439 /**
1440  * gst_rtsp_stream_set_client_side:
1441  * @stream: a #GstRTSPStream
1442  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1443  * an RTSP connection.
1444  *
1445  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1446  * streams to an RTSP server via RECORD. This has the practical effect
1447  * of changing which UDP port numbers are used when setting up the local
1448  * side of the stream sending to be either the 'server' or 'client' pair
1449  * of a configured UDP transport.
1450  */
1451 void
1452 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1453 {
1454   GstRTSPStreamPrivate *priv;
1455
1456   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1457   priv = stream->priv;
1458   g_mutex_lock (&priv->lock);
1459   priv->client_side = client_side;
1460   g_mutex_unlock (&priv->lock);
1461 }
1462
1463 /**
1464  * gst_rtsp_stream_is_client_side:
1465  * @stream: a #GstRTSPStream
1466  *
1467  * See gst_rtsp_stream_set_client_side()
1468  *
1469  * Returns: TRUE if this #GstRTSPStream is client-side.
1470  */
1471 gboolean
1472 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1473 {
1474   GstRTSPStreamPrivate *priv;
1475   gboolean ret;
1476
1477   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1478
1479   priv = stream->priv;
1480   g_mutex_lock (&priv->lock);
1481   ret = priv->client_side;
1482   g_mutex_unlock (&priv->lock);
1483
1484   return ret;
1485 }
1486
1487 /* must be called with lock */
1488 static gboolean
1489 alloc_ports (GstRTSPStream * stream)
1490 {
1491   GstRTSPStreamPrivate *priv = stream->priv;
1492   gboolean ret = TRUE;
1493
1494   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
1495     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1496         priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
1497
1498     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1499         priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
1500   }
1501
1502   /* FIXME: Maybe actually consider the return values? */
1503   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1504     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1505         priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
1506
1507     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1508         priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
1509   }
1510
1511   return ret;
1512 }
1513
1514 /**
1515  * gst_rtsp_stream_get_server_port:
1516  * @stream: a #GstRTSPStream
1517  * @server_port: (out): result server port
1518  * @family: the port family to get
1519  *
1520  * Fill @server_port with the port pair used by the server. This function can
1521  * only be called when @stream has been joined.
1522  */
1523 void
1524 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1525     GstRTSPRange * server_port, GSocketFamily family)
1526 {
1527   GstRTSPStreamPrivate *priv;
1528
1529   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1530   priv = stream->priv;
1531   g_return_if_fail (priv->joined_bin != NULL);
1532
1533   g_mutex_lock (&priv->lock);
1534   if (family == G_SOCKET_FAMILY_IPV4) {
1535     if (server_port) {
1536       server_port->min = priv->server_addr_v4->port;
1537       server_port->max =
1538           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1539     }
1540   } else {
1541     if (server_port) {
1542       server_port->min = priv->server_addr_v6->port;
1543       server_port->max =
1544           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1545     }
1546   }
1547   g_mutex_unlock (&priv->lock);
1548 }
1549
1550 /**
1551  * gst_rtsp_stream_get_rtpsession:
1552  * @stream: a #GstRTSPStream
1553  *
1554  * Get the RTP session of this stream.
1555  *
1556  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1557  */
1558 GObject *
1559 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1560 {
1561   GstRTSPStreamPrivate *priv;
1562   GObject *session;
1563
1564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1565
1566   priv = stream->priv;
1567
1568   g_mutex_lock (&priv->lock);
1569   if ((session = priv->session))
1570     g_object_ref (session);
1571   g_mutex_unlock (&priv->lock);
1572
1573   return session;
1574 }
1575
1576 /**
1577  * gst_rtsp_stream_get_encoder:
1578  * @stream: a #GstRTSPStream
1579  *
1580  * Get the SRTP encoder for this stream.
1581  *
1582  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1583  */
1584 GstElement *
1585 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1586 {
1587   GstRTSPStreamPrivate *priv;
1588   GstElement *encoder;
1589
1590   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1591
1592   priv = stream->priv;
1593
1594   g_mutex_lock (&priv->lock);
1595   if ((encoder = priv->srtpenc))
1596     g_object_ref (encoder);
1597   g_mutex_unlock (&priv->lock);
1598
1599   return encoder;
1600 }
1601
1602 /**
1603  * gst_rtsp_stream_get_ssrc:
1604  * @stream: a #GstRTSPStream
1605  * @ssrc: (out): result ssrc
1606  *
1607  * Get the SSRC used by the RTP session of this stream. This function can only
1608  * be called when @stream has been joined.
1609  */
1610 void
1611 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1612 {
1613   GstRTSPStreamPrivate *priv;
1614
1615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1616   priv = stream->priv;
1617   g_return_if_fail (priv->joined_bin != NULL);
1618
1619   g_mutex_lock (&priv->lock);
1620   if (ssrc && priv->session)
1621     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1622   g_mutex_unlock (&priv->lock);
1623 }
1624
1625 /**
1626  * gst_rtsp_stream_set_retransmission_time:
1627  * @stream: a #GstRTSPStream
1628  * @time: a #GstClockTime
1629  *
1630  * Set the amount of time to store retransmission packets.
1631  */
1632 void
1633 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1634     GstClockTime time)
1635 {
1636   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1637
1638   g_mutex_lock (&stream->priv->lock);
1639   stream->priv->rtx_time = time;
1640   if (stream->priv->rtxsend)
1641     g_object_set (stream->priv->rtxsend, "max-size-time",
1642         GST_TIME_AS_MSECONDS (time), NULL);
1643   g_mutex_unlock (&stream->priv->lock);
1644 }
1645
1646 /**
1647  * gst_rtsp_stream_get_retransmission_time:
1648  * @stream: a #GstRTSPStream
1649  *
1650  * Get the amount of time to store retransmission data.
1651  *
1652  * Returns: the amount of time to store retransmission data.
1653  */
1654 GstClockTime
1655 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1656 {
1657   GstClockTime ret;
1658
1659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1660
1661   g_mutex_lock (&stream->priv->lock);
1662   ret = stream->priv->rtx_time;
1663   g_mutex_unlock (&stream->priv->lock);
1664
1665   return ret;
1666 }
1667
1668 /**
1669  * gst_rtsp_stream_set_retransmission_pt:
1670  * @stream: a #GstRTSPStream
1671  * @rtx_pt: a #guint
1672  *
1673  * Set the payload type (pt) for retransmission of this stream.
1674  */
1675 void
1676 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1677 {
1678   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1679
1680   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1681
1682   g_mutex_lock (&stream->priv->lock);
1683   stream->priv->rtx_pt = rtx_pt;
1684   if (stream->priv->rtxsend) {
1685     guint pt = gst_rtsp_stream_get_pt (stream);
1686     gchar *pt_s = g_strdup_printf ("%d", pt);
1687     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1688         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1689     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1690     g_free (pt_s);
1691     gst_structure_free (rtx_pt_map);
1692   }
1693   g_mutex_unlock (&stream->priv->lock);
1694 }
1695
1696 /**
1697  * gst_rtsp_stream_get_retransmission_pt:
1698  * @stream: a #GstRTSPStream
1699  *
1700  * Get the payload-type used for retransmission of this stream
1701  *
1702  * Returns: The retransmission PT.
1703  */
1704 guint
1705 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1706 {
1707   guint rtx_pt;
1708
1709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1710
1711   g_mutex_lock (&stream->priv->lock);
1712   rtx_pt = stream->priv->rtx_pt;
1713   g_mutex_unlock (&stream->priv->lock);
1714
1715   return rtx_pt;
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_set_buffer_size:
1720  * @stream: a #GstRTSPStream
1721  * @size: the buffer size
1722  *
1723  * Set the size of the UDP transmission buffer (in bytes)
1724  * Needs to be set before the stream is joined to a bin.
1725  *
1726  * Since: 1.6
1727  */
1728 void
1729 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1730 {
1731   g_mutex_lock (&stream->priv->lock);
1732   stream->priv->buffer_size = size;
1733   g_mutex_unlock (&stream->priv->lock);
1734 }
1735
1736 /**
1737  * gst_rtsp_stream_get_buffer_size:
1738  * @stream: a #GstRTSPStream
1739  *
1740  * Get the size of the UDP transmission buffer (in bytes)
1741  *
1742  * Returns: the size of the UDP TX buffer
1743  *
1744  * Since: 1.6
1745  */
1746 guint
1747 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1748 {
1749   guint buffer_size;
1750
1751   g_mutex_lock (&stream->priv->lock);
1752   buffer_size = stream->priv->buffer_size;
1753   g_mutex_unlock (&stream->priv->lock);
1754
1755   return buffer_size;
1756 }
1757
1758 /* executed from streaming thread */
1759 static void
1760 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1761 {
1762   GstRTSPStreamPrivate *priv = stream->priv;
1763   GstCaps *newcaps, *oldcaps;
1764
1765   newcaps = gst_pad_get_current_caps (pad);
1766
1767   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1768       newcaps);
1769
1770   g_mutex_lock (&priv->lock);
1771   oldcaps = priv->caps;
1772   priv->caps = newcaps;
1773   g_mutex_unlock (&priv->lock);
1774
1775   if (oldcaps)
1776     gst_caps_unref (oldcaps);
1777 }
1778
1779 static void
1780 dump_structure (const GstStructure * s)
1781 {
1782   gchar *sstr;
1783
1784   sstr = gst_structure_to_string (s);
1785   GST_INFO ("structure: %s", sstr);
1786   g_free (sstr);
1787 }
1788
1789 static GstRTSPStreamTransport *
1790 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1791 {
1792   GstRTSPStreamPrivate *priv = stream->priv;
1793   GList *walk;
1794   GstRTSPStreamTransport *result = NULL;
1795   const gchar *tmp;
1796   gchar *dest;
1797   guint port;
1798
1799   if (rtcp_from == NULL)
1800     return NULL;
1801
1802   tmp = g_strrstr (rtcp_from, ":");
1803   if (tmp == NULL)
1804     return NULL;
1805
1806   port = atoi (tmp + 1);
1807   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1808
1809   g_mutex_lock (&priv->lock);
1810   GST_INFO ("finding %s:%d in %d transports", dest, port,
1811       g_list_length (priv->transports));
1812
1813   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1814     GstRTSPStreamTransport *trans = walk->data;
1815     const GstRTSPTransport *tr;
1816     gint min, max;
1817
1818     tr = gst_rtsp_stream_transport_get_transport (trans);
1819
1820     if (priv->client_side) {
1821       /* In client side mode the 'destination' is the RTSP server, so send
1822        * to those ports */
1823       min = tr->server_port.min;
1824       max = tr->server_port.max;
1825     } else {
1826       min = tr->client_port.min;
1827       max = tr->client_port.max;
1828     }
1829
1830     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1831       result = trans;
1832       break;
1833     }
1834   }
1835   if (result)
1836     g_object_ref (result);
1837   g_mutex_unlock (&priv->lock);
1838
1839   g_free (dest);
1840
1841   return result;
1842 }
1843
1844 static GstRTSPStreamTransport *
1845 check_transport (GObject * source, GstRTSPStream * stream)
1846 {
1847   GstStructure *stats;
1848   GstRTSPStreamTransport *trans;
1849
1850   /* see if we have a stream to match with the origin of the RTCP packet */
1851   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1852   if (trans == NULL) {
1853     g_object_get (source, "stats", &stats, NULL);
1854     if (stats) {
1855       const gchar *rtcp_from;
1856
1857       dump_structure (stats);
1858
1859       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1860       if ((trans = find_transport (stream, rtcp_from))) {
1861         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1862             source);
1863         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1864             g_object_unref);
1865       }
1866       gst_structure_free (stats);
1867     }
1868   }
1869   return trans;
1870 }
1871
1872
1873 static void
1874 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1875 {
1876   GstRTSPStreamTransport *trans;
1877
1878   GST_INFO ("%p: new source %p", stream, source);
1879
1880   trans = check_transport (source, stream);
1881
1882   if (trans)
1883     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1884 }
1885
1886 static void
1887 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1888 {
1889   GST_INFO ("%p: new SDES %p", stream, source);
1890 }
1891
1892 static void
1893 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1894 {
1895   GstRTSPStreamTransport *trans;
1896
1897   trans = check_transport (source, stream);
1898
1899   if (trans) {
1900     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1901     gst_rtsp_stream_transport_keep_alive (trans);
1902   }
1903 #ifdef DUMP_STATS
1904   {
1905     GstStructure *stats;
1906     g_object_get (source, "stats", &stats, NULL);
1907     if (stats) {
1908       dump_structure (stats);
1909       gst_structure_free (stats);
1910     }
1911   }
1912 #endif
1913 }
1914
1915 static void
1916 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1917 {
1918   GST_INFO ("%p: source %p bye", stream, source);
1919 }
1920
1921 static void
1922 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1923 {
1924   GstRTSPStreamTransport *trans;
1925
1926   GST_INFO ("%p: source %p bye timeout", stream, source);
1927
1928   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1929     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1930     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1931   }
1932 }
1933
1934 static void
1935 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1936 {
1937   GstRTSPStreamTransport *trans;
1938
1939   GST_INFO ("%p: source %p timeout", stream, source);
1940
1941   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1942     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1943     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1944   }
1945 }
1946
1947 static void
1948 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1949 {
1950   GST_INFO ("%p: new sender source %p", stream, source);
1951 #ifndef DUMP_STATS
1952   {
1953     GstStructure *stats;
1954     g_object_get (source, "stats", &stats, NULL);
1955     if (stats) {
1956       dump_structure (stats);
1957       gst_structure_free (stats);
1958     }
1959   }
1960 #endif
1961 }
1962
1963 static void
1964 on_sender_ssrc_active (GObject * session, GObject * source,
1965     GstRTSPStream * stream)
1966 {
1967 #ifndef DUMP_STATS
1968   {
1969     GstStructure *stats;
1970     g_object_get (source, "stats", &stats, NULL);
1971     if (stats) {
1972       dump_structure (stats);
1973       gst_structure_free (stats);
1974     }
1975   }
1976 #endif
1977 }
1978
1979 static void
1980 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1981 {
1982   if (is_rtp) {
1983     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1984     g_list_free (priv->tr_cache_rtp);
1985     priv->tr_cache_rtp = NULL;
1986   } else {
1987     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1988     g_list_free (priv->tr_cache_rtcp);
1989     priv->tr_cache_rtcp = NULL;
1990   }
1991 }
1992
1993 static GstFlowReturn
1994 handle_new_sample (GstAppSink * sink, gpointer user_data)
1995 {
1996   GstRTSPStreamPrivate *priv;
1997   GList *walk;
1998   GstSample *sample;
1999   GstBuffer *buffer;
2000   GstRTSPStream *stream;
2001   gboolean is_rtp;
2002
2003   sample = gst_app_sink_pull_sample (sink);
2004   if (!sample)
2005     return GST_FLOW_OK;
2006
2007   stream = (GstRTSPStream *) user_data;
2008   priv = stream->priv;
2009   buffer = gst_sample_get_buffer (sample);
2010
2011   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2012
2013   g_mutex_lock (&priv->lock);
2014   if (is_rtp) {
2015     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2016       clear_tr_cache (priv, is_rtp);
2017       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2018         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2019         priv->tr_cache_rtp =
2020             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2021       }
2022       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2023     }
2024   } else {
2025     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2026       clear_tr_cache (priv, is_rtp);
2027       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2028         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2029         priv->tr_cache_rtcp =
2030             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2031       }
2032       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2033     }
2034   }
2035   g_mutex_unlock (&priv->lock);
2036
2037   if (is_rtp) {
2038     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2039       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2040       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2041     }
2042   } else {
2043     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2044       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2045       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2046     }
2047   }
2048   gst_sample_unref (sample);
2049
2050   return GST_FLOW_OK;
2051 }
2052
2053 static GstAppSinkCallbacks sink_cb = {
2054   NULL,                         /* not interested in EOS */
2055   NULL,                         /* not interested in preroll samples */
2056   handle_new_sample,
2057 };
2058
2059 static GstElement *
2060 get_rtp_encoder (GstRTSPStream * stream, guint session)
2061 {
2062   GstRTSPStreamPrivate *priv = stream->priv;
2063
2064   if (priv->srtpenc == NULL) {
2065     gchar *name;
2066
2067     name = g_strdup_printf ("srtpenc_%u", session);
2068     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2069     g_free (name);
2070
2071     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2072   }
2073   return gst_object_ref (priv->srtpenc);
2074 }
2075
2076 static GstElement *
2077 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2078 {
2079   GstRTSPStreamPrivate *priv = stream->priv;
2080   GstElement *oldenc, *enc;
2081   GstPad *pad;
2082   gchar *name;
2083
2084   if (priv->idx != session)
2085     return NULL;
2086
2087   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2088
2089   oldenc = priv->srtpenc;
2090   enc = get_rtp_encoder (stream, session);
2091   name = g_strdup_printf ("rtp_sink_%d", session);
2092   pad = gst_element_get_request_pad (enc, name);
2093   g_free (name);
2094   gst_object_unref (pad);
2095
2096   if (oldenc == NULL)
2097     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2098         enc);
2099
2100   return enc;
2101 }
2102
2103 static GstElement *
2104 request_rtcp_encoder (GstElement * rtpbin, guint session,
2105     GstRTSPStream * stream)
2106 {
2107   GstRTSPStreamPrivate *priv = stream->priv;
2108   GstElement *oldenc, *enc;
2109   GstPad *pad;
2110   gchar *name;
2111
2112   if (priv->idx != session)
2113     return NULL;
2114
2115   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2116
2117   oldenc = priv->srtpenc;
2118   enc = get_rtp_encoder (stream, session);
2119   name = g_strdup_printf ("rtcp_sink_%d", session);
2120   pad = gst_element_get_request_pad (enc, name);
2121   g_free (name);
2122   gst_object_unref (pad);
2123
2124   if (oldenc == NULL)
2125     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2126         enc);
2127
2128   return enc;
2129 }
2130
2131 static GstCaps *
2132 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2133 {
2134   GstRTSPStreamPrivate *priv = stream->priv;
2135   GstCaps *caps;
2136
2137   GST_DEBUG ("request key %08x", ssrc);
2138
2139   g_mutex_lock (&priv->lock);
2140   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2141     gst_caps_ref (caps);
2142   g_mutex_unlock (&priv->lock);
2143
2144   return caps;
2145 }
2146
2147 static GstElement *
2148 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2149     GstRTSPStream * stream)
2150 {
2151   GstRTSPStreamPrivate *priv = stream->priv;
2152
2153   if (priv->idx != session)
2154     return NULL;
2155
2156   if (priv->srtpdec == NULL) {
2157     gchar *name;
2158
2159     name = g_strdup_printf ("srtpdec_%u", session);
2160     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2161     g_free (name);
2162
2163     g_signal_connect (priv->srtpdec, "request-key",
2164         (GCallback) request_key, stream);
2165   }
2166   return gst_object_ref (priv->srtpdec);
2167 }
2168
2169 /**
2170  * gst_rtsp_stream_request_aux_sender:
2171  * @stream: a #GstRTSPStream
2172  * @sessid: the session id
2173  *
2174  * Creating a rtxsend bin
2175  *
2176  * Returns: (transfer full): a #GstElement.
2177  *
2178  * Since: 1.6
2179  */
2180 GstElement *
2181 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2182 {
2183   GstElement *bin;
2184   GstPad *pad;
2185   GstStructure *pt_map;
2186   gchar *name;
2187   guint pt, rtx_pt;
2188   gchar *pt_s;
2189
2190   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2191
2192   pt = gst_rtsp_stream_get_pt (stream);
2193   pt_s = g_strdup_printf ("%u", pt);
2194   rtx_pt = stream->priv->rtx_pt;
2195
2196   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2197
2198   bin = gst_bin_new (NULL);
2199   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2200   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2201       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2202   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2203       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2204   g_free (pt_s);
2205   gst_structure_free (pt_map);
2206   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2207
2208   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2209   name = g_strdup_printf ("src_%u", sessid);
2210   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2211   g_free (name);
2212   gst_object_unref (pad);
2213
2214   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2215   name = g_strdup_printf ("sink_%u", sessid);
2216   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2217   g_free (name);
2218   gst_object_unref (pad);
2219
2220   return bin;
2221 }
2222
2223 /**
2224  * gst_rtsp_stream_set_pt_map:
2225  * @stream: a #GstRTSPStream
2226  * @pt: the pt
2227  * @caps: a #GstCaps
2228  *
2229  * Configure a pt map between @pt and @caps.
2230  */
2231 void
2232 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2233 {
2234   GstRTSPStreamPrivate *priv = stream->priv;
2235
2236   g_mutex_lock (&priv->lock);
2237   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2238   g_mutex_unlock (&priv->lock);
2239 }
2240
2241 /**
2242  * gst_rtsp_stream_set_publish_clock_mode:
2243  * @stream: a #GstRTSPStream
2244  * @mode: the clock publish mode
2245  *
2246  * Sets if and how the stream clock should be published according to RFC7273.
2247  *
2248  * Since: 1.8
2249  */
2250 void
2251 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2252     GstRTSPPublishClockMode mode)
2253 {
2254   GstRTSPStreamPrivate *priv;
2255
2256   priv = stream->priv;
2257   g_mutex_lock (&priv->lock);
2258   priv->publish_clock_mode = mode;
2259   g_mutex_unlock (&priv->lock);
2260 }
2261
2262 /**
2263  * gst_rtsp_stream_get_publish_clock_mode:
2264  * @factory: a #GstRTSPStream
2265  *
2266  * Gets if and how the stream clock should be published according to RFC7273.
2267  *
2268  * Returns: The GstRTSPPublishClockMode
2269  *
2270  * Since: 1.8
2271  */
2272 GstRTSPPublishClockMode
2273 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2274 {
2275   GstRTSPStreamPrivate *priv;
2276   GstRTSPPublishClockMode ret;
2277
2278   priv = stream->priv;
2279   g_mutex_lock (&priv->lock);
2280   ret = priv->publish_clock_mode;
2281   g_mutex_unlock (&priv->lock);
2282
2283   return ret;
2284 }
2285
2286 static GstCaps *
2287 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2288     GstRTSPStream * stream)
2289 {
2290   GstRTSPStreamPrivate *priv = stream->priv;
2291   GstCaps *caps = NULL;
2292
2293   g_mutex_lock (&priv->lock);
2294
2295   if (priv->idx == session) {
2296     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2297     if (caps) {
2298       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2299       gst_caps_ref (caps);
2300     } else {
2301       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2302     }
2303   }
2304
2305   g_mutex_unlock (&priv->lock);
2306
2307   return caps;
2308 }
2309
2310 static void
2311 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2312 {
2313   GstRTSPStreamPrivate *priv = stream->priv;
2314   gchar *name;
2315   GstPadLinkReturn ret;
2316   guint sessid;
2317
2318   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2319       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2320
2321   name = gst_pad_get_name (pad);
2322   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2323     g_free (name);
2324     return;
2325   }
2326   g_free (name);
2327
2328   if (priv->idx != sessid)
2329     return;
2330
2331   if (gst_pad_is_linked (priv->sinkpad)) {
2332     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2333         GST_DEBUG_PAD_NAME (priv->sinkpad));
2334     return;
2335   }
2336
2337   /* link the RTP pad to the session manager, it should not really fail unless
2338    * this is not really an RTP pad */
2339   ret = gst_pad_link (pad, priv->sinkpad);
2340   if (ret != GST_PAD_LINK_OK)
2341     goto link_failed;
2342   priv->recv_rtp_src = gst_object_ref (pad);
2343
2344   return;
2345
2346 /* ERRORS */
2347 link_failed:
2348   {
2349     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2350         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2351   }
2352 }
2353
2354 static void
2355 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2356     GstRTSPStream * stream)
2357 {
2358   /* TODO: What to do here other than this? */
2359   GST_DEBUG ("Stream %p: Got EOS", stream);
2360   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2361 }
2362
2363 static void
2364 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2365     GstElement ** queue_out)
2366 {
2367   GstPad *pad;
2368   GstPad *teepad;
2369   GstPad *queuepad;
2370
2371   gst_bin_add (bin, sink);
2372
2373   *queue_out = gst_element_factory_make ("queue", NULL);
2374   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2375       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2376   gst_bin_add (bin, *queue_out);
2377
2378   /* link tee to queue */
2379   teepad = gst_element_get_request_pad (tee, "src_%u");
2380   pad = gst_element_get_static_pad (*queue_out, "sink");
2381   gst_pad_link (teepad, pad);
2382   gst_object_unref (pad);
2383   gst_object_unref (teepad);
2384
2385   /* link queue to sink */
2386   queuepad = gst_element_get_static_pad (*queue_out, "src");
2387   pad = gst_element_get_static_pad (sink, "sink");
2388   gst_pad_link (queuepad, pad);
2389   gst_object_unref (queuepad);
2390   gst_object_unref (pad);
2391 }
2392
2393 /* must be called with lock */
2394 static void
2395 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2396 {
2397   GstRTSPStreamPrivate *priv;
2398   GstPad *pad;
2399   gboolean is_tcp, is_udp;
2400   gint i;
2401
2402   priv = stream->priv;
2403
2404   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2405   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2406       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2407
2408   for (i = 0; i < 2; i++) {
2409     /* For the sender we create this bit of pipeline for both
2410      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2411      * we need to add a queue before appsink and udpsink to make
2412      * the pipeline not block. For the TCP case, we want to pump
2413      * client as fast as possible anyway. This pipeline is used
2414      * when both TCP and UDP are present.
2415      *
2416      * .--------.      .-----.    .---------.    .---------.
2417      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2418      * |       send->sink   src->sink      src->sink       |
2419      * '--------'      |     |    '---------'    '---------'
2420      *                 |     |    .---------.    .---------.
2421      *                 |     |    |  queue  |    | appsink |
2422      *                 |    src->sink      src->sink       |
2423      *                 '-----'    '---------'    '---------'
2424      *
2425      * When only UDP or only TCP is allowed, we skip the tee and queue
2426      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2427      * the session.
2428      */
2429
2430     /* Only link the RTP send src if we're going to send RTP, link
2431      * the RTCP send src always */
2432     if (!priv->srcpad && i == 0)
2433       continue;
2434
2435     if (is_tcp) {
2436       /* make appsink */
2437       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2438       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2439       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2440           &sink_cb, stream, NULL);
2441     }
2442
2443     /* If we have udp always use a tee because we could have mcast clients
2444      * requesting different ports, in which case we'll have to plug more
2445      * udpsinks. */
2446     if (is_udp) {
2447       /* make tee for RTP/RTCP */
2448       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2449       gst_bin_add (bin, priv->tee[i]);
2450
2451       /* and link to rtpbin send pad */
2452       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2453       gst_pad_link (priv->send_src[i], pad);
2454       gst_object_unref (pad);
2455
2456       if (priv->udpsink[i])
2457         plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2458
2459       if (priv->mcast_udpsink[i])
2460         plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
2461             &priv->mcast_udpqueue[i]);
2462
2463       if (is_tcp) {
2464         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2465         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2466       }
2467     } else if (is_tcp) {
2468       /* only appsink needed, link it to the session */
2469       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2470       gst_pad_link (priv->send_src[i], pad);
2471       gst_object_unref (pad);
2472
2473       /* when its only TCP, we need to set sync and preroll to FALSE
2474        * for the sink to avoid deadlock. And this is only needed for
2475        * sink used for RTCP data, not the RTP data. */
2476       if (i == 1)
2477         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2478     }
2479
2480     /* check if we need to set to a special state */
2481     if (state != GST_STATE_NULL) {
2482       if (priv->udpsink[i])
2483         gst_element_set_state (priv->udpsink[i], state);
2484       if (priv->mcast_udpsink[i])
2485         gst_element_set_state (priv->mcast_udpsink[i], state);
2486       if (priv->appsink[i])
2487         gst_element_set_state (priv->appsink[i], state);
2488       if (priv->appqueue[i])
2489         gst_element_set_state (priv->appqueue[i], state);
2490       if (priv->udpqueue[i])
2491         gst_element_set_state (priv->udpqueue[i], state);
2492       if (priv->mcast_udpqueue[i])
2493         gst_element_set_state (priv->mcast_udpqueue[i], state);
2494       if (priv->tee[i])
2495         gst_element_set_state (priv->tee[i], state);
2496     }
2497   }
2498 }
2499
2500 /* must be called with lock */
2501 static void
2502 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2503     GstElement * funnel)
2504 {
2505   GstRTSPStreamPrivate *priv;
2506   GstPad *pad, *selpad;
2507
2508   priv = stream->priv;
2509
2510   if (priv->srcpad) {
2511     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2512      * values. This is only relevant for PLAY pipelines */
2513     gst_element_set_state (src, GST_STATE_PLAYING);
2514     gst_element_set_locked_state (src, TRUE);
2515   }
2516
2517   /* add src */
2518   gst_bin_add (bin, src);
2519
2520   /* and link to the funnel */
2521   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2522   pad = gst_element_get_static_pad (src, "src");
2523   gst_pad_link (pad, selpad);
2524   gst_object_unref (pad);
2525   gst_object_unref (selpad);
2526 }
2527
2528 /* must be called with lock */
2529 static void
2530 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2531 {
2532   GstRTSPStreamPrivate *priv;
2533   GstPad *pad;
2534   gboolean is_tcp;
2535   gint i;
2536
2537   priv = stream->priv;
2538
2539   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2540
2541   for (i = 0; i < 2; i++) {
2542     /* For the receiver we create this bit of pipeline for both
2543      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2544      * and it is all funneled into the rtpbin receive pad.
2545      *
2546      * .--------.     .--------.    .--------.
2547      * | udpsrc |     | funnel |    | rtpbin |
2548      * |       src->sink      src->sink      |
2549      * '--------'     |        |    '--------'
2550      * .--------.     |        |
2551      * | appsrc |     |        |
2552      * |       src->sink       |
2553      * '--------'     '--------'
2554      */
2555
2556     if (!priv->sinkpad && i == 0) {
2557       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2558        * RTCP sink always */
2559       continue;
2560     }
2561
2562     /* make funnel for the RTP/RTCP receivers */
2563     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2564     gst_bin_add (bin, priv->funnel[i]);
2565
2566     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2567     gst_pad_link (pad, priv->recv_sink[i]);
2568     gst_object_unref (pad);
2569
2570     if (priv->udpsrc_v4[i])
2571       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2572
2573     if (priv->udpsrc_v6[i])
2574       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2575
2576     if (priv->mcast_udpsrc_v4[i])
2577       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
2578
2579     if (priv->mcast_udpsrc_v6[i])
2580       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
2581
2582     if (is_tcp) {
2583       /* make and add appsrc */
2584       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2585       priv->appsrc_base_time[i] = -1;
2586       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2587           TRUE, NULL);
2588       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2589     }
2590
2591     /* check if we need to set to a special state */
2592     if (state != GST_STATE_NULL) {
2593       gst_element_set_state (priv->funnel[i], state);
2594     }
2595   }
2596 }
2597
2598 static gboolean
2599 check_mcast_part_for_transport (GstRTSPStream * stream,
2600     const GstRTSPTransport * tr)
2601 {
2602   GstRTSPStreamPrivate *priv = stream->priv;
2603   GInetAddress *inetaddr;
2604   GSocketFamily family;
2605   GstRTSPAddress *mcast_addr;
2606
2607   /* Check if it's a ipv4 or ipv6 transport */
2608   inetaddr = g_inet_address_new_from_string (tr->destination);
2609   family = g_inet_address_get_family (inetaddr);
2610   g_object_unref (inetaddr);
2611
2612   /* Select fields corresponding to the family */
2613   if (family == G_SOCKET_FAMILY_IPV4) {
2614     mcast_addr = priv->mcast_addr_v4;
2615   } else {
2616     mcast_addr = priv->mcast_addr_v6;
2617   }
2618
2619   /* We support only one mcast group per family, make sure this transport
2620    * matches it. */
2621   if (!mcast_addr)
2622     goto no_addr;
2623
2624   if (!g_str_equal (tr->destination, mcast_addr->address) ||
2625       tr->port.min != mcast_addr->port ||
2626       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2627       tr->ttl != mcast_addr->ttl)
2628     goto wrong_addr;
2629
2630   return TRUE;
2631
2632 no_addr:
2633   {
2634     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2635         "has been reserved");
2636     return FALSE;
2637   }
2638 wrong_addr:
2639   {
2640     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2641         "the reserved address");
2642     return FALSE;
2643   }
2644 }
2645
2646 /**
2647  * gst_rtsp_stream_join_bin:
2648  * @stream: a #GstRTSPStream
2649  * @bin: (transfer none): a #GstBin to join
2650  * @rtpbin: (transfer none): a rtpbin element in @bin
2651  * @state: the target state of the new elements
2652  *
2653  * Join the #GstBin @bin that contains the element @rtpbin.
2654  *
2655  * @stream will link to @rtpbin, which must be inside @bin. The elements
2656  * added to @bin will be set to the state given in @state.
2657  *
2658  * Returns: %TRUE on success.
2659  */
2660 gboolean
2661 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2662     GstElement * rtpbin, GstState state)
2663 {
2664   GstRTSPStreamPrivate *priv;
2665   guint idx;
2666   gchar *name;
2667   GstPadLinkReturn ret;
2668
2669   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2670   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2671   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2672
2673   priv = stream->priv;
2674
2675   g_mutex_lock (&priv->lock);
2676   if (priv->joined_bin != NULL)
2677     goto was_joined;
2678
2679   /* create a session with the same index as the stream */
2680   idx = priv->idx;
2681
2682   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2683
2684   if (!alloc_ports (stream))
2685     goto no_ports;
2686
2687   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2688       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2689     /* For SRTP */
2690     g_signal_connect (rtpbin, "request-rtp-encoder",
2691         (GCallback) request_rtp_encoder, stream);
2692     g_signal_connect (rtpbin, "request-rtcp-encoder",
2693         (GCallback) request_rtcp_encoder, stream);
2694     g_signal_connect (rtpbin, "request-rtp-decoder",
2695         (GCallback) request_rtp_rtcp_decoder, stream);
2696     g_signal_connect (rtpbin, "request-rtcp-decoder",
2697         (GCallback) request_rtp_rtcp_decoder, stream);
2698   }
2699
2700   if (priv->sinkpad) {
2701     g_signal_connect (rtpbin, "request-pt-map",
2702         (GCallback) request_pt_map, stream);
2703   }
2704
2705   /* get pads from the RTP session element for sending and receiving
2706    * RTP/RTCP*/
2707   if (priv->srcpad) {
2708     /* get a pad for sending RTP */
2709     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2710     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2711     g_free (name);
2712
2713     /* link the RTP pad to the session manager, it should not really fail unless
2714      * this is not really an RTP pad */
2715     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2716     if (ret != GST_PAD_LINK_OK)
2717       goto link_failed;
2718
2719     name = g_strdup_printf ("send_rtp_src_%u", idx);
2720     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2721     g_free (name);
2722   } else {
2723     /* Need to connect our sinkpad from here */
2724     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2725     /* EOS */
2726     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2727
2728     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2729     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2730     g_free (name);
2731   }
2732
2733   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2734   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2735   g_free (name);
2736   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2737   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2738   g_free (name);
2739
2740   /* get the session */
2741   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2742
2743   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2744       stream);
2745   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2746       stream);
2747   g_signal_connect (priv->session, "on-ssrc-active",
2748       (GCallback) on_ssrc_active, stream);
2749   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2750       stream);
2751   g_signal_connect (priv->session, "on-bye-timeout",
2752       (GCallback) on_bye_timeout, stream);
2753   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2754       stream);
2755
2756   /* signal for sender ssrc */
2757   g_signal_connect (priv->session, "on-new-sender-ssrc",
2758       (GCallback) on_new_sender_ssrc, stream);
2759   g_signal_connect (priv->session, "on-sender-ssrc-active",
2760       (GCallback) on_sender_ssrc_active, stream);
2761
2762   create_sender_part (stream, bin, state);
2763   create_receiver_part (stream, bin, state);
2764
2765   if (priv->srcpad) {
2766     /* be notified of caps changes */
2767     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2768         (GCallback) caps_notify, stream);
2769   }
2770
2771   priv->joined_bin = gst_object_ref (bin);
2772   g_mutex_unlock (&priv->lock);
2773
2774   return TRUE;
2775
2776   /* ERRORS */
2777 was_joined:
2778   {
2779     g_mutex_unlock (&priv->lock);
2780     return TRUE;
2781   }
2782 no_ports:
2783   {
2784     g_mutex_unlock (&priv->lock);
2785     GST_WARNING ("failed to allocate ports %u", idx);
2786     return FALSE;
2787   }
2788 link_failed:
2789   {
2790     GST_WARNING ("failed to link stream %u", idx);
2791     gst_object_unref (priv->send_rtp_sink);
2792     priv->send_rtp_sink = NULL;
2793     g_mutex_unlock (&priv->lock);
2794     return FALSE;
2795   }
2796 }
2797
2798 static void
2799 clear_element (GstBin * bin, GstElement ** elementptr)
2800 {
2801   if (*elementptr) {
2802     gst_element_set_locked_state (*elementptr, FALSE);
2803     gst_element_set_state (*elementptr, GST_STATE_NULL);
2804     if (GST_ELEMENT_PARENT (*elementptr))
2805       gst_bin_remove (bin, *elementptr);
2806     else
2807       gst_object_unref (*elementptr);
2808     *elementptr = NULL;
2809   }
2810 }
2811
2812 /**
2813  * gst_rtsp_stream_leave_bin:
2814  * @stream: a #GstRTSPStream
2815  * @bin: (transfer none): a #GstBin
2816  * @rtpbin: (transfer none): a rtpbin #GstElement
2817  *
2818  * Remove the elements of @stream from @bin.
2819  *
2820  * Return: %TRUE on success.
2821  */
2822 gboolean
2823 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2824     GstElement * rtpbin)
2825 {
2826   GstRTSPStreamPrivate *priv;
2827   gint i;
2828
2829   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2830   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2831   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2832
2833   priv = stream->priv;
2834
2835   g_mutex_lock (&priv->lock);
2836   if (priv->joined_bin == NULL)
2837     goto was_not_joined;
2838   if (priv->joined_bin != bin)
2839     goto wrong_bin;
2840
2841   priv->joined_bin = NULL;
2842
2843   /* all transports must be removed by now */
2844   if (priv->transports != NULL)
2845     goto transports_not_removed;
2846
2847   clear_tr_cache (priv, TRUE);
2848   clear_tr_cache (priv, FALSE);
2849
2850   GST_INFO ("stream %p leaving bin", stream);
2851
2852   if (priv->srcpad) {
2853     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2854
2855     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2856     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2857     gst_object_unref (priv->send_rtp_sink);
2858     priv->send_rtp_sink = NULL;
2859   } else if (priv->recv_rtp_src) {
2860     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2861     gst_object_unref (priv->recv_rtp_src);
2862     priv->recv_rtp_src = NULL;
2863   }
2864
2865   for (i = 0; i < 2; i++) {
2866     clear_element (bin, &priv->udpsrc_v4[i]);
2867     clear_element (bin, &priv->udpsrc_v6[i]);
2868     clear_element (bin, &priv->udpqueue[i]);
2869     clear_element (bin, &priv->udpsink[i]);
2870
2871     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2872     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2873     clear_element (bin, &priv->mcast_udpqueue[i]);
2874     clear_element (bin, &priv->mcast_udpsink[i]);
2875
2876     clear_element (bin, &priv->appsrc[i]);
2877     clear_element (bin, &priv->appqueue[i]);
2878     clear_element (bin, &priv->appsink[i]);
2879
2880     clear_element (bin, &priv->tee[i]);
2881     clear_element (bin, &priv->funnel[i]);
2882
2883     if (priv->sinkpad || i == 1) {
2884       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2885       gst_object_unref (priv->recv_sink[i]);
2886       priv->recv_sink[i] = NULL;
2887     }
2888   }
2889
2890   if (priv->srcpad) {
2891     gst_object_unref (priv->send_src[0]);
2892     priv->send_src[0] = NULL;
2893   }
2894
2895   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2896   gst_object_unref (priv->send_src[1]);
2897   priv->send_src[1] = NULL;
2898
2899   g_object_unref (priv->session);
2900   priv->session = NULL;
2901   if (priv->caps)
2902     gst_caps_unref (priv->caps);
2903   priv->caps = NULL;
2904
2905   if (priv->srtpenc)
2906     gst_object_unref (priv->srtpenc);
2907   if (priv->srtpdec)
2908     gst_object_unref (priv->srtpdec);
2909
2910   if (priv->mcast_addr_v4)
2911     gst_rtsp_address_free (priv->mcast_addr_v4);
2912   priv->mcast_addr_v4 = NULL;
2913   if (priv->mcast_addr_v6)
2914     gst_rtsp_address_free (priv->mcast_addr_v6);
2915   priv->mcast_addr_v6 = NULL;
2916   if (priv->server_addr_v4)
2917     gst_rtsp_address_free (priv->server_addr_v4);
2918   priv->server_addr_v4 = NULL;
2919   if (priv->server_addr_v6)
2920     gst_rtsp_address_free (priv->server_addr_v6);
2921   priv->server_addr_v6 = NULL;
2922
2923   g_clear_object (&priv->joined_bin);
2924   g_mutex_unlock (&priv->lock);
2925
2926   return TRUE;
2927
2928 was_not_joined:
2929   {
2930     g_mutex_unlock (&priv->lock);
2931     return TRUE;
2932   }
2933 transports_not_removed:
2934   {
2935     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2936     g_mutex_unlock (&priv->lock);
2937     return FALSE;
2938   }
2939 wrong_bin:
2940   {
2941     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2942     g_mutex_unlock (&priv->lock);
2943     return FALSE;
2944   }
2945 }
2946
2947 /**
2948  * gst_rtsp_stream_get_joined_bin:
2949  * @stream: a #GstRTSPStream
2950  *
2951  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2952  *
2953  * Return: (transfer full): the joined bin or NULL.
2954  */
2955 GstBin *
2956 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2957 {
2958   GstRTSPStreamPrivate *priv;
2959   GstBin *bin = NULL;
2960
2961   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2962
2963   priv = stream->priv;
2964
2965   g_mutex_lock (&priv->lock);
2966   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
2967   g_mutex_unlock (&priv->lock);
2968
2969   return bin;
2970 }
2971
2972 /**
2973  * gst_rtsp_stream_get_rtpinfo:
2974  * @stream: a #GstRTSPStream
2975  * @rtptime: (allow-none): result RTP timestamp
2976  * @seq: (allow-none): result RTP seqnum
2977  * @clock_rate: (allow-none): the clock rate
2978  * @running_time: (allow-none): result running-time
2979  *
2980  * Retrieve the current rtptime, seq and running-time. This is used to
2981  * construct a RTPInfo reply header.
2982  *
2983  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2984  */
2985 gboolean
2986 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2987     guint * rtptime, guint * seq, guint * clock_rate,
2988     GstClockTime * running_time)
2989 {
2990   GstRTSPStreamPrivate *priv;
2991   GstStructure *stats;
2992   GObjectClass *payobjclass;
2993
2994   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2995
2996   priv = stream->priv;
2997
2998   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2999
3000   g_mutex_lock (&priv->lock);
3001
3002   /* First try to extract the information from the last buffer on the sinks.
3003    * This will have a more accurate sequence number and timestamp, as between
3004    * the payloader and the sink there can be some queues
3005    */
3006   if (priv->udpsink[0] || priv->appsink[0]) {
3007     GstSample *last_sample;
3008
3009     if (priv->udpsink[0])
3010       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3011     else
3012       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3013
3014     if (last_sample) {
3015       GstCaps *caps;
3016       GstBuffer *buffer;
3017       GstSegment *segment;
3018       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3019
3020       caps = gst_sample_get_caps (last_sample);
3021       buffer = gst_sample_get_buffer (last_sample);
3022       segment = gst_sample_get_segment (last_sample);
3023
3024       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3025         if (seq) {
3026           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3027         }
3028
3029         if (rtptime) {
3030           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3031         }
3032
3033         gst_rtp_buffer_unmap (&rtp_buffer);
3034
3035         if (running_time) {
3036           *running_time =
3037               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3038               GST_BUFFER_TIMESTAMP (buffer));
3039         }
3040
3041         if (clock_rate) {
3042           GstStructure *s = gst_caps_get_structure (caps, 0);
3043
3044           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3045
3046           if (*clock_rate == 0 && running_time)
3047             *running_time = GST_CLOCK_TIME_NONE;
3048         }
3049         gst_sample_unref (last_sample);
3050
3051         goto done;
3052       } else {
3053         gst_sample_unref (last_sample);
3054       }
3055     }
3056   }
3057
3058   if (g_object_class_find_property (payobjclass, "stats")) {
3059     g_object_get (priv->payloader, "stats", &stats, NULL);
3060     if (stats == NULL)
3061       goto no_stats;
3062
3063     if (seq)
3064       gst_structure_get_uint (stats, "seqnum", seq);
3065
3066     if (rtptime)
3067       gst_structure_get_uint (stats, "timestamp", rtptime);
3068
3069     if (running_time)
3070       gst_structure_get_clock_time (stats, "running-time", running_time);
3071
3072     if (clock_rate) {
3073       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3074       if (*clock_rate == 0 && running_time)
3075         *running_time = GST_CLOCK_TIME_NONE;
3076     }
3077     gst_structure_free (stats);
3078   } else {
3079     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3080         !g_object_class_find_property (payobjclass, "timestamp"))
3081       goto no_stats;
3082
3083     if (seq)
3084       g_object_get (priv->payloader, "seqnum", seq, NULL);
3085
3086     if (rtptime)
3087       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3088
3089     if (running_time)
3090       *running_time = GST_CLOCK_TIME_NONE;
3091   }
3092
3093 done:
3094   g_mutex_unlock (&priv->lock);
3095
3096   return TRUE;
3097
3098   /* ERRORS */
3099 no_stats:
3100   {
3101     GST_WARNING ("Could not get payloader stats");
3102     g_mutex_unlock (&priv->lock);
3103     return FALSE;
3104   }
3105 }
3106
3107 /**
3108  * gst_rtsp_stream_get_caps:
3109  * @stream: a #GstRTSPStream
3110  *
3111  * Retrieve the current caps of @stream.
3112  *
3113  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3114  * after usage.
3115  */
3116 GstCaps *
3117 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3118 {
3119   GstRTSPStreamPrivate *priv;
3120   GstCaps *result;
3121
3122   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3123
3124   priv = stream->priv;
3125
3126   g_mutex_lock (&priv->lock);
3127   if ((result = priv->caps))
3128     gst_caps_ref (result);
3129   g_mutex_unlock (&priv->lock);
3130
3131   return result;
3132 }
3133
3134 /**
3135  * gst_rtsp_stream_recv_rtp:
3136  * @stream: a #GstRTSPStream
3137  * @buffer: (transfer full): a #GstBuffer
3138  *
3139  * Handle an RTP buffer for the stream. This method is usually called when a
3140  * message has been received from a client using the TCP transport.
3141  *
3142  * This function takes ownership of @buffer.
3143  *
3144  * Returns: a GstFlowReturn.
3145  */
3146 GstFlowReturn
3147 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3148 {
3149   GstRTSPStreamPrivate *priv;
3150   GstFlowReturn ret;
3151   GstElement *element;
3152
3153   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3154   priv = stream->priv;
3155   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3156   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3157
3158   g_mutex_lock (&priv->lock);
3159   if (priv->appsrc[0])
3160     element = gst_object_ref (priv->appsrc[0]);
3161   else
3162     element = NULL;
3163   g_mutex_unlock (&priv->lock);
3164
3165   if (element) {
3166     if (priv->appsrc_base_time[0] == -1) {
3167       /* Take current running_time. This timestamp will be put on
3168        * the first buffer of each stream because we are a live source and so we
3169        * timestamp with the running_time. When we are dealing with TCP, we also
3170        * only timestamp the first buffer (using the DISCONT flag) because a server
3171        * typically bursts data, for which we don't want to compensate by speeding
3172        * up the media. The other timestamps will be interpollated from this one
3173        * using the RTP timestamps. */
3174       GST_OBJECT_LOCK (element);
3175       if (GST_ELEMENT_CLOCK (element)) {
3176         GstClockTime now;
3177         GstClockTime base_time;
3178
3179         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3180         base_time = GST_ELEMENT_CAST (element)->base_time;
3181
3182         priv->appsrc_base_time[0] = now - base_time;
3183         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3184         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3185             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3186             GST_TIME_ARGS (base_time));
3187       }
3188       GST_OBJECT_UNLOCK (element);
3189     }
3190
3191     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3192     gst_object_unref (element);
3193   } else {
3194     ret = GST_FLOW_OK;
3195   }
3196   return ret;
3197 }
3198
3199 /**
3200  * gst_rtsp_stream_recv_rtcp:
3201  * @stream: a #GstRTSPStream
3202  * @buffer: (transfer full): a #GstBuffer
3203  *
3204  * Handle an RTCP buffer for the stream. This method is usually called when a
3205  * message has been received from a client using the TCP transport.
3206  *
3207  * This function takes ownership of @buffer.
3208  *
3209  * Returns: a GstFlowReturn.
3210  */
3211 GstFlowReturn
3212 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3213 {
3214   GstRTSPStreamPrivate *priv;
3215   GstFlowReturn ret;
3216   GstElement *element;
3217
3218   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3219   priv = stream->priv;
3220   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3221
3222   if (priv->joined_bin == NULL) {
3223     gst_buffer_unref (buffer);
3224     return GST_FLOW_NOT_LINKED;
3225   }
3226   g_mutex_lock (&priv->lock);
3227   if (priv->appsrc[1])
3228     element = gst_object_ref (priv->appsrc[1]);
3229   else
3230     element = NULL;
3231   g_mutex_unlock (&priv->lock);
3232
3233   if (element) {
3234     if (priv->appsrc_base_time[1] == -1) {
3235       /* Take current running_time. This timestamp will be put on
3236        * the first buffer of each stream because we are a live source and so we
3237        * timestamp with the running_time. When we are dealing with TCP, we also
3238        * only timestamp the first buffer (using the DISCONT flag) because a server
3239        * typically bursts data, for which we don't want to compensate by speeding
3240        * up the media. The other timestamps will be interpollated from this one
3241        * using the RTP timestamps. */
3242       GST_OBJECT_LOCK (element);
3243       if (GST_ELEMENT_CLOCK (element)) {
3244         GstClockTime now;
3245         GstClockTime base_time;
3246
3247         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3248         base_time = GST_ELEMENT_CAST (element)->base_time;
3249
3250         priv->appsrc_base_time[1] = now - base_time;
3251         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3252         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3253             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3254             GST_TIME_ARGS (base_time));
3255       }
3256       GST_OBJECT_UNLOCK (element);
3257     }
3258
3259     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3260     gst_object_unref (element);
3261   } else {
3262     ret = GST_FLOW_OK;
3263     gst_buffer_unref (buffer);
3264   }
3265   return ret;
3266 }
3267
3268 /* must be called with lock */
3269 static gboolean
3270 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3271     gboolean add)
3272 {
3273   GstRTSPStreamPrivate *priv = stream->priv;
3274   const GstRTSPTransport *tr;
3275
3276   tr = gst_rtsp_stream_transport_get_transport (trans);
3277
3278   switch (tr->lower_transport) {
3279     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3280     {
3281       if (add) {
3282         if (!check_mcast_part_for_transport (stream, tr))
3283           goto mcast_error;
3284         priv->transports = g_list_prepend (priv->transports, trans);
3285       } else {
3286         priv->transports = g_list_remove (priv->transports, trans);
3287       }
3288       break;
3289     }
3290     case GST_RTSP_LOWER_TRANS_UDP:
3291     {
3292       gchar *dest;
3293       gint min, max;
3294       guint ttl = 0;
3295
3296       dest = tr->destination;
3297       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3298         min = tr->port.min;
3299         max = tr->port.max;
3300         ttl = tr->ttl;
3301       } else if (priv->client_side) {
3302         /* In client side mode the 'destination' is the RTSP server, so send
3303          * to those ports */
3304         min = tr->server_port.min;
3305         max = tr->server_port.max;
3306       } else {
3307         min = tr->client_port.min;
3308         max = tr->client_port.max;
3309       }
3310
3311       if (add) {
3312         if (ttl > 0) {
3313           GST_INFO ("setting ttl-mc %d", ttl);
3314           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3315           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3316         }
3317         GST_INFO ("adding %s:%d-%d", dest, min, max);
3318         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3319         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3320         priv->transports = g_list_prepend (priv->transports, trans);
3321       } else {
3322         GST_INFO ("removing %s:%d-%d", dest, min, max);
3323         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3324         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3325         priv->transports = g_list_remove (priv->transports, trans);
3326       }
3327       priv->transports_cookie++;
3328       break;
3329     }
3330     case GST_RTSP_LOWER_TRANS_TCP:
3331       if (add) {
3332         GST_INFO ("adding TCP %s", tr->destination);
3333         priv->transports = g_list_prepend (priv->transports, trans);
3334       } else {
3335         GST_INFO ("removing TCP %s", tr->destination);
3336         priv->transports = g_list_remove (priv->transports, trans);
3337       }
3338       priv->transports_cookie++;
3339       break;
3340     default:
3341       goto unknown_transport;
3342   }
3343   return TRUE;
3344
3345   /* ERRORS */
3346 unknown_transport:
3347   {
3348     GST_INFO ("Unknown transport %d", tr->lower_transport);
3349     return FALSE;
3350   }
3351 mcast_error:
3352   {
3353     return FALSE;
3354   }
3355 }
3356
3357
3358 /**
3359  * gst_rtsp_stream_add_transport:
3360  * @stream: a #GstRTSPStream
3361  * @trans: (transfer none): a #GstRTSPStreamTransport
3362  *
3363  * Add the transport in @trans to @stream. The media of @stream will
3364  * then also be send to the values configured in @trans.
3365  *
3366  * @stream must be joined to a bin.
3367  *
3368  * @trans must contain a valid #GstRTSPTransport.
3369  *
3370  * Returns: %TRUE if @trans was added
3371  */
3372 gboolean
3373 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3374     GstRTSPStreamTransport * trans)
3375 {
3376   GstRTSPStreamPrivate *priv;
3377   gboolean res;
3378
3379   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3380   priv = stream->priv;
3381   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3382   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3383
3384   g_mutex_lock (&priv->lock);
3385   res = update_transport (stream, trans, TRUE);
3386   g_mutex_unlock (&priv->lock);
3387
3388   return res;
3389 }
3390
3391 /**
3392  * gst_rtsp_stream_remove_transport:
3393  * @stream: a #GstRTSPStream
3394  * @trans: (transfer none): a #GstRTSPStreamTransport
3395  *
3396  * Remove the transport in @trans from @stream. The media of @stream will
3397  * not be sent to the values configured in @trans.
3398  *
3399  * @stream must be joined to a bin.
3400  *
3401  * @trans must contain a valid #GstRTSPTransport.
3402  *
3403  * Returns: %TRUE if @trans was removed
3404  */
3405 gboolean
3406 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3407     GstRTSPStreamTransport * trans)
3408 {
3409   GstRTSPStreamPrivate *priv;
3410   gboolean res;
3411
3412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3413   priv = stream->priv;
3414   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3415   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3416
3417   g_mutex_lock (&priv->lock);
3418   res = update_transport (stream, trans, FALSE);
3419   g_mutex_unlock (&priv->lock);
3420
3421   return res;
3422 }
3423
3424 /**
3425  * gst_rtsp_stream_update_crypto:
3426  * @stream: a #GstRTSPStream
3427  * @ssrc: the SSRC
3428  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3429  *
3430  * Update the new crypto information for @ssrc in @stream. If information
3431  * for @ssrc did not exist, it will be added. If information
3432  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3433  * be removed from @stream.
3434  *
3435  * Returns: %TRUE if @crypto could be updated
3436  */
3437 gboolean
3438 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3439     guint ssrc, GstCaps * crypto)
3440 {
3441   GstRTSPStreamPrivate *priv;
3442
3443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3444   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3445
3446   priv = stream->priv;
3447
3448   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3449
3450   g_mutex_lock (&priv->lock);
3451   if (crypto)
3452     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3453         gst_caps_ref (crypto));
3454   else
3455     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3456   g_mutex_unlock (&priv->lock);
3457
3458   return TRUE;
3459 }
3460
3461 /**
3462  * gst_rtsp_stream_get_rtp_socket:
3463  * @stream: a #GstRTSPStream
3464  * @family: the socket family
3465  *
3466  * Get the RTP socket from @stream for a @family.
3467  *
3468  * @stream must be joined to a bin.
3469  *
3470  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3471  * socket could be allocated for @family. Unref after usage
3472  */
3473 GSocket *
3474 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3475 {
3476   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3477   GSocket *socket;
3478   const gchar *name;
3479
3480   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3481   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3482       family == G_SOCKET_FAMILY_IPV6, NULL);
3483   g_return_val_if_fail (priv->udpsink[0], NULL);
3484
3485   if (family == G_SOCKET_FAMILY_IPV6)
3486     name = "socket-v6";
3487   else
3488     name = "socket";
3489
3490   g_object_get (priv->udpsink[0], name, &socket, NULL);
3491
3492   return socket;
3493 }
3494
3495 /**
3496  * gst_rtsp_stream_get_rtcp_socket:
3497  * @stream: a #GstRTSPStream
3498  * @family: the socket family
3499  *
3500  * Get the RTCP socket from @stream for a @family.
3501  *
3502  * @stream must be joined to a bin.
3503  *
3504  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3505  * socket could be allocated for @family. Unref after usage
3506  */
3507 GSocket *
3508 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3509 {
3510   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3511   GSocket *socket;
3512   const gchar *name;
3513
3514   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3515   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3516       family == G_SOCKET_FAMILY_IPV6, NULL);
3517   g_return_val_if_fail (priv->udpsink[1], NULL);
3518
3519   if (family == G_SOCKET_FAMILY_IPV6)
3520     name = "socket-v6";
3521   else
3522     name = "socket";
3523
3524   g_object_get (priv->udpsink[1], name, &socket, NULL);
3525
3526   return socket;
3527 }
3528
3529 /**
3530  * gst_rtsp_stream_set_seqnum:
3531  * @stream: a #GstRTSPStream
3532  * @seqnum: a new sequence number
3533  *
3534  * Configure the sequence number in the payloader of @stream to @seqnum.
3535  */
3536 void
3537 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3538 {
3539   GstRTSPStreamPrivate *priv;
3540
3541   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3542
3543   priv = stream->priv;
3544
3545   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3546 }
3547
3548 /**
3549  * gst_rtsp_stream_get_seqnum:
3550  * @stream: a #GstRTSPStream
3551  *
3552  * Get the configured sequence number in the payloader of @stream.
3553  *
3554  * Returns: the sequence number of the payloader.
3555  */
3556 guint16
3557 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3558 {
3559   GstRTSPStreamPrivate *priv;
3560   guint seqnum;
3561
3562   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3563
3564   priv = stream->priv;
3565
3566   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3567
3568   return seqnum;
3569 }
3570
3571 /**
3572  * gst_rtsp_stream_transport_filter:
3573  * @stream: a #GstRTSPStream
3574  * @func: (scope call) (allow-none): a callback
3575  * @user_data: (closure): user data passed to @func
3576  *
3577  * Call @func for each transport managed by @stream. The result value of @func
3578  * determines what happens to the transport. @func will be called with @stream
3579  * locked so no further actions on @stream can be performed from @func.
3580  *
3581  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3582  * @stream.
3583  *
3584  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3585  *
3586  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3587  * will also be added with an additional ref to the result #GList of this
3588  * function..
3589  *
3590  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3591  *
3592  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3593  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3594  * element in the #GList should be unreffed before the list is freed.
3595  */
3596 GList *
3597 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3598     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3599 {
3600   GstRTSPStreamPrivate *priv;
3601   GList *result, *walk, *next;
3602   GHashTable *visited = NULL;
3603   guint cookie;
3604
3605   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3606
3607   priv = stream->priv;
3608
3609   result = NULL;
3610   if (func)
3611     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3612
3613   g_mutex_lock (&priv->lock);
3614 restart:
3615   cookie = priv->transports_cookie;
3616   for (walk = priv->transports; walk; walk = next) {
3617     GstRTSPStreamTransport *trans = walk->data;
3618     GstRTSPFilterResult res;
3619     gboolean changed;
3620
3621     next = g_list_next (walk);
3622
3623     if (func) {
3624       /* only visit each transport once */
3625       if (g_hash_table_contains (visited, trans))
3626         continue;
3627
3628       g_hash_table_add (visited, g_object_ref (trans));
3629       g_mutex_unlock (&priv->lock);
3630
3631       res = func (stream, trans, user_data);
3632
3633       g_mutex_lock (&priv->lock);
3634     } else
3635       res = GST_RTSP_FILTER_REF;
3636
3637     changed = (cookie != priv->transports_cookie);
3638
3639     switch (res) {
3640       case GST_RTSP_FILTER_REMOVE:
3641         update_transport (stream, trans, FALSE);
3642         break;
3643       case GST_RTSP_FILTER_REF:
3644         result = g_list_prepend (result, g_object_ref (trans));
3645         break;
3646       case GST_RTSP_FILTER_KEEP:
3647       default:
3648         break;
3649     }
3650     if (changed)
3651       goto restart;
3652   }
3653   g_mutex_unlock (&priv->lock);
3654
3655   if (func)
3656     g_hash_table_unref (visited);
3657
3658   return result;
3659 }
3660
3661 static GstPadProbeReturn
3662 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3663 {
3664   GstRTSPStreamPrivate *priv;
3665   GstRTSPStream *stream;
3666
3667   stream = user_data;
3668   priv = stream->priv;
3669
3670   GST_DEBUG_OBJECT (pad, "now blocking");
3671
3672   g_mutex_lock (&priv->lock);
3673   priv->blocking = TRUE;
3674   g_mutex_unlock (&priv->lock);
3675
3676   gst_element_post_message (priv->payloader,
3677       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3678           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3679
3680   return GST_PAD_PROBE_OK;
3681 }
3682
3683 /**
3684  * gst_rtsp_stream_set_blocked:
3685  * @stream: a #GstRTSPStream
3686  * @blocked: boolean indicating we should block or unblock
3687  *
3688  * Blocks or unblocks the dataflow on @stream.
3689  *
3690  * Returns: %TRUE on success
3691  */
3692 gboolean
3693 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3694 {
3695   GstRTSPStreamPrivate *priv;
3696
3697   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3698
3699   priv = stream->priv;
3700
3701   g_mutex_lock (&priv->lock);
3702   if (blocked) {
3703     priv->blocking = FALSE;
3704     if (priv->blocked_id == 0) {
3705       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3706           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3707           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3708           g_object_ref (stream), g_object_unref);
3709     }
3710   } else {
3711     if (priv->blocked_id != 0) {
3712       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3713       priv->blocked_id = 0;
3714       priv->blocking = FALSE;
3715     }
3716   }
3717   g_mutex_unlock (&priv->lock);
3718
3719   return TRUE;
3720 }
3721
3722 /**
3723  * gst_rtsp_stream_is_blocking:
3724  * @stream: a #GstRTSPStream
3725  *
3726  * Check if @stream is blocking on a #GstBuffer.
3727  *
3728  * Returns: %TRUE if @stream is blocking
3729  */
3730 gboolean
3731 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3732 {
3733   GstRTSPStreamPrivate *priv;
3734   gboolean result;
3735
3736   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3737
3738   priv = stream->priv;
3739
3740   g_mutex_lock (&priv->lock);
3741   result = priv->blocking;
3742   g_mutex_unlock (&priv->lock);
3743
3744   return result;
3745 }
3746
3747 /**
3748  * gst_rtsp_stream_query_position:
3749  * @stream: a #GstRTSPStream
3750  *
3751  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3752  * the RTP parts of the pipeline and not the RTCP parts.
3753  *
3754  * Returns: %TRUE if the position could be queried
3755  */
3756 gboolean
3757 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3758 {
3759   GstRTSPStreamPrivate *priv;
3760   GstElement *sink;
3761   gboolean ret;
3762
3763   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3764
3765   priv = stream->priv;
3766
3767   g_mutex_lock (&priv->lock);
3768   /* depending on the transport type, it should query corresponding sink */
3769   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3770       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3771     sink = priv->udpsink[0];
3772   else
3773     sink = priv->appsink[0];
3774
3775   if (sink)
3776     gst_object_ref (sink);
3777   g_mutex_unlock (&priv->lock);
3778
3779   if (!sink)
3780     return FALSE;
3781
3782   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3783   gst_object_unref (sink);
3784
3785   return ret;
3786 }
3787
3788 /**
3789  * gst_rtsp_stream_query_stop:
3790  * @stream: a #GstRTSPStream
3791  *
3792  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3793  * the RTP parts of the pipeline and not the RTCP parts.
3794  *
3795  * Returns: %TRUE if the stop could be queried
3796  */
3797 gboolean
3798 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3799 {
3800   GstRTSPStreamPrivate *priv;
3801   GstElement *sink;
3802   GstQuery *query;
3803   gboolean ret;
3804
3805   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3806
3807   priv = stream->priv;
3808
3809   g_mutex_lock (&priv->lock);
3810   /* depending on the transport type, it should query corresponding sink */
3811   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3812       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3813     sink = priv->udpsink[0];
3814   else
3815     sink = priv->appsink[0];
3816
3817   if (sink)
3818     gst_object_ref (sink);
3819   g_mutex_unlock (&priv->lock);
3820
3821   if (!sink)
3822     return FALSE;
3823
3824   query = gst_query_new_segment (GST_FORMAT_TIME);
3825   if ((ret = gst_element_query (sink, query))) {
3826     GstFormat format;
3827
3828     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3829     if (format != GST_FORMAT_TIME)
3830       *stop = -1;
3831   }
3832   gst_query_unref (query);
3833   gst_object_unref (sink);
3834
3835   return ret;
3836
3837 }