gi: Fix some annotations and docstrings
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPAddress *server_addr_v4;
128   GstRTSPAddress *server_addr_v6;
129
130   /* multicast addresses */
131   GstRTSPAddress *mcast_addr_v4;
132   GstRTSPAddress *mcast_addr_v6;
133
134   gchar *multicast_iface;
135
136   /* the caps of the stream */
137   gulong caps_sig;
138   GstCaps *caps;
139
140   /* transports we stream to */
141   guint n_active;
142   GList *transports;
143   guint transports_cookie;
144   GList *tr_cache_rtp;
145   GList *tr_cache_rtcp;
146   guint tr_cache_cookie_rtp;
147   guint tr_cache_cookie_rtcp;
148
149   gint dscp_qos;
150
151   /* stream blocking */
152   gulong blocked_id[2];
153   gboolean blocking;
154
155   /* pt->caps map for RECORD streams */
156   GHashTable *ptmap;
157
158   GstRTSPPublishClockMode publish_clock_mode;
159 };
160
161 #define DEFAULT_CONTROL         NULL
162 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
163 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
164                                         GST_RTSP_LOWER_TRANS_TCP
165
166 enum
167 {
168   PROP_0,
169   PROP_CONTROL,
170   PROP_PROFILES,
171   PROP_PROTOCOLS,
172   PROP_LAST
173 };
174
175 enum
176 {
177   SIGNAL_NEW_RTP_ENCODER,
178   SIGNAL_NEW_RTCP_ENCODER,
179   SIGNAL_LAST
180 };
181
182 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
183 #define GST_CAT_DEFAULT rtsp_stream_debug
184
185 static GQuark ssrc_stream_map_key;
186
187 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
188     GValue * value, GParamSpec * pspec);
189 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
190     const GValue * value, GParamSpec * pspec);
191
192 static void gst_rtsp_stream_finalize (GObject * obj);
193
194 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
195
196 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
197
198 static void
199 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
200 {
201   GObjectClass *gobject_class;
202
203   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
204
205   gobject_class = G_OBJECT_CLASS (klass);
206
207   gobject_class->get_property = gst_rtsp_stream_get_property;
208   gobject_class->set_property = gst_rtsp_stream_set_property;
209   gobject_class->finalize = gst_rtsp_stream_finalize;
210
211   g_object_class_install_property (gobject_class, PROP_CONTROL,
212       g_param_spec_string ("control", "Control",
213           "The control string for this stream", DEFAULT_CONTROL,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215
216   g_object_class_install_property (gobject_class, PROP_PROFILES,
217       g_param_spec_flags ("profiles", "Profiles",
218           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
219           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
222       g_param_spec_flags ("protocols", "Protocols",
223           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
224           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
227       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
228       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
229       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
230
231   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
232       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
234       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
235
236   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
237
238   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
239 }
240
241 static void
242 gst_rtsp_stream_init (GstRTSPStream * stream)
243 {
244   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
245
246   GST_DEBUG ("new stream %p", stream);
247
248   stream->priv = priv;
249
250   priv->dscp_qos = -1;
251   priv->control = g_strdup (DEFAULT_CONTROL);
252   priv->profiles = DEFAULT_PROFILES;
253   priv->protocols = DEFAULT_PROTOCOLS;
254   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
255
256   g_mutex_init (&priv->lock);
257
258   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
259       NULL, (GDestroyNotify) gst_caps_unref);
260   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
261       (GDestroyNotify) gst_caps_unref);
262 }
263
264 static void
265 gst_rtsp_stream_finalize (GObject * obj)
266 {
267   GstRTSPStream *stream;
268   GstRTSPStreamPrivate *priv;
269
270   stream = GST_RTSP_STREAM (obj);
271   priv = stream->priv;
272
273   GST_DEBUG ("finalize stream %p", stream);
274
275   /* we really need to be unjoined now */
276   g_return_if_fail (priv->joined_bin == NULL);
277
278   if (priv->mcast_addr_v4)
279     gst_rtsp_address_free (priv->mcast_addr_v4);
280   if (priv->mcast_addr_v6)
281     gst_rtsp_address_free (priv->mcast_addr_v6);
282   if (priv->server_addr_v4)
283     gst_rtsp_address_free (priv->server_addr_v4);
284   if (priv->server_addr_v6)
285     gst_rtsp_address_free (priv->server_addr_v6);
286   if (priv->pool)
287     g_object_unref (priv->pool);
288   if (priv->rtxsend)
289     g_object_unref (priv->rtxsend);
290
291   g_free (priv->multicast_iface);
292
293   gst_object_unref (priv->payloader);
294   if (priv->srcpad)
295     gst_object_unref (priv->srcpad);
296   if (priv->sinkpad)
297     gst_object_unref (priv->sinkpad);
298   g_free (priv->control);
299   g_mutex_clear (&priv->lock);
300
301   g_hash_table_unref (priv->keys);
302   g_hash_table_destroy (priv->ptmap);
303
304   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
305 }
306
307 static void
308 gst_rtsp_stream_get_property (GObject * object, guint propid,
309     GValue * value, GParamSpec * pspec)
310 {
311   GstRTSPStream *stream = GST_RTSP_STREAM (object);
312
313   switch (propid) {
314     case PROP_CONTROL:
315       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
316       break;
317     case PROP_PROFILES:
318       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
319       break;
320     case PROP_PROTOCOLS:
321       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
325   }
326 }
327
328 static void
329 gst_rtsp_stream_set_property (GObject * object, guint propid,
330     const GValue * value, GParamSpec * pspec)
331 {
332   GstRTSPStream *stream = GST_RTSP_STREAM (object);
333
334   switch (propid) {
335     case PROP_CONTROL:
336       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
337       break;
338     case PROP_PROFILES:
339       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
340       break;
341     case PROP_PROTOCOLS:
342       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
343       break;
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
346   }
347 }
348
349 /**
350  * gst_rtsp_stream_new:
351  * @idx: an index
352  * @pad: a #GstPad
353  * @payloader: a #GstElement
354  *
355  * Create a new media stream with index @idx that handles RTP data on
356  * @pad and has a payloader element @payloader if @pad is a source pad
357  * or a depayloader element @payloader if @pad is a sink pad.
358  *
359  * Returns: (transfer full): a new #GstRTSPStream
360  */
361 GstRTSPStream *
362 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
363 {
364   GstRTSPStreamPrivate *priv;
365   GstRTSPStream *stream;
366
367   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
368   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
369
370   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
371   priv = stream->priv;
372   priv->idx = idx;
373   priv->payloader = gst_object_ref (payloader);
374   if (GST_PAD_IS_SRC (pad))
375     priv->srcpad = gst_object_ref (pad);
376   else
377     priv->sinkpad = gst_object_ref (pad);
378
379   return stream;
380 }
381
382 /**
383  * gst_rtsp_stream_get_index:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the stream index.
387  *
388  * Return: the stream index.
389  */
390 guint
391 gst_rtsp_stream_get_index (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
394
395   return stream->priv->idx;
396 }
397
398 /**
399  * gst_rtsp_stream_get_pt:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the stream payload type.
403  *
404  * Return: the stream payload type.
405  */
406 guint
407 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   guint pt;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
413
414   priv = stream->priv;
415
416   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
417
418   return pt;
419 }
420
421 /**
422  * gst_rtsp_stream_get_srcpad:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the srcpad associated with @stream.
426  *
427  * Returns: (transfer full): the srcpad. Unref after usage.
428  */
429 GstPad *
430 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
431 {
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
433
434   if (!stream->priv->srcpad)
435     return NULL;
436
437   return gst_object_ref (stream->priv->srcpad);
438 }
439
440 /**
441  * gst_rtsp_stream_get_sinkpad:
442  * @stream: a #GstRTSPStream
443  *
444  * Get the sinkpad associated with @stream.
445  *
446  * Returns: (transfer full): the sinkpad. Unref after usage.
447  */
448 GstPad *
449 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
450 {
451   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
452
453   if (!stream->priv->sinkpad)
454     return NULL;
455
456   return gst_object_ref (stream->priv->sinkpad);
457 }
458
459 /**
460  * gst_rtsp_stream_get_control:
461  * @stream: a #GstRTSPStream
462  *
463  * Get the control string to identify this stream.
464  *
465  * Returns: (transfer full): the control string. g_free() after usage.
466  */
467 gchar *
468 gst_rtsp_stream_get_control (GstRTSPStream * stream)
469 {
470   GstRTSPStreamPrivate *priv;
471   gchar *result;
472
473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
474
475   priv = stream->priv;
476
477   g_mutex_lock (&priv->lock);
478   if ((result = g_strdup (priv->control)) == NULL)
479     result = g_strdup_printf ("stream=%u", priv->idx);
480   g_mutex_unlock (&priv->lock);
481
482   return result;
483 }
484
485 /**
486  * gst_rtsp_stream_set_control:
487  * @stream: a #GstRTSPStream
488  * @control: a control string
489  *
490  * Set the control string in @stream.
491  */
492 void
493 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
494 {
495   GstRTSPStreamPrivate *priv;
496
497   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
498
499   priv = stream->priv;
500
501   g_mutex_lock (&priv->lock);
502   g_free (priv->control);
503   priv->control = g_strdup (control);
504   g_mutex_unlock (&priv->lock);
505 }
506
507 /**
508  * gst_rtsp_stream_has_control:
509  * @stream: a #GstRTSPStream
510  * @control: a control string
511  *
512  * Check if @stream has the control string @control.
513  *
514  * Returns: %TRUE is @stream has @control as the control string
515  */
516 gboolean
517 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
518 {
519   GstRTSPStreamPrivate *priv;
520   gboolean res;
521
522   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
523
524   priv = stream->priv;
525
526   g_mutex_lock (&priv->lock);
527   if (priv->control)
528     res = (g_strcmp0 (priv->control, control) == 0);
529   else {
530     guint streamid;
531
532     if (sscanf (control, "stream=%u", &streamid) > 0)
533       res = (streamid == priv->idx);
534     else
535       res = FALSE;
536   }
537   g_mutex_unlock (&priv->lock);
538
539   return res;
540 }
541
542 /**
543  * gst_rtsp_stream_set_mtu:
544  * @stream: a #GstRTSPStream
545  * @mtu: a new MTU
546  *
547  * Configure the mtu in the payloader of @stream to @mtu.
548  */
549 void
550 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
551 {
552   GstRTSPStreamPrivate *priv;
553
554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
555
556   priv = stream->priv;
557
558   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
559
560   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
561 }
562
563 /**
564  * gst_rtsp_stream_get_mtu:
565  * @stream: a #GstRTSPStream
566  *
567  * Get the configured MTU in the payloader of @stream.
568  *
569  * Returns: the MTU of the payloader.
570  */
571 guint
572 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
573 {
574   GstRTSPStreamPrivate *priv;
575   guint mtu;
576
577   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
578
579   priv = stream->priv;
580
581   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
582
583   return mtu;
584 }
585
586 /* Update the dscp qos property on the udp sinks */
587 static void
588 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
589 {
590   GstRTSPStreamPrivate *priv;
591
592   priv = stream->priv;
593
594   if (udpsink[0]) {
595     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
596   }
597
598   if (udpsink[1]) {
599     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
600   }
601 }
602
603 /**
604  * gst_rtsp_stream_set_dscp_qos:
605  * @stream: a #GstRTSPStream
606  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
607  *
608  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
609  */
610 void
611 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
620
621   if (dscp_qos < -1 || dscp_qos > 63) {
622     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
623     return;
624   }
625
626   priv->dscp_qos = dscp_qos;
627
628   update_dscp_qos (stream, priv->udpsink);
629 }
630
631 /**
632  * gst_rtsp_stream_get_dscp_qos:
633  * @stream: a #GstRTSPStream
634  *
635  * Get the configured DSCP QoS in of the outgoing sockets.
636  *
637  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
638  */
639 gint
640 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
641 {
642   GstRTSPStreamPrivate *priv;
643
644   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
645
646   priv = stream->priv;
647
648   return priv->dscp_qos;
649 }
650
651 /**
652  * gst_rtsp_stream_is_transport_supported:
653  * @stream: a #GstRTSPStream
654  * @transport: (transfer none): a #GstRTSPTransport
655  *
656  * Check if @transport can be handled by stream
657  *
658  * Returns: %TRUE if @transport can be handled by @stream.
659  */
660 gboolean
661 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
662     GstRTSPTransport * transport)
663 {
664   GstRTSPStreamPrivate *priv;
665
666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
667
668   priv = stream->priv;
669
670   g_mutex_lock (&priv->lock);
671   if (transport->trans != GST_RTSP_TRANS_RTP)
672     goto unsupported_transmode;
673
674   if (!(transport->profile & priv->profiles))
675     goto unsupported_profile;
676
677   if (!(transport->lower_transport & priv->protocols))
678     goto unsupported_ltrans;
679
680   g_mutex_unlock (&priv->lock);
681
682   return TRUE;
683
684   /* ERRORS */
685 unsupported_transmode:
686   {
687     GST_DEBUG ("unsupported transport mode %d", transport->trans);
688     g_mutex_unlock (&priv->lock);
689     return FALSE;
690   }
691 unsupported_profile:
692   {
693     GST_DEBUG ("unsupported profile %d", transport->profile);
694     g_mutex_unlock (&priv->lock);
695     return FALSE;
696   }
697 unsupported_ltrans:
698   {
699     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
700     g_mutex_unlock (&priv->lock);
701     return FALSE;
702   }
703 }
704
705 /**
706  * gst_rtsp_stream_set_profiles:
707  * @stream: a #GstRTSPStream
708  * @profiles: the new profiles
709  *
710  * Configure the allowed profiles for @stream.
711  */
712 void
713 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
714 {
715   GstRTSPStreamPrivate *priv;
716
717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
718
719   priv = stream->priv;
720
721   g_mutex_lock (&priv->lock);
722   priv->profiles = profiles;
723   g_mutex_unlock (&priv->lock);
724 }
725
726 /**
727  * gst_rtsp_stream_get_profiles:
728  * @stream: a #GstRTSPStream
729  *
730  * Get the allowed profiles of @stream.
731  *
732  * Returns: a #GstRTSPProfile
733  */
734 GstRTSPProfile
735 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
736 {
737   GstRTSPStreamPrivate *priv;
738   GstRTSPProfile res;
739
740   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
741
742   priv = stream->priv;
743
744   g_mutex_lock (&priv->lock);
745   res = priv->profiles;
746   g_mutex_unlock (&priv->lock);
747
748   return res;
749 }
750
751 /**
752  * gst_rtsp_stream_set_protocols:
753  * @stream: a #GstRTSPStream
754  * @protocols: the new flags
755  *
756  * Configure the allowed lower transport for @stream.
757  */
758 void
759 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
760     GstRTSPLowerTrans protocols)
761 {
762   GstRTSPStreamPrivate *priv;
763
764   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
765
766   priv = stream->priv;
767
768   g_mutex_lock (&priv->lock);
769   priv->protocols = protocols;
770   g_mutex_unlock (&priv->lock);
771 }
772
773 /**
774  * gst_rtsp_stream_get_protocols:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the allowed protocols of @stream.
778  *
779  * Returns: a #GstRTSPLowerTrans
780  */
781 GstRTSPLowerTrans
782 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
783 {
784   GstRTSPStreamPrivate *priv;
785   GstRTSPLowerTrans res;
786
787   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
788       GST_RTSP_LOWER_TRANS_UNKNOWN);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   res = priv->protocols;
794   g_mutex_unlock (&priv->lock);
795
796   return res;
797 }
798
799 /**
800  * gst_rtsp_stream_set_address_pool:
801  * @stream: a #GstRTSPStream
802  * @pool: (transfer none): a #GstRTSPAddressPool
803  *
804  * configure @pool to be used as the address pool of @stream.
805  */
806 void
807 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
808     GstRTSPAddressPool * pool)
809 {
810   GstRTSPStreamPrivate *priv;
811   GstRTSPAddressPool *old;
812
813   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
814
815   priv = stream->priv;
816
817   GST_LOG_OBJECT (stream, "set address pool %p", pool);
818
819   g_mutex_lock (&priv->lock);
820   if ((old = priv->pool) != pool)
821     priv->pool = pool ? g_object_ref (pool) : NULL;
822   else
823     old = NULL;
824   g_mutex_unlock (&priv->lock);
825
826   if (old)
827     g_object_unref (old);
828 }
829
830 /**
831  * gst_rtsp_stream_get_address_pool:
832  * @stream: a #GstRTSPStream
833  *
834  * Get the #GstRTSPAddressPool used as the address pool of @stream.
835  *
836  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
837  * usage.
838  */
839 GstRTSPAddressPool *
840 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
841 {
842   GstRTSPStreamPrivate *priv;
843   GstRTSPAddressPool *result;
844
845   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
846
847   priv = stream->priv;
848
849   g_mutex_lock (&priv->lock);
850   if ((result = priv->pool))
851     g_object_ref (result);
852   g_mutex_unlock (&priv->lock);
853
854   return result;
855 }
856
857 /**
858  * gst_rtsp_stream_set_multicast_iface:
859  * @stream: a #GstRTSPStream
860  * @multicast_iface: (transfer none): a multicast interface name
861  *
862  * configure @multicast_iface to be used for @stream.
863  */
864 void
865 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
866     const gchar * multicast_iface)
867 {
868   GstRTSPStreamPrivate *priv;
869   gchar *old;
870
871   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
872
873   priv = stream->priv;
874
875   GST_LOG_OBJECT (stream, "set multicast iface %s",
876       GST_STR_NULL (multicast_iface));
877
878   g_mutex_lock (&priv->lock);
879   if ((old = priv->multicast_iface) != multicast_iface)
880     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
881   else
882     old = NULL;
883   g_mutex_unlock (&priv->lock);
884
885   if (old)
886     g_free (old);
887 }
888
889 /**
890  * gst_rtsp_stream_get_multicast_iface:
891  * @stream: a #GstRTSPStream
892  *
893  * Get the multicast interface used for @stream.
894  *
895  * Returns: (transfer full): the multicast interface for @stream. g_free() after
896  * usage.
897  */
898 gchar *
899 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
900 {
901   GstRTSPStreamPrivate *priv;
902   gchar *result;
903
904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
905
906   priv = stream->priv;
907
908   g_mutex_lock (&priv->lock);
909   if ((result = priv->multicast_iface))
910     result = g_strdup (result);
911   g_mutex_unlock (&priv->lock);
912
913   return result;
914 }
915
916 /**
917  * gst_rtsp_stream_get_multicast_address:
918  * @stream: a #GstRTSPStream
919  * @family: the #GSocketFamily
920  *
921  * Get the multicast address of @stream for @family. The original
922  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
923  * won't release the address from the pool.
924  *
925  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
926  * or %NULL when no address could be allocated. gst_rtsp_address_free()
927  * after usage.
928  */
929 GstRTSPAddress *
930 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
931     GSocketFamily family)
932 {
933   GstRTSPStreamPrivate *priv;
934   GstRTSPAddress *result;
935   GstRTSPAddress **addrp;
936   GstRTSPAddressFlags flags;
937
938   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
939
940   priv = stream->priv;
941
942   g_mutex_lock (&stream->priv->lock);
943
944   if (family == G_SOCKET_FAMILY_IPV6) {
945     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
946     addrp = &priv->mcast_addr_v6;
947   } else {
948     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
949     addrp = &priv->mcast_addr_v4;
950   }
951
952   if (*addrp == NULL) {
953     if (priv->pool == NULL)
954       goto no_pool;
955
956     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
957
958     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
959     if (*addrp == NULL)
960       goto no_address;
961
962     /* FIXME: Also reserve the same port with unicast ANY address, since that's
963      * where we are going to bind our socket. Probably loop until we find a port
964      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
965      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
966      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
967   }
968   result = gst_rtsp_address_copy (*addrp);
969
970   g_mutex_unlock (&stream->priv->lock);
971
972   return result;
973
974   /* ERRORS */
975 no_pool:
976   {
977     GST_ERROR_OBJECT (stream, "no address pool specified");
978     g_mutex_unlock (&stream->priv->lock);
979     return NULL;
980   }
981 no_address:
982   {
983     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
984     g_mutex_unlock (&stream->priv->lock);
985     return NULL;
986   }
987 }
988
989 /**
990  * gst_rtsp_stream_reserve_address:
991  * @stream: a #GstRTSPStream
992  * @address: an address
993  * @port: a port
994  * @n_ports: n_ports
995  * @ttl: a TTL
996  *
997  * Reserve @address and @port as the address and port of @stream. The original
998  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
999  * won't release the address from the pool.
1000  *
1001  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1002  * the address could be reserved. gst_rtsp_address_free() after usage.
1003  */
1004 GstRTSPAddress *
1005 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1006     const gchar * address, guint port, guint n_ports, guint ttl)
1007 {
1008   GstRTSPStreamPrivate *priv;
1009   GstRTSPAddress *result;
1010   GInetAddress *addr;
1011   GSocketFamily family;
1012   GstRTSPAddress **addrp;
1013
1014   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1015   g_return_val_if_fail (address != NULL, NULL);
1016   g_return_val_if_fail (port > 0, NULL);
1017   g_return_val_if_fail (n_ports > 0, NULL);
1018   g_return_val_if_fail (ttl > 0, NULL);
1019
1020   priv = stream->priv;
1021
1022   addr = g_inet_address_new_from_string (address);
1023   if (!addr) {
1024     GST_ERROR ("failed to get inet addr from %s", address);
1025     family = G_SOCKET_FAMILY_IPV4;
1026   } else {
1027     family = g_inet_address_get_family (addr);
1028     g_object_unref (addr);
1029   }
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     addrp = &priv->mcast_addr_v6;
1033   else
1034     addrp = &priv->mcast_addr_v4;
1035
1036   g_mutex_lock (&priv->lock);
1037   if (*addrp == NULL) {
1038     GstRTSPAddressPoolResult res;
1039
1040     if (priv->pool == NULL)
1041       goto no_pool;
1042
1043     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1044         port, n_ports, ttl, addrp);
1045     if (res != GST_RTSP_ADDRESS_POOL_OK)
1046       goto no_address;
1047
1048     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1049      * where we are going to bind our socket. */
1050   } else {
1051     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream,
1078         "address %s is not the same as %s that was already reserved",
1079         address, (*addrp)->address);
1080     g_mutex_unlock (&priv->lock);
1081     return NULL;
1082   }
1083 }
1084
1085 /* must be called with lock */
1086 static void
1087 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1088     GSocket * rtcp_socket, GSocketFamily family)
1089 {
1090   const gchar *multisink_socket;
1091
1092   if (family == G_SOCKET_FAMILY_IPV6)
1093     multisink_socket = "socket-v6";
1094   else
1095     multisink_socket = "socket";
1096
1097   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1098   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1099 }
1100
1101 static gboolean
1102 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
1103 {
1104   GstRTSPStreamPrivate *priv = stream->priv;
1105   GstElement *udpsink0, *udpsink1;
1106
1107   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1108   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1109
1110   if (!udpsink0 || !udpsink1)
1111     goto no_udp_protocol;
1112
1113   /* configure sinks */
1114
1115   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1116   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1117
1118   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1119   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1120
1121   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1122
1123   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1124   /* Needs to be async for RECORD streams, otherwise we will never go to
1125    * PLAYING because the sinks will wait for data while the udpsrc can't
1126    * provide data with timestamps in PAUSED. */
1127   if (priv->sinkpad)
1128     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1129   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1130
1131   /* join multicast group when adding clients, so we'll start receiving from it.
1132    * We cannot rely on the udpsrc to join the group since its socket is always a
1133    * local unicast one. */
1134   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1138   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1139
1140   udpsink[0] = udpsink0;
1141   udpsink[1] = udpsink1;
1142
1143   /* update the dscp qos field in the sinks */
1144   update_dscp_qos (stream, udpsink);
1145
1146   return TRUE;
1147
1148   /* ERRORS */
1149 no_udp_protocol:
1150   {
1151     return FALSE;
1152   }
1153 }
1154
1155 /* must be called with lock */
1156 static gboolean
1157 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1158     GSocket * rtp_socket, GSocket * rtcp_socket)
1159 {
1160   GstStateChangeReturn ret;
1161
1162   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1163   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1164
1165   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1166     goto error;
1167
1168   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1169   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1170
1171   /* The udpsrc cannot do the join because its socket is always a local unicast
1172    * one. The udpsink sharing the same socket will do it for us. */
1173   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1174   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1175
1176   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1177   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1178
1179   g_object_set (G_OBJECT (udpsrc_out[0]), "close-socket", FALSE, NULL);
1180   g_object_set (G_OBJECT (udpsrc_out[1]), "close-socket", FALSE, NULL);
1181
1182   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1183   if (ret == GST_STATE_CHANGE_FAILURE)
1184     goto error;
1185   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1186   if (ret == GST_STATE_CHANGE_FAILURE)
1187     goto error;
1188
1189   return TRUE;
1190
1191   /* ERRORS */
1192 error:
1193   {
1194     if (udpsrc_out[0]) {
1195       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1196       g_clear_object (&udpsrc_out[0]);
1197     }
1198     if (udpsrc_out[1]) {
1199       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1200       g_clear_object (&udpsrc_out[1]);
1201     }
1202     return FALSE;
1203   }
1204 }
1205
1206 static gboolean
1207 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1208     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1209     GstRTSPAddress ** server_addr_out, gboolean multicast)
1210 {
1211   GstRTSPStreamPrivate *priv = stream->priv;
1212   GSocket *rtp_socket = NULL;
1213   GSocket *rtcp_socket;
1214   gint tmp_rtp, tmp_rtcp;
1215   guint count;
1216   gint rtpport, rtcpport;
1217   GList *rejected_addresses = NULL;
1218   GstRTSPAddress *addr = NULL;
1219   GInetAddress *inetaddr = NULL;
1220   gchar *addr_str;
1221   GSocketAddress *rtp_sockaddr = NULL;
1222   GSocketAddress *rtcp_sockaddr = NULL;
1223   GstRTSPAddressPool *pool;
1224
1225   g_assert (!udpsrc_out[0]);
1226   g_assert (!udpsrc_out[1]);
1227   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1228       (udpsink_out[0] && udpsink_out[1]));
1229   g_assert (*server_addr_out == NULL);
1230
1231   pool = priv->pool;
1232   count = 0;
1233
1234   /* Start with random port */
1235   tmp_rtp = 0;
1236
1237   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1238       G_SOCKET_PROTOCOL_UDP, NULL);
1239   if (!rtcp_socket)
1240     goto no_udp_protocol;
1241   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1242
1243   /* try to allocate 2 UDP ports, the RTP port should be an even
1244    * number and the RTCP port should be the next (uneven) port */
1245 again:
1246
1247   if (rtp_socket == NULL) {
1248     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1249         G_SOCKET_PROTOCOL_UDP, NULL);
1250     if (!rtp_socket)
1251       goto no_udp_protocol;
1252     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1253   }
1254
1255   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1256     GstRTSPAddressFlags flags;
1257
1258     if (addr)
1259       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1260
1261     if (!pool)
1262       goto no_ports;
1263
1264     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1265     if (multicast)
1266       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1267     else
1268       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1269
1270     if (family == G_SOCKET_FAMILY_IPV6)
1271       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1272     else
1273       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1274
1275     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1276
1277     if (addr == NULL)
1278       goto no_ports;
1279
1280     tmp_rtp = addr->port;
1281
1282     g_clear_object (&inetaddr);
1283     if (multicast)
1284       inetaddr = g_inet_address_new_any (family);
1285     else
1286       inetaddr = g_inet_address_new_from_string (addr->address);
1287   } else {
1288     if (tmp_rtp != 0) {
1289       tmp_rtp += 2;
1290       if (++count > 20)
1291         goto no_ports;
1292     }
1293
1294     if (inetaddr == NULL)
1295       inetaddr = g_inet_address_new_any (family);
1296   }
1297
1298   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1299   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1300     g_object_unref (rtp_sockaddr);
1301     goto again;
1302   }
1303   g_object_unref (rtp_sockaddr);
1304
1305   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1306   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1307     g_clear_object (&rtp_sockaddr);
1308     goto socket_error;
1309   }
1310
1311   tmp_rtp =
1312       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1313   g_object_unref (rtp_sockaddr);
1314
1315   /* check if port is even */
1316   if ((tmp_rtp & 1) != 0) {
1317     /* port not even, close and allocate another */
1318     tmp_rtp++;
1319     g_clear_object (&rtp_socket);
1320     goto again;
1321   }
1322
1323   /* set port */
1324   tmp_rtcp = tmp_rtp + 1;
1325
1326   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1327   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1328     g_object_unref (rtcp_sockaddr);
1329     g_clear_object (&rtp_socket);
1330     goto again;
1331   }
1332   g_object_unref (rtcp_sockaddr);
1333
1334   if (!addr) {
1335     addr = g_slice_new0 (GstRTSPAddress);
1336     addr->address = g_inet_address_to_string (inetaddr);
1337     addr->port = tmp_rtp;
1338     addr->n_ports = 2;
1339   }
1340
1341   addr_str = addr->address;
1342   g_clear_object (&inetaddr);
1343
1344   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1345     goto no_udp_protocol;
1346   }
1347
1348   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1349   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1350
1351   /* this should not happen... */
1352   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1353     goto port_error;
1354
1355   /* This function is called twice (for v4 and v6) but we create only one pair
1356    * of udpsinks. */
1357   if (!udpsink_out[0]
1358       && !create_and_configure_udpsinks (stream, udpsink_out))
1359     goto no_udp_protocol;
1360
1361   if (multicast) {
1362     g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
1363         priv->multicast_iface, NULL);
1364     g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
1365         priv->multicast_iface, NULL);
1366
1367     g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
1368     g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
1369   }
1370
1371   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1372
1373   *server_addr_out = addr;
1374
1375   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1376
1377   g_object_unref (rtp_socket);
1378   g_object_unref (rtcp_socket);
1379
1380   return TRUE;
1381
1382   /* ERRORS */
1383 no_udp_protocol:
1384   {
1385     goto cleanup;
1386   }
1387 no_ports:
1388   {
1389     goto cleanup;
1390   }
1391 port_error:
1392   {
1393     goto cleanup;
1394   }
1395 socket_error:
1396   {
1397     goto cleanup;
1398   }
1399 cleanup:
1400   {
1401     if (inetaddr)
1402       g_object_unref (inetaddr);
1403     g_list_free_full (rejected_addresses,
1404         (GDestroyNotify) gst_rtsp_address_free);
1405     if (addr)
1406       gst_rtsp_address_free (addr);
1407     if (rtp_socket)
1408       g_object_unref (rtp_socket);
1409     if (rtcp_socket)
1410       g_object_unref (rtcp_socket);
1411     return FALSE;
1412   }
1413 }
1414
1415 /**
1416  * gst_rtsp_stream_allocate_udp_sockets:
1417  * @stream: a #GstRTSPStream
1418  * @family: protocol family
1419  * @transport: transport method
1420  * @use_client_setttings: Whether to use client settings or not
1421  *
1422  * Allocates RTP and RTCP ports.
1423  *
1424  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1425  * Deprecated: This function shouldn't have been made public
1426  */
1427 gboolean
1428 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1429     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1430 {
1431   g_warn_if_reached ();
1432   return FALSE;
1433 }
1434
1435 /**
1436  * gst_rtsp_stream_set_client_side:
1437  * @stream: a #GstRTSPStream
1438  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1439  * an RTSP connection.
1440  *
1441  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1442  * streams to an RTSP server via RECORD. This has the practical effect
1443  * of changing which UDP port numbers are used when setting up the local
1444  * side of the stream sending to be either the 'server' or 'client' pair
1445  * of a configured UDP transport.
1446  */
1447 void
1448 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1449 {
1450   GstRTSPStreamPrivate *priv;
1451
1452   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1453   priv = stream->priv;
1454   g_mutex_lock (&priv->lock);
1455   priv->client_side = client_side;
1456   g_mutex_unlock (&priv->lock);
1457 }
1458
1459 /**
1460  * gst_rtsp_stream_is_client_side:
1461  * @stream: a #GstRTSPStream
1462  *
1463  * See gst_rtsp_stream_set_client_side()
1464  *
1465  * Returns: TRUE if this #GstRTSPStream is client-side.
1466  */
1467 gboolean
1468 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1469 {
1470   GstRTSPStreamPrivate *priv;
1471   gboolean ret;
1472
1473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1474
1475   priv = stream->priv;
1476   g_mutex_lock (&priv->lock);
1477   ret = priv->client_side;
1478   g_mutex_unlock (&priv->lock);
1479
1480   return ret;
1481 }
1482
1483 /* must be called with lock */
1484 static gboolean
1485 alloc_ports (GstRTSPStream * stream)
1486 {
1487   GstRTSPStreamPrivate *priv = stream->priv;
1488   gboolean ret = TRUE;
1489
1490   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
1491     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1492         priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
1493
1494     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1495         priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
1496   }
1497
1498   /* FIXME: Maybe actually consider the return values? */
1499   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1500     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1501         priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
1502
1503     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1504         priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
1505   }
1506
1507   return ret;
1508 }
1509
1510 /**
1511  * gst_rtsp_stream_get_server_port:
1512  * @stream: a #GstRTSPStream
1513  * @server_port: (out): result server port
1514  * @family: the port family to get
1515  *
1516  * Fill @server_port with the port pair used by the server. This function can
1517  * only be called when @stream has been joined.
1518  */
1519 void
1520 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1521     GstRTSPRange * server_port, GSocketFamily family)
1522 {
1523   GstRTSPStreamPrivate *priv;
1524
1525   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1526   priv = stream->priv;
1527   g_return_if_fail (priv->joined_bin != NULL);
1528
1529   if (server_port) {
1530     server_port->min = 0;
1531     server_port->max = 0;
1532   }
1533
1534   g_mutex_lock (&priv->lock);
1535   if (family == G_SOCKET_FAMILY_IPV4) {
1536     if (server_port && priv->server_addr_v4) {
1537       server_port->min = priv->server_addr_v4->port;
1538       server_port->max =
1539           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1540     }
1541   } else {
1542     if (server_port && priv->server_addr_v6) {
1543       server_port->min = priv->server_addr_v6->port;
1544       server_port->max =
1545           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1546     }
1547   }
1548   g_mutex_unlock (&priv->lock);
1549 }
1550
1551 /**
1552  * gst_rtsp_stream_get_rtpsession:
1553  * @stream: a #GstRTSPStream
1554  *
1555  * Get the RTP session of this stream.
1556  *
1557  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1558  */
1559 GObject *
1560 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1561 {
1562   GstRTSPStreamPrivate *priv;
1563   GObject *session;
1564
1565   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1566
1567   priv = stream->priv;
1568
1569   g_mutex_lock (&priv->lock);
1570   if ((session = priv->session))
1571     g_object_ref (session);
1572   g_mutex_unlock (&priv->lock);
1573
1574   return session;
1575 }
1576
1577 /**
1578  * gst_rtsp_stream_get_srtp_encoder:
1579  * @stream: a #GstRTSPStream
1580  *
1581  * Get the SRTP encoder for this stream.
1582  *
1583  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1584  */
1585 GstElement *
1586 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1587 {
1588   GstRTSPStreamPrivate *priv;
1589   GstElement *encoder;
1590
1591   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1592
1593   priv = stream->priv;
1594
1595   g_mutex_lock (&priv->lock);
1596   if ((encoder = priv->srtpenc))
1597     g_object_ref (encoder);
1598   g_mutex_unlock (&priv->lock);
1599
1600   return encoder;
1601 }
1602
1603 /**
1604  * gst_rtsp_stream_get_ssrc:
1605  * @stream: a #GstRTSPStream
1606  * @ssrc: (out): result ssrc
1607  *
1608  * Get the SSRC used by the RTP session of this stream. This function can only
1609  * be called when @stream has been joined.
1610  */
1611 void
1612 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1613 {
1614   GstRTSPStreamPrivate *priv;
1615
1616   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1617   priv = stream->priv;
1618   g_return_if_fail (priv->joined_bin != NULL);
1619
1620   g_mutex_lock (&priv->lock);
1621   if (ssrc && priv->session)
1622     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1623   g_mutex_unlock (&priv->lock);
1624 }
1625
1626 /**
1627  * gst_rtsp_stream_set_retransmission_time:
1628  * @stream: a #GstRTSPStream
1629  * @time: a #GstClockTime
1630  *
1631  * Set the amount of time to store retransmission packets.
1632  */
1633 void
1634 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1635     GstClockTime time)
1636 {
1637   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1638
1639   g_mutex_lock (&stream->priv->lock);
1640   stream->priv->rtx_time = time;
1641   if (stream->priv->rtxsend)
1642     g_object_set (stream->priv->rtxsend, "max-size-time",
1643         GST_TIME_AS_MSECONDS (time), NULL);
1644   g_mutex_unlock (&stream->priv->lock);
1645 }
1646
1647 /**
1648  * gst_rtsp_stream_get_retransmission_time:
1649  * @stream: a #GstRTSPStream
1650  *
1651  * Get the amount of time to store retransmission data.
1652  *
1653  * Returns: the amount of time to store retransmission data.
1654  */
1655 GstClockTime
1656 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1657 {
1658   GstClockTime ret;
1659
1660   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1661
1662   g_mutex_lock (&stream->priv->lock);
1663   ret = stream->priv->rtx_time;
1664   g_mutex_unlock (&stream->priv->lock);
1665
1666   return ret;
1667 }
1668
1669 /**
1670  * gst_rtsp_stream_set_retransmission_pt:
1671  * @stream: a #GstRTSPStream
1672  * @rtx_pt: a #guint
1673  *
1674  * Set the payload type (pt) for retransmission of this stream.
1675  */
1676 void
1677 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1678 {
1679   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1680
1681   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1682
1683   g_mutex_lock (&stream->priv->lock);
1684   stream->priv->rtx_pt = rtx_pt;
1685   if (stream->priv->rtxsend) {
1686     guint pt = gst_rtsp_stream_get_pt (stream);
1687     gchar *pt_s = g_strdup_printf ("%d", pt);
1688     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1689         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1690     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1691     g_free (pt_s);
1692     gst_structure_free (rtx_pt_map);
1693   }
1694   g_mutex_unlock (&stream->priv->lock);
1695 }
1696
1697 /**
1698  * gst_rtsp_stream_get_retransmission_pt:
1699  * @stream: a #GstRTSPStream
1700  *
1701  * Get the payload-type used for retransmission of this stream
1702  *
1703  * Returns: The retransmission PT.
1704  */
1705 guint
1706 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1707 {
1708   guint rtx_pt;
1709
1710   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1711
1712   g_mutex_lock (&stream->priv->lock);
1713   rtx_pt = stream->priv->rtx_pt;
1714   g_mutex_unlock (&stream->priv->lock);
1715
1716   return rtx_pt;
1717 }
1718
1719 /**
1720  * gst_rtsp_stream_set_buffer_size:
1721  * @stream: a #GstRTSPStream
1722  * @size: the buffer size
1723  *
1724  * Set the size of the UDP transmission buffer (in bytes)
1725  * Needs to be set before the stream is joined to a bin.
1726  *
1727  * Since: 1.6
1728  */
1729 void
1730 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1731 {
1732   g_mutex_lock (&stream->priv->lock);
1733   stream->priv->buffer_size = size;
1734   g_mutex_unlock (&stream->priv->lock);
1735 }
1736
1737 /**
1738  * gst_rtsp_stream_get_buffer_size:
1739  * @stream: a #GstRTSPStream
1740  *
1741  * Get the size of the UDP transmission buffer (in bytes)
1742  *
1743  * Returns: the size of the UDP TX buffer
1744  *
1745  * Since: 1.6
1746  */
1747 guint
1748 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1749 {
1750   guint buffer_size;
1751
1752   g_mutex_lock (&stream->priv->lock);
1753   buffer_size = stream->priv->buffer_size;
1754   g_mutex_unlock (&stream->priv->lock);
1755
1756   return buffer_size;
1757 }
1758
1759 /* executed from streaming thread */
1760 static void
1761 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1762 {
1763   GstRTSPStreamPrivate *priv = stream->priv;
1764   GstCaps *newcaps, *oldcaps;
1765
1766   newcaps = gst_pad_get_current_caps (pad);
1767
1768   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1769       newcaps);
1770
1771   g_mutex_lock (&priv->lock);
1772   oldcaps = priv->caps;
1773   priv->caps = newcaps;
1774   g_mutex_unlock (&priv->lock);
1775
1776   if (oldcaps)
1777     gst_caps_unref (oldcaps);
1778 }
1779
1780 static void
1781 dump_structure (const GstStructure * s)
1782 {
1783   gchar *sstr;
1784
1785   sstr = gst_structure_to_string (s);
1786   GST_INFO ("structure: %s", sstr);
1787   g_free (sstr);
1788 }
1789
1790 static GstRTSPStreamTransport *
1791 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1792 {
1793   GstRTSPStreamPrivate *priv = stream->priv;
1794   GList *walk;
1795   GstRTSPStreamTransport *result = NULL;
1796   const gchar *tmp;
1797   gchar *dest;
1798   guint port;
1799
1800   if (rtcp_from == NULL)
1801     return NULL;
1802
1803   tmp = g_strrstr (rtcp_from, ":");
1804   if (tmp == NULL)
1805     return NULL;
1806
1807   port = atoi (tmp + 1);
1808   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1809
1810   g_mutex_lock (&priv->lock);
1811   GST_INFO ("finding %s:%d in %d transports", dest, port,
1812       g_list_length (priv->transports));
1813
1814   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1815     GstRTSPStreamTransport *trans = walk->data;
1816     const GstRTSPTransport *tr;
1817     gint min, max;
1818
1819     tr = gst_rtsp_stream_transport_get_transport (trans);
1820
1821     if (priv->client_side) {
1822       /* In client side mode the 'destination' is the RTSP server, so send
1823        * to those ports */
1824       min = tr->server_port.min;
1825       max = tr->server_port.max;
1826     } else {
1827       min = tr->client_port.min;
1828       max = tr->client_port.max;
1829     }
1830
1831     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1832         (min == port || max == port)) {
1833       result = trans;
1834       break;
1835     }
1836   }
1837   if (result)
1838     g_object_ref (result);
1839   g_mutex_unlock (&priv->lock);
1840
1841   g_free (dest);
1842
1843   return result;
1844 }
1845
1846 static GstRTSPStreamTransport *
1847 check_transport (GObject * source, GstRTSPStream * stream)
1848 {
1849   GstStructure *stats;
1850   GstRTSPStreamTransport *trans;
1851
1852   /* see if we have a stream to match with the origin of the RTCP packet */
1853   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1854   if (trans == NULL) {
1855     g_object_get (source, "stats", &stats, NULL);
1856     if (stats) {
1857       const gchar *rtcp_from;
1858
1859       dump_structure (stats);
1860
1861       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1862       if ((trans = find_transport (stream, rtcp_from))) {
1863         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1864             source);
1865         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1866             g_object_unref);
1867       }
1868       gst_structure_free (stats);
1869     }
1870   }
1871   return trans;
1872 }
1873
1874
1875 static void
1876 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1877 {
1878   GstRTSPStreamTransport *trans;
1879
1880   GST_INFO ("%p: new source %p", stream, source);
1881
1882   trans = check_transport (source, stream);
1883
1884   if (trans)
1885     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1886 }
1887
1888 static void
1889 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1890 {
1891   GST_INFO ("%p: new SDES %p", stream, source);
1892 }
1893
1894 static void
1895 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1896 {
1897   GstRTSPStreamTransport *trans;
1898
1899   trans = check_transport (source, stream);
1900
1901   if (trans) {
1902     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1903     gst_rtsp_stream_transport_keep_alive (trans);
1904   }
1905 #ifdef DUMP_STATS
1906   {
1907     GstStructure *stats;
1908     g_object_get (source, "stats", &stats, NULL);
1909     if (stats) {
1910       dump_structure (stats);
1911       gst_structure_free (stats);
1912     }
1913   }
1914 #endif
1915 }
1916
1917 static void
1918 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1919 {
1920   GST_INFO ("%p: source %p bye", stream, source);
1921 }
1922
1923 static void
1924 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1925 {
1926   GstRTSPStreamTransport *trans;
1927
1928   GST_INFO ("%p: source %p bye timeout", stream, source);
1929
1930   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1931     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1932     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1933   }
1934 }
1935
1936 static void
1937 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1938 {
1939   GstRTSPStreamTransport *trans;
1940
1941   GST_INFO ("%p: source %p timeout", stream, source);
1942
1943   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1944     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1945     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1946   }
1947 }
1948
1949 static void
1950 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1951 {
1952   GST_INFO ("%p: new sender source %p", stream, source);
1953 #ifndef DUMP_STATS
1954   {
1955     GstStructure *stats;
1956     g_object_get (source, "stats", &stats, NULL);
1957     if (stats) {
1958       dump_structure (stats);
1959       gst_structure_free (stats);
1960     }
1961   }
1962 #endif
1963 }
1964
1965 static void
1966 on_sender_ssrc_active (GObject * session, GObject * source,
1967     GstRTSPStream * stream)
1968 {
1969 #ifndef DUMP_STATS
1970   {
1971     GstStructure *stats;
1972     g_object_get (source, "stats", &stats, NULL);
1973     if (stats) {
1974       dump_structure (stats);
1975       gst_structure_free (stats);
1976     }
1977   }
1978 #endif
1979 }
1980
1981 static void
1982 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1983 {
1984   if (is_rtp) {
1985     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1986     g_list_free (priv->tr_cache_rtp);
1987     priv->tr_cache_rtp = NULL;
1988   } else {
1989     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1990     g_list_free (priv->tr_cache_rtcp);
1991     priv->tr_cache_rtcp = NULL;
1992   }
1993 }
1994
1995 static GstFlowReturn
1996 handle_new_sample (GstAppSink * sink, gpointer user_data)
1997 {
1998   GstRTSPStreamPrivate *priv;
1999   GList *walk;
2000   GstSample *sample;
2001   GstBuffer *buffer;
2002   GstRTSPStream *stream;
2003   gboolean is_rtp;
2004
2005   sample = gst_app_sink_pull_sample (sink);
2006   if (!sample)
2007     return GST_FLOW_OK;
2008
2009   stream = (GstRTSPStream *) user_data;
2010   priv = stream->priv;
2011   buffer = gst_sample_get_buffer (sample);
2012
2013   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2014
2015   g_mutex_lock (&priv->lock);
2016   if (is_rtp) {
2017     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2018       clear_tr_cache (priv, is_rtp);
2019       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2020         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2021         priv->tr_cache_rtp =
2022             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2023       }
2024       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2025     }
2026   } else {
2027     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2028       clear_tr_cache (priv, is_rtp);
2029       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2030         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2031         priv->tr_cache_rtcp =
2032             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2033       }
2034       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2035     }
2036   }
2037   g_mutex_unlock (&priv->lock);
2038
2039   if (is_rtp) {
2040     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2041       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2042       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2043     }
2044   } else {
2045     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2046       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2047       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2048     }
2049   }
2050   gst_sample_unref (sample);
2051
2052   return GST_FLOW_OK;
2053 }
2054
2055 static GstAppSinkCallbacks sink_cb = {
2056   NULL,                         /* not interested in EOS */
2057   NULL,                         /* not interested in preroll samples */
2058   handle_new_sample,
2059 };
2060
2061 static GstElement *
2062 get_rtp_encoder (GstRTSPStream * stream, guint session)
2063 {
2064   GstRTSPStreamPrivate *priv = stream->priv;
2065
2066   if (priv->srtpenc == NULL) {
2067     gchar *name;
2068
2069     name = g_strdup_printf ("srtpenc_%u", session);
2070     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2071     g_free (name);
2072
2073     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2074   }
2075   return gst_object_ref (priv->srtpenc);
2076 }
2077
2078 static GstElement *
2079 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2080 {
2081   GstRTSPStreamPrivate *priv = stream->priv;
2082   GstElement *oldenc, *enc;
2083   GstPad *pad;
2084   gchar *name;
2085
2086   if (priv->idx != session)
2087     return NULL;
2088
2089   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2090
2091   oldenc = priv->srtpenc;
2092   enc = get_rtp_encoder (stream, session);
2093   name = g_strdup_printf ("rtp_sink_%d", session);
2094   pad = gst_element_get_request_pad (enc, name);
2095   g_free (name);
2096   gst_object_unref (pad);
2097
2098   if (oldenc == NULL)
2099     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2100         enc);
2101
2102   return enc;
2103 }
2104
2105 static GstElement *
2106 request_rtcp_encoder (GstElement * rtpbin, guint session,
2107     GstRTSPStream * stream)
2108 {
2109   GstRTSPStreamPrivate *priv = stream->priv;
2110   GstElement *oldenc, *enc;
2111   GstPad *pad;
2112   gchar *name;
2113
2114   if (priv->idx != session)
2115     return NULL;
2116
2117   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2118
2119   oldenc = priv->srtpenc;
2120   enc = get_rtp_encoder (stream, session);
2121   name = g_strdup_printf ("rtcp_sink_%d", session);
2122   pad = gst_element_get_request_pad (enc, name);
2123   g_free (name);
2124   gst_object_unref (pad);
2125
2126   if (oldenc == NULL)
2127     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2128         enc);
2129
2130   return enc;
2131 }
2132
2133 static GstCaps *
2134 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2135 {
2136   GstRTSPStreamPrivate *priv = stream->priv;
2137   GstCaps *caps;
2138
2139   GST_DEBUG ("request key %08x", ssrc);
2140
2141   g_mutex_lock (&priv->lock);
2142   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2143     gst_caps_ref (caps);
2144   g_mutex_unlock (&priv->lock);
2145
2146   return caps;
2147 }
2148
2149 static GstElement *
2150 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2151     GstRTSPStream * stream)
2152 {
2153   GstRTSPStreamPrivate *priv = stream->priv;
2154
2155   if (priv->idx != session)
2156     return NULL;
2157
2158   if (priv->srtpdec == NULL) {
2159     gchar *name;
2160
2161     name = g_strdup_printf ("srtpdec_%u", session);
2162     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2163     g_free (name);
2164
2165     g_signal_connect (priv->srtpdec, "request-key",
2166         (GCallback) request_key, stream);
2167   }
2168   return gst_object_ref (priv->srtpdec);
2169 }
2170
2171 /**
2172  * gst_rtsp_stream_request_aux_sender:
2173  * @stream: a #GstRTSPStream
2174  * @sessid: the session id
2175  *
2176  * Creating a rtxsend bin
2177  *
2178  * Returns: (transfer full): a #GstElement.
2179  *
2180  * Since: 1.6
2181  */
2182 GstElement *
2183 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2184 {
2185   GstElement *bin;
2186   GstPad *pad;
2187   GstStructure *pt_map;
2188   gchar *name;
2189   guint pt, rtx_pt;
2190   gchar *pt_s;
2191
2192   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2193
2194   pt = gst_rtsp_stream_get_pt (stream);
2195   pt_s = g_strdup_printf ("%u", pt);
2196   rtx_pt = stream->priv->rtx_pt;
2197
2198   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2199
2200   bin = gst_bin_new (NULL);
2201   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2202   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2203       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2204   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2205       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2206   g_free (pt_s);
2207   gst_structure_free (pt_map);
2208   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2209
2210   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2211   name = g_strdup_printf ("src_%u", sessid);
2212   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2213   g_free (name);
2214   gst_object_unref (pad);
2215
2216   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2217   name = g_strdup_printf ("sink_%u", sessid);
2218   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2219   g_free (name);
2220   gst_object_unref (pad);
2221
2222   return bin;
2223 }
2224
2225 /**
2226  * gst_rtsp_stream_set_pt_map:
2227  * @stream: a #GstRTSPStream
2228  * @pt: the pt
2229  * @caps: a #GstCaps
2230  *
2231  * Configure a pt map between @pt and @caps.
2232  */
2233 void
2234 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2235 {
2236   GstRTSPStreamPrivate *priv = stream->priv;
2237
2238   g_mutex_lock (&priv->lock);
2239   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2240   g_mutex_unlock (&priv->lock);
2241 }
2242
2243 /**
2244  * gst_rtsp_stream_set_publish_clock_mode:
2245  * @stream: a #GstRTSPStream
2246  * @mode: the clock publish mode
2247  *
2248  * Sets if and how the stream clock should be published according to RFC7273.
2249  *
2250  * Since: 1.8
2251  */
2252 void
2253 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2254     GstRTSPPublishClockMode mode)
2255 {
2256   GstRTSPStreamPrivate *priv;
2257
2258   priv = stream->priv;
2259   g_mutex_lock (&priv->lock);
2260   priv->publish_clock_mode = mode;
2261   g_mutex_unlock (&priv->lock);
2262 }
2263
2264 /**
2265  * gst_rtsp_stream_get_publish_clock_mode:
2266  * @stream: a #GstRTSPStream
2267  *
2268  * Gets if and how the stream clock should be published according to RFC7273.
2269  *
2270  * Returns: The GstRTSPPublishClockMode
2271  *
2272  * Since: 1.8
2273  */
2274 GstRTSPPublishClockMode
2275 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2276 {
2277   GstRTSPStreamPrivate *priv;
2278   GstRTSPPublishClockMode ret;
2279
2280   priv = stream->priv;
2281   g_mutex_lock (&priv->lock);
2282   ret = priv->publish_clock_mode;
2283   g_mutex_unlock (&priv->lock);
2284
2285   return ret;
2286 }
2287
2288 static GstCaps *
2289 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2290     GstRTSPStream * stream)
2291 {
2292   GstRTSPStreamPrivate *priv = stream->priv;
2293   GstCaps *caps = NULL;
2294
2295   g_mutex_lock (&priv->lock);
2296
2297   if (priv->idx == session) {
2298     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2299     if (caps) {
2300       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2301       gst_caps_ref (caps);
2302     } else {
2303       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2304     }
2305   }
2306
2307   g_mutex_unlock (&priv->lock);
2308
2309   return caps;
2310 }
2311
2312 static void
2313 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2314 {
2315   GstRTSPStreamPrivate *priv = stream->priv;
2316   gchar *name;
2317   GstPadLinkReturn ret;
2318   guint sessid;
2319
2320   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2321       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2322
2323   name = gst_pad_get_name (pad);
2324   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2325     g_free (name);
2326     return;
2327   }
2328   g_free (name);
2329
2330   if (priv->idx != sessid)
2331     return;
2332
2333   if (gst_pad_is_linked (priv->sinkpad)) {
2334     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2335         GST_DEBUG_PAD_NAME (priv->sinkpad));
2336     return;
2337   }
2338
2339   /* link the RTP pad to the session manager, it should not really fail unless
2340    * this is not really an RTP pad */
2341   ret = gst_pad_link (pad, priv->sinkpad);
2342   if (ret != GST_PAD_LINK_OK)
2343     goto link_failed;
2344   priv->recv_rtp_src = gst_object_ref (pad);
2345
2346   return;
2347
2348 /* ERRORS */
2349 link_failed:
2350   {
2351     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2352         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2353   }
2354 }
2355
2356 static void
2357 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2358     GstRTSPStream * stream)
2359 {
2360   /* TODO: What to do here other than this? */
2361   GST_DEBUG ("Stream %p: Got EOS", stream);
2362   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2363 }
2364
2365 static void
2366 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2367     GstElement ** queue_out)
2368 {
2369   GstPad *pad;
2370   GstPad *teepad;
2371   GstPad *queuepad;
2372
2373   gst_bin_add (bin, sink);
2374
2375   *queue_out = gst_element_factory_make ("queue", NULL);
2376   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2377       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2378   gst_bin_add (bin, *queue_out);
2379
2380   /* link tee to queue */
2381   teepad = gst_element_get_request_pad (tee, "src_%u");
2382   pad = gst_element_get_static_pad (*queue_out, "sink");
2383   gst_pad_link (teepad, pad);
2384   gst_object_unref (pad);
2385   gst_object_unref (teepad);
2386
2387   /* link queue to sink */
2388   queuepad = gst_element_get_static_pad (*queue_out, "src");
2389   pad = gst_element_get_static_pad (sink, "sink");
2390   gst_pad_link (queuepad, pad);
2391   gst_object_unref (queuepad);
2392   gst_object_unref (pad);
2393 }
2394
2395 /* must be called with lock */
2396 static void
2397 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2398 {
2399   GstRTSPStreamPrivate *priv;
2400   GstPad *pad;
2401   gboolean is_tcp, is_udp;
2402   gint i;
2403
2404   priv = stream->priv;
2405
2406   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2407   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2408       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2409
2410   for (i = 0; i < 2; i++) {
2411     /* For the sender we create this bit of pipeline for both
2412      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2413      * we need to add a queue before appsink and udpsink to make
2414      * the pipeline not block. For the TCP case, we want to pump
2415      * client as fast as possible anyway. This pipeline is used
2416      * when both TCP and UDP are present.
2417      *
2418      * .--------.      .-----.    .---------.    .---------.
2419      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2420      * |       send->sink   src->sink      src->sink       |
2421      * '--------'      |     |    '---------'    '---------'
2422      *                 |     |    .---------.    .---------.
2423      *                 |     |    |  queue  |    | appsink |
2424      *                 |    src->sink      src->sink       |
2425      *                 '-----'    '---------'    '---------'
2426      *
2427      * When only UDP or only TCP is allowed, we skip the tee and queue
2428      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2429      * the session.
2430      */
2431
2432     /* Only link the RTP send src if we're going to send RTP, link
2433      * the RTCP send src always */
2434     if (!priv->srcpad && i == 0)
2435       continue;
2436
2437     if (is_tcp) {
2438       /* make appsink */
2439       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2440       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2441       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2442           &sink_cb, stream, NULL);
2443     }
2444
2445     /* If we have udp always use a tee because we could have mcast clients
2446      * requesting different ports, in which case we'll have to plug more
2447      * udpsinks. */
2448     if (is_udp) {
2449       /* make tee for RTP/RTCP */
2450       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2451       gst_bin_add (bin, priv->tee[i]);
2452
2453       /* and link to rtpbin send pad */
2454       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2455       gst_pad_link (priv->send_src[i], pad);
2456       gst_object_unref (pad);
2457
2458       if (priv->udpsink[i])
2459         plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2460
2461       if (priv->mcast_udpsink[i])
2462         plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
2463             &priv->mcast_udpqueue[i]);
2464
2465       if (is_tcp) {
2466         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2467         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2468       }
2469     } else if (is_tcp) {
2470       /* only appsink needed, link it to the session */
2471       gst_bin_add (bin, priv->appsink[i]);
2472       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2473       gst_pad_link (priv->send_src[i], pad);
2474       gst_object_unref (pad);
2475
2476       /* when its only TCP, we need to set sync and preroll to FALSE
2477        * for the sink to avoid deadlock. And this is only needed for
2478        * sink used for RTCP data, not the RTP data. */
2479       if (i == 1)
2480         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2481     }
2482
2483     /* check if we need to set to a special state */
2484     if (state != GST_STATE_NULL) {
2485       if (priv->udpsink[i])
2486         gst_element_set_state (priv->udpsink[i], state);
2487       if (priv->mcast_udpsink[i])
2488         gst_element_set_state (priv->mcast_udpsink[i], state);
2489       if (priv->appsink[i])
2490         gst_element_set_state (priv->appsink[i], state);
2491       if (priv->appqueue[i])
2492         gst_element_set_state (priv->appqueue[i], state);
2493       if (priv->udpqueue[i])
2494         gst_element_set_state (priv->udpqueue[i], state);
2495       if (priv->mcast_udpqueue[i])
2496         gst_element_set_state (priv->mcast_udpqueue[i], state);
2497       if (priv->tee[i])
2498         gst_element_set_state (priv->tee[i], state);
2499     }
2500   }
2501 }
2502
2503 /* must be called with lock */
2504 static void
2505 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2506     GstElement * funnel)
2507 {
2508   GstRTSPStreamPrivate *priv;
2509   GstPad *pad, *selpad;
2510
2511   priv = stream->priv;
2512
2513   if (priv->srcpad) {
2514     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2515      * values. This is only relevant for PLAY pipelines */
2516     gst_element_set_state (src, GST_STATE_PLAYING);
2517     gst_element_set_locked_state (src, TRUE);
2518   }
2519
2520   /* add src */
2521   gst_bin_add (bin, src);
2522
2523   /* and link to the funnel */
2524   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2525   pad = gst_element_get_static_pad (src, "src");
2526   gst_pad_link (pad, selpad);
2527   gst_object_unref (pad);
2528   gst_object_unref (selpad);
2529 }
2530
2531 /* must be called with lock */
2532 static void
2533 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2534 {
2535   GstRTSPStreamPrivate *priv;
2536   GstPad *pad;
2537   gboolean is_tcp;
2538   gint i;
2539
2540   priv = stream->priv;
2541
2542   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2543
2544   for (i = 0; i < 2; i++) {
2545     /* For the receiver we create this bit of pipeline for both
2546      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2547      * and it is all funneled into the rtpbin receive pad.
2548      *
2549      * .--------.     .--------.    .--------.
2550      * | udpsrc |     | funnel |    | rtpbin |
2551      * |       src->sink      src->sink      |
2552      * '--------'     |        |    '--------'
2553      * .--------.     |        |
2554      * | appsrc |     |        |
2555      * |       src->sink       |
2556      * '--------'     '--------'
2557      */
2558
2559     if (!priv->sinkpad && i == 0) {
2560       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2561        * RTCP sink always */
2562       continue;
2563     }
2564
2565     /* make funnel for the RTP/RTCP receivers */
2566     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2567     gst_bin_add (bin, priv->funnel[i]);
2568
2569     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2570     gst_pad_link (pad, priv->recv_sink[i]);
2571     gst_object_unref (pad);
2572
2573     if (priv->udpsrc_v4[i])
2574       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2575
2576     if (priv->udpsrc_v6[i])
2577       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2578
2579     if (priv->mcast_udpsrc_v4[i])
2580       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
2581
2582     if (priv->mcast_udpsrc_v6[i])
2583       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
2584
2585     if (is_tcp) {
2586       /* make and add appsrc */
2587       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2588       priv->appsrc_base_time[i] = -1;
2589       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2590           TRUE, NULL);
2591       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2592     }
2593
2594     /* check if we need to set to a special state */
2595     if (state != GST_STATE_NULL) {
2596       gst_element_set_state (priv->funnel[i], state);
2597     }
2598   }
2599 }
2600
2601 static gboolean
2602 check_mcast_part_for_transport (GstRTSPStream * stream,
2603     const GstRTSPTransport * tr)
2604 {
2605   GstRTSPStreamPrivate *priv = stream->priv;
2606   GInetAddress *inetaddr;
2607   GSocketFamily family;
2608   GstRTSPAddress *mcast_addr;
2609
2610   /* Check if it's a ipv4 or ipv6 transport */
2611   inetaddr = g_inet_address_new_from_string (tr->destination);
2612   family = g_inet_address_get_family (inetaddr);
2613   g_object_unref (inetaddr);
2614
2615   /* Select fields corresponding to the family */
2616   if (family == G_SOCKET_FAMILY_IPV4) {
2617     mcast_addr = priv->mcast_addr_v4;
2618   } else {
2619     mcast_addr = priv->mcast_addr_v6;
2620   }
2621
2622   /* We support only one mcast group per family, make sure this transport
2623    * matches it. */
2624   if (!mcast_addr)
2625     goto no_addr;
2626
2627   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
2628       tr->port.min != mcast_addr->port ||
2629       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2630       tr->ttl != mcast_addr->ttl)
2631     goto wrong_addr;
2632
2633   return TRUE;
2634
2635 no_addr:
2636   {
2637     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2638         "has been reserved");
2639     return FALSE;
2640   }
2641 wrong_addr:
2642   {
2643     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2644         "the reserved address");
2645     return FALSE;
2646   }
2647 }
2648
2649 /**
2650  * gst_rtsp_stream_join_bin:
2651  * @stream: a #GstRTSPStream
2652  * @bin: (transfer none): a #GstBin to join
2653  * @rtpbin: (transfer none): a rtpbin element in @bin
2654  * @state: the target state of the new elements
2655  *
2656  * Join the #GstBin @bin that contains the element @rtpbin.
2657  *
2658  * @stream will link to @rtpbin, which must be inside @bin. The elements
2659  * added to @bin will be set to the state given in @state.
2660  *
2661  * Returns: %TRUE on success.
2662  */
2663 gboolean
2664 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2665     GstElement * rtpbin, GstState state)
2666 {
2667   GstRTSPStreamPrivate *priv;
2668   guint idx;
2669   gchar *name;
2670   GstPadLinkReturn ret;
2671
2672   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2673   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2674   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2675
2676   priv = stream->priv;
2677
2678   g_mutex_lock (&priv->lock);
2679   if (priv->joined_bin != NULL)
2680     goto was_joined;
2681
2682   /* create a session with the same index as the stream */
2683   idx = priv->idx;
2684
2685   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2686
2687   if (!alloc_ports (stream))
2688     goto no_ports;
2689
2690   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2691       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2692     /* For SRTP */
2693     g_signal_connect (rtpbin, "request-rtp-encoder",
2694         (GCallback) request_rtp_encoder, stream);
2695     g_signal_connect (rtpbin, "request-rtcp-encoder",
2696         (GCallback) request_rtcp_encoder, stream);
2697     g_signal_connect (rtpbin, "request-rtp-decoder",
2698         (GCallback) request_rtp_rtcp_decoder, stream);
2699     g_signal_connect (rtpbin, "request-rtcp-decoder",
2700         (GCallback) request_rtp_rtcp_decoder, stream);
2701   }
2702
2703   if (priv->sinkpad) {
2704     g_signal_connect (rtpbin, "request-pt-map",
2705         (GCallback) request_pt_map, stream);
2706   }
2707
2708   /* get pads from the RTP session element for sending and receiving
2709    * RTP/RTCP*/
2710   if (priv->srcpad) {
2711     /* get a pad for sending RTP */
2712     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2713     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2714     g_free (name);
2715
2716     /* link the RTP pad to the session manager, it should not really fail unless
2717      * this is not really an RTP pad */
2718     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2719     if (ret != GST_PAD_LINK_OK)
2720       goto link_failed;
2721
2722     name = g_strdup_printf ("send_rtp_src_%u", idx);
2723     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2724     g_free (name);
2725   } else {
2726     /* Need to connect our sinkpad from here */
2727     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2728     /* EOS */
2729     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2730
2731     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2732     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2733     g_free (name);
2734   }
2735
2736   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2737   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2738   g_free (name);
2739   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2740   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2741   g_free (name);
2742
2743   /* get the session */
2744   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2745
2746   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2747       stream);
2748   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2749       stream);
2750   g_signal_connect (priv->session, "on-ssrc-active",
2751       (GCallback) on_ssrc_active, stream);
2752   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2753       stream);
2754   g_signal_connect (priv->session, "on-bye-timeout",
2755       (GCallback) on_bye_timeout, stream);
2756   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2757       stream);
2758
2759   /* signal for sender ssrc */
2760   g_signal_connect (priv->session, "on-new-sender-ssrc",
2761       (GCallback) on_new_sender_ssrc, stream);
2762   g_signal_connect (priv->session, "on-sender-ssrc-active",
2763       (GCallback) on_sender_ssrc_active, stream);
2764
2765   create_sender_part (stream, bin, state);
2766   create_receiver_part (stream, bin, state);
2767
2768   if (priv->srcpad) {
2769     /* be notified of caps changes */
2770     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2771         (GCallback) caps_notify, stream);
2772     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
2773   }
2774
2775   priv->joined_bin = bin;
2776   g_mutex_unlock (&priv->lock);
2777
2778   return TRUE;
2779
2780   /* ERRORS */
2781 was_joined:
2782   {
2783     g_mutex_unlock (&priv->lock);
2784     return TRUE;
2785   }
2786 no_ports:
2787   {
2788     g_mutex_unlock (&priv->lock);
2789     GST_WARNING ("failed to allocate ports %u", idx);
2790     return FALSE;
2791   }
2792 link_failed:
2793   {
2794     GST_WARNING ("failed to link stream %u", idx);
2795     gst_object_unref (priv->send_rtp_sink);
2796     priv->send_rtp_sink = NULL;
2797     g_mutex_unlock (&priv->lock);
2798     return FALSE;
2799   }
2800 }
2801
2802 static void
2803 clear_element (GstBin * bin, GstElement ** elementptr)
2804 {
2805   if (*elementptr) {
2806     gst_element_set_locked_state (*elementptr, FALSE);
2807     gst_element_set_state (*elementptr, GST_STATE_NULL);
2808     if (GST_ELEMENT_PARENT (*elementptr))
2809       gst_bin_remove (bin, *elementptr);
2810     else
2811       gst_object_unref (*elementptr);
2812     *elementptr = NULL;
2813   }
2814 }
2815
2816 /**
2817  * gst_rtsp_stream_leave_bin:
2818  * @stream: a #GstRTSPStream
2819  * @bin: (transfer none): a #GstBin
2820  * @rtpbin: (transfer none): a rtpbin #GstElement
2821  *
2822  * Remove the elements of @stream from @bin.
2823  *
2824  * Return: %TRUE on success.
2825  */
2826 gboolean
2827 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2828     GstElement * rtpbin)
2829 {
2830   GstRTSPStreamPrivate *priv;
2831   gint i;
2832
2833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2834   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2835   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2836
2837   priv = stream->priv;
2838
2839   g_mutex_lock (&priv->lock);
2840   if (priv->joined_bin == NULL)
2841     goto was_not_joined;
2842   if (priv->joined_bin != bin)
2843     goto wrong_bin;
2844
2845   priv->joined_bin = NULL;
2846
2847   /* all transports must be removed by now */
2848   if (priv->transports != NULL)
2849     goto transports_not_removed;
2850
2851   clear_tr_cache (priv, TRUE);
2852   clear_tr_cache (priv, FALSE);
2853
2854   GST_INFO ("stream %p leaving bin", stream);
2855
2856   if (priv->srcpad) {
2857     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2858
2859     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2860     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2861     gst_object_unref (priv->send_rtp_sink);
2862     priv->send_rtp_sink = NULL;
2863   } else if (priv->recv_rtp_src) {
2864     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2865     gst_object_unref (priv->recv_rtp_src);
2866     priv->recv_rtp_src = NULL;
2867   }
2868
2869   for (i = 0; i < 2; i++) {
2870     clear_element (bin, &priv->udpsrc_v4[i]);
2871     clear_element (bin, &priv->udpsrc_v6[i]);
2872     clear_element (bin, &priv->udpqueue[i]);
2873     clear_element (bin, &priv->udpsink[i]);
2874
2875     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2876     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2877     clear_element (bin, &priv->mcast_udpqueue[i]);
2878     clear_element (bin, &priv->mcast_udpsink[i]);
2879
2880     clear_element (bin, &priv->appsrc[i]);
2881     clear_element (bin, &priv->appqueue[i]);
2882     clear_element (bin, &priv->appsink[i]);
2883
2884     clear_element (bin, &priv->tee[i]);
2885     clear_element (bin, &priv->funnel[i]);
2886
2887     if (priv->sinkpad || i == 1) {
2888       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2889       gst_object_unref (priv->recv_sink[i]);
2890       priv->recv_sink[i] = NULL;
2891     }
2892   }
2893
2894   if (priv->srcpad) {
2895     gst_object_unref (priv->send_src[0]);
2896     priv->send_src[0] = NULL;
2897   }
2898
2899   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2900   gst_object_unref (priv->send_src[1]);
2901   priv->send_src[1] = NULL;
2902
2903   g_object_unref (priv->session);
2904   priv->session = NULL;
2905   if (priv->caps)
2906     gst_caps_unref (priv->caps);
2907   priv->caps = NULL;
2908
2909   if (priv->srtpenc)
2910     gst_object_unref (priv->srtpenc);
2911   if (priv->srtpdec)
2912     gst_object_unref (priv->srtpdec);
2913
2914   if (priv->mcast_addr_v4)
2915     gst_rtsp_address_free (priv->mcast_addr_v4);
2916   priv->mcast_addr_v4 = NULL;
2917   if (priv->mcast_addr_v6)
2918     gst_rtsp_address_free (priv->mcast_addr_v6);
2919   priv->mcast_addr_v6 = NULL;
2920   if (priv->server_addr_v4)
2921     gst_rtsp_address_free (priv->server_addr_v4);
2922   priv->server_addr_v4 = NULL;
2923   if (priv->server_addr_v6)
2924     gst_rtsp_address_free (priv->server_addr_v6);
2925   priv->server_addr_v6 = NULL;
2926
2927   g_mutex_unlock (&priv->lock);
2928
2929   return TRUE;
2930
2931 was_not_joined:
2932   {
2933     g_mutex_unlock (&priv->lock);
2934     return TRUE;
2935   }
2936 transports_not_removed:
2937   {
2938     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2939     g_mutex_unlock (&priv->lock);
2940     return FALSE;
2941   }
2942 wrong_bin:
2943   {
2944     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2945     g_mutex_unlock (&priv->lock);
2946     return FALSE;
2947   }
2948 }
2949
2950 /**
2951  * gst_rtsp_stream_get_joined_bin:
2952  * @stream: a #GstRTSPStream
2953  *
2954  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2955  *
2956  * Return: (transfer full): the joined bin or NULL.
2957  */
2958 GstBin *
2959 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2960 {
2961   GstRTSPStreamPrivate *priv;
2962   GstBin *bin = NULL;
2963
2964   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2965
2966   priv = stream->priv;
2967
2968   g_mutex_lock (&priv->lock);
2969   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
2970   g_mutex_unlock (&priv->lock);
2971
2972   return bin;
2973 }
2974
2975 /**
2976  * gst_rtsp_stream_get_rtpinfo:
2977  * @stream: a #GstRTSPStream
2978  * @rtptime: (allow-none): result RTP timestamp
2979  * @seq: (allow-none): result RTP seqnum
2980  * @clock_rate: (allow-none): the clock rate
2981  * @running_time: result running-time
2982  *
2983  * Retrieve the current rtptime, seq and running-time. This is used to
2984  * construct a RTPInfo reply header.
2985  *
2986  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2987  */
2988 gboolean
2989 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2990     guint * rtptime, guint * seq, guint * clock_rate,
2991     GstClockTime * running_time)
2992 {
2993   GstRTSPStreamPrivate *priv;
2994   GstStructure *stats;
2995   GObjectClass *payobjclass;
2996
2997   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2998
2999   priv = stream->priv;
3000
3001   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3002
3003   g_mutex_lock (&priv->lock);
3004
3005   /* First try to extract the information from the last buffer on the sinks.
3006    * This will have a more accurate sequence number and timestamp, as between
3007    * the payloader and the sink there can be some queues
3008    */
3009   if (priv->udpsink[0] || priv->appsink[0]) {
3010     GstSample *last_sample;
3011
3012     if (priv->udpsink[0])
3013       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3014     else
3015       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3016
3017     if (last_sample) {
3018       GstCaps *caps;
3019       GstBuffer *buffer;
3020       GstSegment *segment;
3021       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3022
3023       caps = gst_sample_get_caps (last_sample);
3024       buffer = gst_sample_get_buffer (last_sample);
3025       segment = gst_sample_get_segment (last_sample);
3026
3027       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3028         if (seq) {
3029           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3030         }
3031
3032         if (rtptime) {
3033           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3034         }
3035
3036         gst_rtp_buffer_unmap (&rtp_buffer);
3037
3038         if (running_time) {
3039           *running_time =
3040               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3041               GST_BUFFER_TIMESTAMP (buffer));
3042         }
3043
3044         if (clock_rate) {
3045           GstStructure *s = gst_caps_get_structure (caps, 0);
3046
3047           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3048
3049           if (*clock_rate == 0 && running_time)
3050             *running_time = GST_CLOCK_TIME_NONE;
3051         }
3052         gst_sample_unref (last_sample);
3053
3054         goto done;
3055       } else {
3056         gst_sample_unref (last_sample);
3057       }
3058     }
3059   }
3060
3061   if (g_object_class_find_property (payobjclass, "stats")) {
3062     g_object_get (priv->payloader, "stats", &stats, NULL);
3063     if (stats == NULL)
3064       goto no_stats;
3065
3066     if (seq)
3067       gst_structure_get_uint (stats, "seqnum", seq);
3068
3069     if (rtptime)
3070       gst_structure_get_uint (stats, "timestamp", rtptime);
3071
3072     if (running_time)
3073       gst_structure_get_clock_time (stats, "running-time", running_time);
3074
3075     if (clock_rate) {
3076       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3077       if (*clock_rate == 0 && running_time)
3078         *running_time = GST_CLOCK_TIME_NONE;
3079     }
3080     gst_structure_free (stats);
3081   } else {
3082     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3083         !g_object_class_find_property (payobjclass, "timestamp"))
3084       goto no_stats;
3085
3086     if (seq)
3087       g_object_get (priv->payloader, "seqnum", seq, NULL);
3088
3089     if (rtptime)
3090       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3091
3092     if (running_time)
3093       *running_time = GST_CLOCK_TIME_NONE;
3094   }
3095
3096 done:
3097   g_mutex_unlock (&priv->lock);
3098
3099   return TRUE;
3100
3101   /* ERRORS */
3102 no_stats:
3103   {
3104     GST_WARNING ("Could not get payloader stats");
3105     g_mutex_unlock (&priv->lock);
3106     return FALSE;
3107   }
3108 }
3109
3110 /**
3111  * gst_rtsp_stream_get_caps:
3112  * @stream: a #GstRTSPStream
3113  *
3114  * Retrieve the current caps of @stream.
3115  *
3116  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3117  * after usage.
3118  */
3119 GstCaps *
3120 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3121 {
3122   GstRTSPStreamPrivate *priv;
3123   GstCaps *result;
3124
3125   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3126
3127   priv = stream->priv;
3128
3129   g_mutex_lock (&priv->lock);
3130   if ((result = priv->caps))
3131     gst_caps_ref (result);
3132   g_mutex_unlock (&priv->lock);
3133
3134   return result;
3135 }
3136
3137 /**
3138  * gst_rtsp_stream_recv_rtp:
3139  * @stream: a #GstRTSPStream
3140  * @buffer: (transfer full): a #GstBuffer
3141  *
3142  * Handle an RTP buffer for the stream. This method is usually called when a
3143  * message has been received from a client using the TCP transport.
3144  *
3145  * This function takes ownership of @buffer.
3146  *
3147  * Returns: a GstFlowReturn.
3148  */
3149 GstFlowReturn
3150 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3151 {
3152   GstRTSPStreamPrivate *priv;
3153   GstFlowReturn ret;
3154   GstElement *element;
3155
3156   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3157   priv = stream->priv;
3158   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3159   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3160
3161   g_mutex_lock (&priv->lock);
3162   if (priv->appsrc[0])
3163     element = gst_object_ref (priv->appsrc[0]);
3164   else
3165     element = NULL;
3166   g_mutex_unlock (&priv->lock);
3167
3168   if (element) {
3169     if (priv->appsrc_base_time[0] == -1) {
3170       /* Take current running_time. This timestamp will be put on
3171        * the first buffer of each stream because we are a live source and so we
3172        * timestamp with the running_time. When we are dealing with TCP, we also
3173        * only timestamp the first buffer (using the DISCONT flag) because a server
3174        * typically bursts data, for which we don't want to compensate by speeding
3175        * up the media. The other timestamps will be interpollated from this one
3176        * using the RTP timestamps. */
3177       GST_OBJECT_LOCK (element);
3178       if (GST_ELEMENT_CLOCK (element)) {
3179         GstClockTime now;
3180         GstClockTime base_time;
3181
3182         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3183         base_time = GST_ELEMENT_CAST (element)->base_time;
3184
3185         priv->appsrc_base_time[0] = now - base_time;
3186         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3187         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3188             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3189             GST_TIME_ARGS (base_time));
3190       }
3191       GST_OBJECT_UNLOCK (element);
3192     }
3193
3194     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3195     gst_object_unref (element);
3196   } else {
3197     ret = GST_FLOW_OK;
3198   }
3199   return ret;
3200 }
3201
3202 /**
3203  * gst_rtsp_stream_recv_rtcp:
3204  * @stream: a #GstRTSPStream
3205  * @buffer: (transfer full): a #GstBuffer
3206  *
3207  * Handle an RTCP buffer for the stream. This method is usually called when a
3208  * message has been received from a client using the TCP transport.
3209  *
3210  * This function takes ownership of @buffer.
3211  *
3212  * Returns: a GstFlowReturn.
3213  */
3214 GstFlowReturn
3215 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3216 {
3217   GstRTSPStreamPrivate *priv;
3218   GstFlowReturn ret;
3219   GstElement *element;
3220
3221   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3222   priv = stream->priv;
3223   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3224
3225   if (priv->joined_bin == NULL) {
3226     gst_buffer_unref (buffer);
3227     return GST_FLOW_NOT_LINKED;
3228   }
3229   g_mutex_lock (&priv->lock);
3230   if (priv->appsrc[1])
3231     element = gst_object_ref (priv->appsrc[1]);
3232   else
3233     element = NULL;
3234   g_mutex_unlock (&priv->lock);
3235
3236   if (element) {
3237     if (priv->appsrc_base_time[1] == -1) {
3238       /* Take current running_time. This timestamp will be put on
3239        * the first buffer of each stream because we are a live source and so we
3240        * timestamp with the running_time. When we are dealing with TCP, we also
3241        * only timestamp the first buffer (using the DISCONT flag) because a server
3242        * typically bursts data, for which we don't want to compensate by speeding
3243        * up the media. The other timestamps will be interpollated from this one
3244        * using the RTP timestamps. */
3245       GST_OBJECT_LOCK (element);
3246       if (GST_ELEMENT_CLOCK (element)) {
3247         GstClockTime now;
3248         GstClockTime base_time;
3249
3250         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3251         base_time = GST_ELEMENT_CAST (element)->base_time;
3252
3253         priv->appsrc_base_time[1] = now - base_time;
3254         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3255         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3256             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3257             GST_TIME_ARGS (base_time));
3258       }
3259       GST_OBJECT_UNLOCK (element);
3260     }
3261
3262     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3263     gst_object_unref (element);
3264   } else {
3265     ret = GST_FLOW_OK;
3266     gst_buffer_unref (buffer);
3267   }
3268   return ret;
3269 }
3270
3271 /* must be called with lock */
3272 static gboolean
3273 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3274     gboolean add)
3275 {
3276   GstRTSPStreamPrivate *priv = stream->priv;
3277   const GstRTSPTransport *tr;
3278
3279   tr = gst_rtsp_stream_transport_get_transport (trans);
3280
3281   switch (tr->lower_transport) {
3282     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3283     {
3284       if (add) {
3285         if (!check_mcast_part_for_transport (stream, tr))
3286           goto mcast_error;
3287         priv->transports = g_list_prepend (priv->transports, trans);
3288       } else {
3289         priv->transports = g_list_remove (priv->transports, trans);
3290       }
3291       break;
3292     }
3293     case GST_RTSP_LOWER_TRANS_UDP:
3294     {
3295       gchar *dest;
3296       gint min, max;
3297       guint ttl = 0;
3298
3299       dest = tr->destination;
3300       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3301         min = tr->port.min;
3302         max = tr->port.max;
3303         ttl = tr->ttl;
3304       } else if (priv->client_side) {
3305         /* In client side mode the 'destination' is the RTSP server, so send
3306          * to those ports */
3307         min = tr->server_port.min;
3308         max = tr->server_port.max;
3309       } else {
3310         min = tr->client_port.min;
3311         max = tr->client_port.max;
3312       }
3313
3314       if (add) {
3315         if (ttl > 0) {
3316           GST_INFO ("setting ttl-mc %d", ttl);
3317           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3318           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3319         }
3320         GST_INFO ("adding %s:%d-%d", dest, min, max);
3321         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3322         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3323         priv->transports = g_list_prepend (priv->transports, trans);
3324       } else {
3325         GST_INFO ("removing %s:%d-%d", dest, min, max);
3326         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3327         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3328         priv->transports = g_list_remove (priv->transports, trans);
3329       }
3330       priv->transports_cookie++;
3331       break;
3332     }
3333     case GST_RTSP_LOWER_TRANS_TCP:
3334       if (add) {
3335         GST_INFO ("adding TCP %s", tr->destination);
3336         priv->transports = g_list_prepend (priv->transports, trans);
3337       } else {
3338         GST_INFO ("removing TCP %s", tr->destination);
3339         priv->transports = g_list_remove (priv->transports, trans);
3340       }
3341       priv->transports_cookie++;
3342       break;
3343     default:
3344       goto unknown_transport;
3345   }
3346   return TRUE;
3347
3348   /* ERRORS */
3349 unknown_transport:
3350   {
3351     GST_INFO ("Unknown transport %d", tr->lower_transport);
3352     return FALSE;
3353   }
3354 mcast_error:
3355   {
3356     return FALSE;
3357   }
3358 }
3359
3360
3361 /**
3362  * gst_rtsp_stream_add_transport:
3363  * @stream: a #GstRTSPStream
3364  * @trans: (transfer none): a #GstRTSPStreamTransport
3365  *
3366  * Add the transport in @trans to @stream. The media of @stream will
3367  * then also be send to the values configured in @trans.
3368  *
3369  * @stream must be joined to a bin.
3370  *
3371  * @trans must contain a valid #GstRTSPTransport.
3372  *
3373  * Returns: %TRUE if @trans was added
3374  */
3375 gboolean
3376 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3377     GstRTSPStreamTransport * trans)
3378 {
3379   GstRTSPStreamPrivate *priv;
3380   gboolean res;
3381
3382   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3383   priv = stream->priv;
3384   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3385   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3386
3387   g_mutex_lock (&priv->lock);
3388   res = update_transport (stream, trans, TRUE);
3389   g_mutex_unlock (&priv->lock);
3390
3391   return res;
3392 }
3393
3394 /**
3395  * gst_rtsp_stream_remove_transport:
3396  * @stream: a #GstRTSPStream
3397  * @trans: (transfer none): a #GstRTSPStreamTransport
3398  *
3399  * Remove the transport in @trans from @stream. The media of @stream will
3400  * not be sent to the values configured in @trans.
3401  *
3402  * @stream must be joined to a bin.
3403  *
3404  * @trans must contain a valid #GstRTSPTransport.
3405  *
3406  * Returns: %TRUE if @trans was removed
3407  */
3408 gboolean
3409 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3410     GstRTSPStreamTransport * trans)
3411 {
3412   GstRTSPStreamPrivate *priv;
3413   gboolean res;
3414
3415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3416   priv = stream->priv;
3417   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3418   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3419
3420   g_mutex_lock (&priv->lock);
3421   res = update_transport (stream, trans, FALSE);
3422   g_mutex_unlock (&priv->lock);
3423
3424   return res;
3425 }
3426
3427 /**
3428  * gst_rtsp_stream_update_crypto:
3429  * @stream: a #GstRTSPStream
3430  * @ssrc: the SSRC
3431  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3432  *
3433  * Update the new crypto information for @ssrc in @stream. If information
3434  * for @ssrc did not exist, it will be added. If information
3435  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3436  * be removed from @stream.
3437  *
3438  * Returns: %TRUE if @crypto could be updated
3439  */
3440 gboolean
3441 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3442     guint ssrc, GstCaps * crypto)
3443 {
3444   GstRTSPStreamPrivate *priv;
3445
3446   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3447   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3448
3449   priv = stream->priv;
3450
3451   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3452
3453   g_mutex_lock (&priv->lock);
3454   if (crypto)
3455     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3456         gst_caps_ref (crypto));
3457   else
3458     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3459   g_mutex_unlock (&priv->lock);
3460
3461   return TRUE;
3462 }
3463
3464 /**
3465  * gst_rtsp_stream_get_rtp_socket:
3466  * @stream: a #GstRTSPStream
3467  * @family: the socket family
3468  *
3469  * Get the RTP socket from @stream for a @family.
3470  *
3471  * @stream must be joined to a bin.
3472  *
3473  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3474  * socket could be allocated for @family. Unref after usage
3475  */
3476 GSocket *
3477 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3478 {
3479   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3480   GSocket *socket;
3481   const gchar *name;
3482
3483   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3484   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3485       family == G_SOCKET_FAMILY_IPV6, NULL);
3486   g_return_val_if_fail (priv->udpsink[0], NULL);
3487
3488   if (family == G_SOCKET_FAMILY_IPV6)
3489     name = "socket-v6";
3490   else
3491     name = "socket";
3492
3493   g_object_get (priv->udpsink[0], name, &socket, NULL);
3494
3495   return socket;
3496 }
3497
3498 /**
3499  * gst_rtsp_stream_get_rtcp_socket:
3500  * @stream: a #GstRTSPStream
3501  * @family: the socket family
3502  *
3503  * Get the RTCP socket from @stream for a @family.
3504  *
3505  * @stream must be joined to a bin.
3506  *
3507  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3508  * socket could be allocated for @family. Unref after usage
3509  */
3510 GSocket *
3511 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3512 {
3513   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3514   GSocket *socket;
3515   const gchar *name;
3516
3517   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3518   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3519       family == G_SOCKET_FAMILY_IPV6, NULL);
3520   g_return_val_if_fail (priv->udpsink[1], NULL);
3521
3522   if (family == G_SOCKET_FAMILY_IPV6)
3523     name = "socket-v6";
3524   else
3525     name = "socket";
3526
3527   g_object_get (priv->udpsink[1], name, &socket, NULL);
3528
3529   return socket;
3530 }
3531
3532 /**
3533  * gst_rtsp_stream_set_seqnum:
3534  * @stream: a #GstRTSPStream
3535  * @seqnum: a new sequence number
3536  *
3537  * Configure the sequence number in the payloader of @stream to @seqnum.
3538  */
3539 void
3540 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3541 {
3542   GstRTSPStreamPrivate *priv;
3543
3544   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3545
3546   priv = stream->priv;
3547
3548   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3549 }
3550
3551 /**
3552  * gst_rtsp_stream_get_seqnum:
3553  * @stream: a #GstRTSPStream
3554  *
3555  * Get the configured sequence number in the payloader of @stream.
3556  *
3557  * Returns: the sequence number of the payloader.
3558  */
3559 guint16
3560 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3561 {
3562   GstRTSPStreamPrivate *priv;
3563   guint seqnum;
3564
3565   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3566
3567   priv = stream->priv;
3568
3569   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3570
3571   return seqnum;
3572 }
3573
3574 /**
3575  * gst_rtsp_stream_transport_filter:
3576  * @stream: a #GstRTSPStream
3577  * @func: (scope call) (allow-none): a callback
3578  * @user_data: (closure): user data passed to @func
3579  *
3580  * Call @func for each transport managed by @stream. The result value of @func
3581  * determines what happens to the transport. @func will be called with @stream
3582  * locked so no further actions on @stream can be performed from @func.
3583  *
3584  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3585  * @stream.
3586  *
3587  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3588  *
3589  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3590  * will also be added with an additional ref to the result #GList of this
3591  * function..
3592  *
3593  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3594  *
3595  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3596  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3597  * element in the #GList should be unreffed before the list is freed.
3598  */
3599 GList *
3600 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3601     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3602 {
3603   GstRTSPStreamPrivate *priv;
3604   GList *result, *walk, *next;
3605   GHashTable *visited = NULL;
3606   guint cookie;
3607
3608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3609
3610   priv = stream->priv;
3611
3612   result = NULL;
3613   if (func)
3614     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3615
3616   g_mutex_lock (&priv->lock);
3617 restart:
3618   cookie = priv->transports_cookie;
3619   for (walk = priv->transports; walk; walk = next) {
3620     GstRTSPStreamTransport *trans = walk->data;
3621     GstRTSPFilterResult res;
3622     gboolean changed;
3623
3624     next = g_list_next (walk);
3625
3626     if (func) {
3627       /* only visit each transport once */
3628       if (g_hash_table_contains (visited, trans))
3629         continue;
3630
3631       g_hash_table_add (visited, g_object_ref (trans));
3632       g_mutex_unlock (&priv->lock);
3633
3634       res = func (stream, trans, user_data);
3635
3636       g_mutex_lock (&priv->lock);
3637     } else
3638       res = GST_RTSP_FILTER_REF;
3639
3640     changed = (cookie != priv->transports_cookie);
3641
3642     switch (res) {
3643       case GST_RTSP_FILTER_REMOVE:
3644         update_transport (stream, trans, FALSE);
3645         break;
3646       case GST_RTSP_FILTER_REF:
3647         result = g_list_prepend (result, g_object_ref (trans));
3648         break;
3649       case GST_RTSP_FILTER_KEEP:
3650       default:
3651         break;
3652     }
3653     if (changed)
3654       goto restart;
3655   }
3656   g_mutex_unlock (&priv->lock);
3657
3658   if (func)
3659     g_hash_table_unref (visited);
3660
3661   return result;
3662 }
3663
3664 static GstPadProbeReturn
3665 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3666 {
3667   GstRTSPStreamPrivate *priv;
3668   GstRTSPStream *stream;
3669
3670   stream = user_data;
3671   priv = stream->priv;
3672
3673   GST_DEBUG_OBJECT (pad, "now blocking");
3674
3675   g_mutex_lock (&priv->lock);
3676   priv->blocking = TRUE;
3677   g_mutex_unlock (&priv->lock);
3678
3679   gst_element_post_message (priv->payloader,
3680       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3681           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3682
3683   return GST_PAD_PROBE_OK;
3684 }
3685
3686 /**
3687  * gst_rtsp_stream_set_blocked:
3688  * @stream: a #GstRTSPStream
3689  * @blocked: boolean indicating we should block or unblock
3690  *
3691  * Blocks or unblocks the dataflow on @stream.
3692  *
3693  * Returns: %TRUE on success
3694  */
3695 gboolean
3696 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3697 {
3698   GstRTSPStreamPrivate *priv;
3699   int i;
3700
3701   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3702
3703   priv = stream->priv;
3704
3705   g_mutex_lock (&priv->lock);
3706   if (blocked) {
3707     priv->blocking = FALSE;
3708     for (i = 0; i < 2; i++) {
3709       if (priv->blocked_id[i] == 0) {
3710         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
3711             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3712             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3713             g_object_ref (stream), g_object_unref);
3714       }
3715     }
3716   } else {
3717     for (i = 0; i < 2; i++) {
3718       if (priv->blocked_id[i] != 0) {
3719         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
3720         priv->blocked_id[i] = 0;
3721       }
3722     }
3723     priv->blocking = FALSE;
3724   }
3725   g_mutex_unlock (&priv->lock);
3726
3727   return TRUE;
3728 }
3729
3730 /**
3731  * gst_rtsp_stream_is_blocking:
3732  * @stream: a #GstRTSPStream
3733  *
3734  * Check if @stream is blocking on a #GstBuffer.
3735  *
3736  * Returns: %TRUE if @stream is blocking
3737  */
3738 gboolean
3739 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3740 {
3741   GstRTSPStreamPrivate *priv;
3742   gboolean result;
3743
3744   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3745
3746   priv = stream->priv;
3747
3748   g_mutex_lock (&priv->lock);
3749   result = priv->blocking;
3750   g_mutex_unlock (&priv->lock);
3751
3752   return result;
3753 }
3754
3755 /**
3756  * gst_rtsp_stream_query_position:
3757  * @stream: a #GstRTSPStream
3758  *
3759  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3760  * the RTP parts of the pipeline and not the RTCP parts.
3761  *
3762  * Returns: %TRUE if the position could be queried
3763  */
3764 gboolean
3765 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3766 {
3767   GstRTSPStreamPrivate *priv;
3768   GstElement *sink;
3769   gboolean ret;
3770
3771   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3772
3773   priv = stream->priv;
3774
3775   g_mutex_lock (&priv->lock);
3776   /* depending on the transport type, it should query corresponding sink */
3777   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3778       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3779     sink = priv->udpsink[0];
3780   else
3781     sink = priv->appsink[0];
3782
3783   if (sink)
3784     gst_object_ref (sink);
3785   g_mutex_unlock (&priv->lock);
3786
3787   if (!sink)
3788     return FALSE;
3789
3790   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3791   gst_object_unref (sink);
3792
3793   return ret;
3794 }
3795
3796 /**
3797  * gst_rtsp_stream_query_stop:
3798  * @stream: a #GstRTSPStream
3799  *
3800  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3801  * the RTP parts of the pipeline and not the RTCP parts.
3802  *
3803  * Returns: %TRUE if the stop could be queried
3804  */
3805 gboolean
3806 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3807 {
3808   GstRTSPStreamPrivate *priv;
3809   GstElement *sink;
3810   GstQuery *query;
3811   gboolean ret;
3812
3813   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3814
3815   priv = stream->priv;
3816
3817   g_mutex_lock (&priv->lock);
3818   /* depending on the transport type, it should query corresponding sink */
3819   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3820       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3821     sink = priv->udpsink[0];
3822   else
3823     sink = priv->appsink[0];
3824
3825   if (sink)
3826     gst_object_ref (sink);
3827   g_mutex_unlock (&priv->lock);
3828
3829   if (!sink)
3830     return FALSE;
3831
3832   query = gst_query_new_segment (GST_FORMAT_TIME);
3833   if ((ret = gst_element_query (sink, query))) {
3834     GstFormat format;
3835
3836     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3837     if (format != GST_FORMAT_TIME)
3838       *stop = -1;
3839   }
3840   gst_query_unref (query);
3841   gst_object_unref (sink);
3842
3843   return ret;
3844
3845 }