c5722cd39b1bcd74bd2742642d37b0018e87cbef
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPAddress *server_addr_v4;
128   GstRTSPAddress *server_addr_v6;
129
130   /* multicast addresses */
131   GstRTSPAddress *mcast_addr_v4;
132   GstRTSPAddress *mcast_addr_v6;
133
134   gchar *multicast_iface;
135
136   /* the caps of the stream */
137   gulong caps_sig;
138   GstCaps *caps;
139
140   /* transports we stream to */
141   guint n_active;
142   GList *transports;
143   guint transports_cookie;
144   GList *tr_cache_rtp;
145   GList *tr_cache_rtcp;
146   guint tr_cache_cookie_rtp;
147   guint tr_cache_cookie_rtcp;
148
149   gint dscp_qos;
150
151   /* stream blocking */
152   gulong blocked_id;
153   gboolean blocking;
154
155   /* pt->caps map for RECORD streams */
156   GHashTable *ptmap;
157
158   GstRTSPPublishClockMode publish_clock_mode;
159 };
160
161 #define DEFAULT_CONTROL         NULL
162 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
163 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
164                                         GST_RTSP_LOWER_TRANS_TCP
165
166 enum
167 {
168   PROP_0,
169   PROP_CONTROL,
170   PROP_PROFILES,
171   PROP_PROTOCOLS,
172   PROP_LAST
173 };
174
175 enum
176 {
177   SIGNAL_NEW_RTP_ENCODER,
178   SIGNAL_NEW_RTCP_ENCODER,
179   SIGNAL_LAST
180 };
181
182 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
183 #define GST_CAT_DEFAULT rtsp_stream_debug
184
185 static GQuark ssrc_stream_map_key;
186
187 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
188     GValue * value, GParamSpec * pspec);
189 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
190     const GValue * value, GParamSpec * pspec);
191
192 static void gst_rtsp_stream_finalize (GObject * obj);
193
194 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
195
196 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
197
198 static void
199 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
200 {
201   GObjectClass *gobject_class;
202
203   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
204
205   gobject_class = G_OBJECT_CLASS (klass);
206
207   gobject_class->get_property = gst_rtsp_stream_get_property;
208   gobject_class->set_property = gst_rtsp_stream_set_property;
209   gobject_class->finalize = gst_rtsp_stream_finalize;
210
211   g_object_class_install_property (gobject_class, PROP_CONTROL,
212       g_param_spec_string ("control", "Control",
213           "The control string for this stream", DEFAULT_CONTROL,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215
216   g_object_class_install_property (gobject_class, PROP_PROFILES,
217       g_param_spec_flags ("profiles", "Profiles",
218           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
219           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
222       g_param_spec_flags ("protocols", "Protocols",
223           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
224           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
227       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
228       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
229       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
230
231   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
232       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
234       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
235
236   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
237
238   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
239 }
240
241 static void
242 gst_rtsp_stream_init (GstRTSPStream * stream)
243 {
244   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
245
246   GST_DEBUG ("new stream %p", stream);
247
248   stream->priv = priv;
249
250   priv->dscp_qos = -1;
251   priv->control = g_strdup (DEFAULT_CONTROL);
252   priv->profiles = DEFAULT_PROFILES;
253   priv->protocols = DEFAULT_PROTOCOLS;
254   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
255
256   g_mutex_init (&priv->lock);
257
258   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
259       NULL, (GDestroyNotify) gst_caps_unref);
260   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
261       (GDestroyNotify) gst_caps_unref);
262 }
263
264 static void
265 gst_rtsp_stream_finalize (GObject * obj)
266 {
267   GstRTSPStream *stream;
268   GstRTSPStreamPrivate *priv;
269
270   stream = GST_RTSP_STREAM (obj);
271   priv = stream->priv;
272
273   GST_DEBUG ("finalize stream %p", stream);
274
275   /* we really need to be unjoined now */
276   g_return_if_fail (priv->joined_bin == NULL);
277
278   if (priv->mcast_addr_v4)
279     gst_rtsp_address_free (priv->mcast_addr_v4);
280   if (priv->mcast_addr_v6)
281     gst_rtsp_address_free (priv->mcast_addr_v6);
282   if (priv->server_addr_v4)
283     gst_rtsp_address_free (priv->server_addr_v4);
284   if (priv->server_addr_v6)
285     gst_rtsp_address_free (priv->server_addr_v6);
286   if (priv->pool)
287     g_object_unref (priv->pool);
288   if (priv->rtxsend)
289     g_object_unref (priv->rtxsend);
290
291   g_free (priv->multicast_iface);
292
293   gst_object_unref (priv->payloader);
294   if (priv->srcpad)
295     gst_object_unref (priv->srcpad);
296   if (priv->sinkpad)
297     gst_object_unref (priv->sinkpad);
298   g_free (priv->control);
299   g_mutex_clear (&priv->lock);
300
301   g_hash_table_unref (priv->keys);
302   g_hash_table_destroy (priv->ptmap);
303
304   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
305 }
306
307 static void
308 gst_rtsp_stream_get_property (GObject * object, guint propid,
309     GValue * value, GParamSpec * pspec)
310 {
311   GstRTSPStream *stream = GST_RTSP_STREAM (object);
312
313   switch (propid) {
314     case PROP_CONTROL:
315       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
316       break;
317     case PROP_PROFILES:
318       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
319       break;
320     case PROP_PROTOCOLS:
321       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
325   }
326 }
327
328 static void
329 gst_rtsp_stream_set_property (GObject * object, guint propid,
330     const GValue * value, GParamSpec * pspec)
331 {
332   GstRTSPStream *stream = GST_RTSP_STREAM (object);
333
334   switch (propid) {
335     case PROP_CONTROL:
336       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
337       break;
338     case PROP_PROFILES:
339       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
340       break;
341     case PROP_PROTOCOLS:
342       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
343       break;
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
346   }
347 }
348
349 /**
350  * gst_rtsp_stream_new:
351  * @idx: an index
352  * @pad: a #GstPad
353  * @payloader: a #GstElement
354  *
355  * Create a new media stream with index @idx that handles RTP data on
356  * @pad and has a payloader element @payloader if @pad is a source pad
357  * or a depayloader element @payloader if @pad is a sink pad.
358  *
359  * Returns: (transfer full): a new #GstRTSPStream
360  */
361 GstRTSPStream *
362 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
363 {
364   GstRTSPStreamPrivate *priv;
365   GstRTSPStream *stream;
366
367   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
368   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
369
370   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
371   priv = stream->priv;
372   priv->idx = idx;
373   priv->payloader = gst_object_ref (payloader);
374   if (GST_PAD_IS_SRC (pad))
375     priv->srcpad = gst_object_ref (pad);
376   else
377     priv->sinkpad = gst_object_ref (pad);
378
379   return stream;
380 }
381
382 /**
383  * gst_rtsp_stream_get_index:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the stream index.
387  *
388  * Return: the stream index.
389  */
390 guint
391 gst_rtsp_stream_get_index (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
394
395   return stream->priv->idx;
396 }
397
398 /**
399  * gst_rtsp_stream_get_pt:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the stream payload type.
403  *
404  * Return: the stream payload type.
405  */
406 guint
407 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   guint pt;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
413
414   priv = stream->priv;
415
416   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
417
418   return pt;
419 }
420
421 /**
422  * gst_rtsp_stream_get_srcpad:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the srcpad associated with @stream.
426  *
427  * Returns: (transfer full): the srcpad. Unref after usage.
428  */
429 GstPad *
430 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
431 {
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
433
434   if (!stream->priv->srcpad)
435     return NULL;
436
437   return gst_object_ref (stream->priv->srcpad);
438 }
439
440 /**
441  * gst_rtsp_stream_get_sinkpad:
442  * @stream: a #GstRTSPStream
443  *
444  * Get the sinkpad associated with @stream.
445  *
446  * Returns: (transfer full): the sinkpad. Unref after usage.
447  */
448 GstPad *
449 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
450 {
451   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
452
453   if (!stream->priv->sinkpad)
454     return NULL;
455
456   return gst_object_ref (stream->priv->sinkpad);
457 }
458
459 /**
460  * gst_rtsp_stream_get_control:
461  * @stream: a #GstRTSPStream
462  *
463  * Get the control string to identify this stream.
464  *
465  * Returns: (transfer full): the control string. g_free() after usage.
466  */
467 gchar *
468 gst_rtsp_stream_get_control (GstRTSPStream * stream)
469 {
470   GstRTSPStreamPrivate *priv;
471   gchar *result;
472
473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
474
475   priv = stream->priv;
476
477   g_mutex_lock (&priv->lock);
478   if ((result = g_strdup (priv->control)) == NULL)
479     result = g_strdup_printf ("stream=%u", priv->idx);
480   g_mutex_unlock (&priv->lock);
481
482   return result;
483 }
484
485 /**
486  * gst_rtsp_stream_set_control:
487  * @stream: a #GstRTSPStream
488  * @control: a control string
489  *
490  * Set the control string in @stream.
491  */
492 void
493 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
494 {
495   GstRTSPStreamPrivate *priv;
496
497   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
498
499   priv = stream->priv;
500
501   g_mutex_lock (&priv->lock);
502   g_free (priv->control);
503   priv->control = g_strdup (control);
504   g_mutex_unlock (&priv->lock);
505 }
506
507 /**
508  * gst_rtsp_stream_has_control:
509  * @stream: a #GstRTSPStream
510  * @control: a control string
511  *
512  * Check if @stream has the control string @control.
513  *
514  * Returns: %TRUE is @stream has @control as the control string
515  */
516 gboolean
517 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
518 {
519   GstRTSPStreamPrivate *priv;
520   gboolean res;
521
522   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
523
524   priv = stream->priv;
525
526   g_mutex_lock (&priv->lock);
527   if (priv->control)
528     res = (g_strcmp0 (priv->control, control) == 0);
529   else {
530     guint streamid;
531
532     if (sscanf (control, "stream=%u", &streamid) > 0)
533       res = (streamid == priv->idx);
534     else
535       res = FALSE;
536   }
537   g_mutex_unlock (&priv->lock);
538
539   return res;
540 }
541
542 /**
543  * gst_rtsp_stream_set_mtu:
544  * @stream: a #GstRTSPStream
545  * @mtu: a new MTU
546  *
547  * Configure the mtu in the payloader of @stream to @mtu.
548  */
549 void
550 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
551 {
552   GstRTSPStreamPrivate *priv;
553
554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
555
556   priv = stream->priv;
557
558   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
559
560   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
561 }
562
563 /**
564  * gst_rtsp_stream_get_mtu:
565  * @stream: a #GstRTSPStream
566  *
567  * Get the configured MTU in the payloader of @stream.
568  *
569  * Returns: the MTU of the payloader.
570  */
571 guint
572 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
573 {
574   GstRTSPStreamPrivate *priv;
575   guint mtu;
576
577   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
578
579   priv = stream->priv;
580
581   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
582
583   return mtu;
584 }
585
586 /* Update the dscp qos property on the udp sinks */
587 static void
588 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
589 {
590   GstRTSPStreamPrivate *priv;
591
592   priv = stream->priv;
593
594   if (udpsink[0]) {
595     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
596   }
597
598   if (udpsink[1]) {
599     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
600   }
601 }
602
603 /**
604  * gst_rtsp_stream_set_dscp_qos:
605  * @stream: a #GstRTSPStream
606  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
607  *
608  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
609  */
610 void
611 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
620
621   if (dscp_qos < -1 || dscp_qos > 63) {
622     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
623     return;
624   }
625
626   priv->dscp_qos = dscp_qos;
627
628   update_dscp_qos (stream, priv->udpsink);
629 }
630
631 /**
632  * gst_rtsp_stream_get_dscp_qos:
633  * @stream: a #GstRTSPStream
634  *
635  * Get the configured DSCP QoS in of the outgoing sockets.
636  *
637  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
638  */
639 gint
640 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
641 {
642   GstRTSPStreamPrivate *priv;
643
644   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
645
646   priv = stream->priv;
647
648   return priv->dscp_qos;
649 }
650
651 /**
652  * gst_rtsp_stream_is_transport_supported:
653  * @stream: a #GstRTSPStream
654  * @transport: (transfer none): a #GstRTSPTransport
655  *
656  * Check if @transport can be handled by stream
657  *
658  * Returns: %TRUE if @transport can be handled by @stream.
659  */
660 gboolean
661 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
662     GstRTSPTransport * transport)
663 {
664   GstRTSPStreamPrivate *priv;
665
666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
667
668   priv = stream->priv;
669
670   g_mutex_lock (&priv->lock);
671   if (transport->trans != GST_RTSP_TRANS_RTP)
672     goto unsupported_transmode;
673
674   if (!(transport->profile & priv->profiles))
675     goto unsupported_profile;
676
677   if (!(transport->lower_transport & priv->protocols))
678     goto unsupported_ltrans;
679
680   g_mutex_unlock (&priv->lock);
681
682   return TRUE;
683
684   /* ERRORS */
685 unsupported_transmode:
686   {
687     GST_DEBUG ("unsupported transport mode %d", transport->trans);
688     g_mutex_unlock (&priv->lock);
689     return FALSE;
690   }
691 unsupported_profile:
692   {
693     GST_DEBUG ("unsupported profile %d", transport->profile);
694     g_mutex_unlock (&priv->lock);
695     return FALSE;
696   }
697 unsupported_ltrans:
698   {
699     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
700     g_mutex_unlock (&priv->lock);
701     return FALSE;
702   }
703 }
704
705 /**
706  * gst_rtsp_stream_set_profiles:
707  * @stream: a #GstRTSPStream
708  * @profiles: the new profiles
709  *
710  * Configure the allowed profiles for @stream.
711  */
712 void
713 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
714 {
715   GstRTSPStreamPrivate *priv;
716
717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
718
719   priv = stream->priv;
720
721   g_mutex_lock (&priv->lock);
722   priv->profiles = profiles;
723   g_mutex_unlock (&priv->lock);
724 }
725
726 /**
727  * gst_rtsp_stream_get_profiles:
728  * @stream: a #GstRTSPStream
729  *
730  * Get the allowed profiles of @stream.
731  *
732  * Returns: a #GstRTSPProfile
733  */
734 GstRTSPProfile
735 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
736 {
737   GstRTSPStreamPrivate *priv;
738   GstRTSPProfile res;
739
740   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
741
742   priv = stream->priv;
743
744   g_mutex_lock (&priv->lock);
745   res = priv->profiles;
746   g_mutex_unlock (&priv->lock);
747
748   return res;
749 }
750
751 /**
752  * gst_rtsp_stream_set_protocols:
753  * @stream: a #GstRTSPStream
754  * @protocols: the new flags
755  *
756  * Configure the allowed lower transport for @stream.
757  */
758 void
759 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
760     GstRTSPLowerTrans protocols)
761 {
762   GstRTSPStreamPrivate *priv;
763
764   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
765
766   priv = stream->priv;
767
768   g_mutex_lock (&priv->lock);
769   priv->protocols = protocols;
770   g_mutex_unlock (&priv->lock);
771 }
772
773 /**
774  * gst_rtsp_stream_get_protocols:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the allowed protocols of @stream.
778  *
779  * Returns: a #GstRTSPLowerTrans
780  */
781 GstRTSPLowerTrans
782 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
783 {
784   GstRTSPStreamPrivate *priv;
785   GstRTSPLowerTrans res;
786
787   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
788       GST_RTSP_LOWER_TRANS_UNKNOWN);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   res = priv->protocols;
794   g_mutex_unlock (&priv->lock);
795
796   return res;
797 }
798
799 /**
800  * gst_rtsp_stream_set_address_pool:
801  * @stream: a #GstRTSPStream
802  * @pool: (transfer none): a #GstRTSPAddressPool
803  *
804  * configure @pool to be used as the address pool of @stream.
805  */
806 void
807 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
808     GstRTSPAddressPool * pool)
809 {
810   GstRTSPStreamPrivate *priv;
811   GstRTSPAddressPool *old;
812
813   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
814
815   priv = stream->priv;
816
817   GST_LOG_OBJECT (stream, "set address pool %p", pool);
818
819   g_mutex_lock (&priv->lock);
820   if ((old = priv->pool) != pool)
821     priv->pool = pool ? g_object_ref (pool) : NULL;
822   else
823     old = NULL;
824   g_mutex_unlock (&priv->lock);
825
826   if (old)
827     g_object_unref (old);
828 }
829
830 /**
831  * gst_rtsp_stream_get_address_pool:
832  * @stream: a #GstRTSPStream
833  *
834  * Get the #GstRTSPAddressPool used as the address pool of @stream.
835  *
836  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
837  * usage.
838  */
839 GstRTSPAddressPool *
840 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
841 {
842   GstRTSPStreamPrivate *priv;
843   GstRTSPAddressPool *result;
844
845   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
846
847   priv = stream->priv;
848
849   g_mutex_lock (&priv->lock);
850   if ((result = priv->pool))
851     g_object_ref (result);
852   g_mutex_unlock (&priv->lock);
853
854   return result;
855 }
856
857 /**
858  * gst_rtsp_stream_set_multicast_iface:
859  * @stream: a #GstRTSPStream
860  * @multicast_iface: (transfer none): a multicast interface
861  *
862  * configure @multicast_iface to be used for @stream.
863  */
864 void
865 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
866     const gchar * multicast_iface)
867 {
868   GstRTSPStreamPrivate *priv;
869   gchar *old;
870
871   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
872
873   priv = stream->priv;
874
875   GST_LOG_OBJECT (stream, "set multicast iface %s",
876       GST_STR_NULL (multicast_iface));
877
878   g_mutex_lock (&priv->lock);
879   if ((old = priv->multicast_iface) != multicast_iface)
880     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
881   else
882     old = NULL;
883   g_mutex_unlock (&priv->lock);
884
885   if (old)
886     g_free (old);
887 }
888
889 /**
890  * gst_rtsp_stream_get_multicast_iface:
891  * @stream: a #GstRTSPStream
892  *
893  * Get the multicast interface used for @stream.
894  *
895  * Returns: (transfer full): the multicast interface for @stream. g_free() after
896  * usage.
897  */
898 gchar *
899 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
900 {
901   GstRTSPStreamPrivate *priv;
902   gchar *result;
903
904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
905
906   priv = stream->priv;
907
908   g_mutex_lock (&priv->lock);
909   if ((result = priv->multicast_iface))
910     result = g_strdup (result);
911   g_mutex_unlock (&priv->lock);
912
913   return result;
914 }
915
916
917 static GstRTSPAddress *
918 gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream,
919     GSocketFamily family)
920 {
921   GstRTSPStreamPrivate *priv;
922   GstRTSPAddress *result;
923   GstRTSPAddress **addrp;
924   GstRTSPAddressFlags flags;
925
926   priv = stream->priv;
927
928   if (family == G_SOCKET_FAMILY_IPV6) {
929     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
930     addrp = &priv->mcast_addr_v6;
931   } else {
932     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
933     addrp = &priv->mcast_addr_v4;
934   }
935
936   if (*addrp == NULL) {
937     if (priv->pool == NULL)
938       goto no_pool;
939
940     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
941
942     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
943     if (*addrp == NULL)
944       goto no_address;
945
946     /* FIXME: Also reserve the same port with unicast ANY address, since that's
947      * where we are going to bind our socket. Probably loop until we find a port
948      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
949      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
950      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
951   }
952   result = gst_rtsp_address_copy (*addrp);
953
954   return result;
955
956   /* ERRORS */
957 no_pool:
958   {
959     GST_ERROR_OBJECT (stream, "no address pool specified");
960     return NULL;
961   }
962 no_address:
963   {
964     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
965     return NULL;
966   }
967 }
968
969 /**
970  * gst_rtsp_stream_get_multicast_address:
971  * @stream: a #GstRTSPStream
972  * @family: the #GSocketFamily
973  *
974  * Get the multicast address of @stream for @family. The original
975  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
976  * won't release the address from the pool.
977  *
978  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
979  * or %NULL when no address could be allocated. gst_rtsp_address_free()
980  * after usage.
981  */
982 GstRTSPAddress *
983 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
984     GSocketFamily family)
985 {
986   GstRTSPAddress *result;
987
988   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
989
990   g_mutex_lock (&stream->priv->lock);
991   result = gst_rtsp_stream_get_multicast_address_locked (stream, family);
992   g_mutex_unlock (&stream->priv->lock);
993
994   return result;
995 }
996
997 /**
998  * gst_rtsp_stream_reserve_address:
999  * @stream: a #GstRTSPStream
1000  * @address: an address
1001  * @port: a port
1002  * @n_ports: n_ports
1003  * @ttl: a TTL
1004  *
1005  * Reserve @address and @port as the address and port of @stream. The original
1006  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1007  * won't release the address from the pool.
1008  *
1009  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1010  * the address could be reserved. gst_rtsp_address_free() after usage.
1011  */
1012 GstRTSPAddress *
1013 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1014     const gchar * address, guint port, guint n_ports, guint ttl)
1015 {
1016   GstRTSPStreamPrivate *priv;
1017   GstRTSPAddress *result;
1018   GInetAddress *addr;
1019   GSocketFamily family;
1020   GstRTSPAddress **addrp;
1021
1022   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1023   g_return_val_if_fail (address != NULL, NULL);
1024   g_return_val_if_fail (port > 0, NULL);
1025   g_return_val_if_fail (n_ports > 0, NULL);
1026   g_return_val_if_fail (ttl > 0, NULL);
1027
1028   priv = stream->priv;
1029
1030   addr = g_inet_address_new_from_string (address);
1031   if (!addr) {
1032     GST_ERROR ("failed to get inet addr from %s", address);
1033     family = G_SOCKET_FAMILY_IPV4;
1034   } else {
1035     family = g_inet_address_get_family (addr);
1036     g_object_unref (addr);
1037   }
1038
1039   if (family == G_SOCKET_FAMILY_IPV6)
1040     addrp = &priv->mcast_addr_v6;
1041   else
1042     addrp = &priv->mcast_addr_v4;
1043
1044   g_mutex_lock (&priv->lock);
1045   if (*addrp == NULL) {
1046     GstRTSPAddressPoolResult res;
1047
1048     if (priv->pool == NULL)
1049       goto no_pool;
1050
1051     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1052         port, n_ports, ttl, addrp);
1053     if (res != GST_RTSP_ADDRESS_POOL_OK)
1054       goto no_address;
1055
1056     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1057      * where we are going to bind our socket. */
1058   } else {
1059     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1060         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1061         (*addrp)->ttl != ttl)
1062       goto different_address;
1063   }
1064   result = gst_rtsp_address_copy (*addrp);
1065   g_mutex_unlock (&priv->lock);
1066
1067   return result;
1068
1069   /* ERRORS */
1070 no_pool:
1071   {
1072     GST_ERROR_OBJECT (stream, "no address pool specified");
1073     g_mutex_unlock (&priv->lock);
1074     return NULL;
1075   }
1076 no_address:
1077   {
1078     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1079         address);
1080     g_mutex_unlock (&priv->lock);
1081     return NULL;
1082   }
1083 different_address:
1084   {
1085     GST_ERROR_OBJECT (stream,
1086         "address %s is not the same as %s that was already" " reserved",
1087         address, (*addrp)->address);
1088     g_mutex_unlock (&priv->lock);
1089     return NULL;
1090   }
1091 }
1092
1093 /* must be called with lock */
1094 static void
1095 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1096     GSocket * rtcp_socket, GSocketFamily family)
1097 {
1098   const gchar *multisink_socket;
1099
1100   if (family == G_SOCKET_FAMILY_IPV6)
1101     multisink_socket = "socket-v6";
1102   else
1103     multisink_socket = "socket";
1104
1105   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1106   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1107 }
1108
1109 static gboolean
1110 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
1111 {
1112   GstRTSPStreamPrivate *priv = stream->priv;
1113   GstElement *udpsink0, *udpsink1;
1114
1115   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1116   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink0 || !udpsink1)
1119     goto no_udp_protocol;
1120
1121   /* configure sinks */
1122
1123   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1124   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1125
1126   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1127   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1128
1129   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1130
1131   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1132   /* Needs to be async for RECORD streams, otherwise we will never go to
1133    * PLAYING because the sinks will wait for data while the udpsrc can't
1134    * provide data with timestamps in PAUSED. */
1135   if (priv->sinkpad)
1136     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1137   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1138
1139   /* join multicast group when adding clients, so we'll start receiving from it.
1140    * We cannot rely on the udpsrc to join the group since its socket is always a
1141    * local unicast one. */
1142   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1143   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1144
1145   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1147
1148   udpsink[0] = udpsink0;
1149   udpsink[1] = udpsink1;
1150
1151   /* update the dscp qos field in the sinks */
1152   update_dscp_qos (stream, udpsink);
1153
1154   return TRUE;
1155
1156   /* ERRORS */
1157 no_udp_protocol:
1158   {
1159     return FALSE;
1160   }
1161 }
1162
1163 /* must be called with lock */
1164 static gboolean
1165 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1166     GSocket * rtp_socket, GSocket * rtcp_socket)
1167 {
1168   GstStateChangeReturn ret;
1169
1170   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1171   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1172
1173   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1174     goto error;
1175
1176   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1177   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1178
1179   /* The udpsrc cannot do the join because its socket is always a local unicast
1180    * one. The udpsink sharing the same socket will do it for us. */
1181   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1182   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1183
1184   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1185   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1186
1187   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1188   if (ret == GST_STATE_CHANGE_FAILURE)
1189     goto error;
1190   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1191   if (ret == GST_STATE_CHANGE_FAILURE)
1192     goto error;
1193
1194   return TRUE;
1195
1196   /* ERRORS */
1197 error:
1198   {
1199     if (udpsrc_out[0]) {
1200       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1201       g_clear_object (&udpsrc_out[0]);
1202     }
1203     if (udpsrc_out[1]) {
1204       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1205       g_clear_object (&udpsrc_out[1]);
1206     }
1207     return FALSE;
1208   }
1209 }
1210
1211 static gboolean
1212 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1213     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1214     GstRTSPAddress ** server_addr_out, gboolean multicast)
1215 {
1216   GstRTSPStreamPrivate *priv = stream->priv;
1217   GSocket *rtp_socket = NULL;
1218   GSocket *rtcp_socket;
1219   gint tmp_rtp, tmp_rtcp;
1220   guint count;
1221   gint rtpport, rtcpport;
1222   GList *rejected_addresses = NULL;
1223   GstRTSPAddress *addr = NULL;
1224   GInetAddress *inetaddr = NULL;
1225   gchar *addr_str;
1226   GSocketAddress *rtp_sockaddr = NULL;
1227   GSocketAddress *rtcp_sockaddr = NULL;
1228   GstRTSPAddressPool *pool;
1229
1230   g_assert (!udpsrc_out[0]);
1231   g_assert (!udpsrc_out[1]);
1232   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1233       (udpsink_out[0] && udpsink_out[1]));
1234   g_assert (*server_addr_out == NULL);
1235
1236   pool = priv->pool;
1237   count = 0;
1238
1239   /* Start with random port */
1240   tmp_rtp = 0;
1241
1242   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1243       G_SOCKET_PROTOCOL_UDP, NULL);
1244   if (!rtcp_socket)
1245     goto no_udp_protocol;
1246   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1247
1248   /* try to allocate 2 UDP ports, the RTP port should be an even
1249    * number and the RTCP port should be the next (uneven) port */
1250 again:
1251
1252   if (rtp_socket == NULL) {
1253     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1254         G_SOCKET_PROTOCOL_UDP, NULL);
1255     if (!rtp_socket)
1256       goto no_udp_protocol;
1257     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1258   }
1259
1260   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1261     GstRTSPAddressFlags flags;
1262
1263     if (addr)
1264       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1265
1266     if (!pool)
1267       goto no_ports;
1268
1269     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1270     if (multicast)
1271       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1272     else
1273       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1274
1275     if (family == G_SOCKET_FAMILY_IPV6)
1276       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1277     else
1278       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1279
1280     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1281
1282     if (addr == NULL)
1283       goto no_ports;
1284
1285     tmp_rtp = addr->port;
1286
1287     g_clear_object (&inetaddr);
1288     if (multicast)
1289       inetaddr = g_inet_address_new_any (family);
1290     else
1291       inetaddr = g_inet_address_new_from_string (addr->address);
1292   } else {
1293     if (tmp_rtp != 0) {
1294       tmp_rtp += 2;
1295       if (++count > 20)
1296         goto no_ports;
1297     }
1298
1299     if (inetaddr == NULL)
1300       inetaddr = g_inet_address_new_any (family);
1301   }
1302
1303   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1304   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1305     g_object_unref (rtp_sockaddr);
1306     goto again;
1307   }
1308   g_object_unref (rtp_sockaddr);
1309
1310   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1311   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1312     g_clear_object (&rtp_sockaddr);
1313     goto socket_error;
1314   }
1315
1316   tmp_rtp =
1317       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1318   g_object_unref (rtp_sockaddr);
1319
1320   /* check if port is even */
1321   if ((tmp_rtp & 1) != 0) {
1322     /* port not even, close and allocate another */
1323     tmp_rtp++;
1324     g_clear_object (&rtp_socket);
1325     goto again;
1326   }
1327
1328   /* set port */
1329   tmp_rtcp = tmp_rtp + 1;
1330
1331   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1332   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1333     g_object_unref (rtcp_sockaddr);
1334     g_clear_object (&rtp_socket);
1335     goto again;
1336   }
1337   g_object_unref (rtcp_sockaddr);
1338
1339   if (!addr) {
1340     addr = g_slice_new0 (GstRTSPAddress);
1341     addr->address = g_inet_address_to_string (inetaddr);
1342     addr->port = tmp_rtp;
1343     addr->n_ports = 2;
1344   }
1345
1346   addr_str = addr->address;
1347   g_clear_object (&inetaddr);
1348
1349   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1350     goto no_udp_protocol;
1351   }
1352
1353   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1354   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1355
1356   /* this should not happen... */
1357   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1358     goto port_error;
1359
1360   /* This function is called twice (for v4 and v6) but we create only one pair
1361    * of udpsinks. */
1362   if (!udpsink_out[0]
1363       && !create_and_configure_udpsinks (stream, udpsink_out))
1364     goto no_udp_protocol;
1365
1366   if (multicast) {
1367     g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
1368         priv->multicast_iface, NULL);
1369     g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
1370         priv->multicast_iface, NULL);
1371
1372     g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
1373     g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
1374   }
1375
1376   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1377
1378   *server_addr_out = addr;
1379
1380   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1381
1382   g_object_unref (rtp_socket);
1383   g_object_unref (rtcp_socket);
1384
1385   return TRUE;
1386
1387   /* ERRORS */
1388 no_udp_protocol:
1389   {
1390     goto cleanup;
1391   }
1392 no_ports:
1393   {
1394     goto cleanup;
1395   }
1396 port_error:
1397   {
1398     goto cleanup;
1399   }
1400 socket_error:
1401   {
1402     goto cleanup;
1403   }
1404 cleanup:
1405   {
1406     if (inetaddr)
1407       g_object_unref (inetaddr);
1408     g_list_free_full (rejected_addresses,
1409         (GDestroyNotify) gst_rtsp_address_free);
1410     if (addr)
1411       gst_rtsp_address_free (addr);
1412     if (rtp_socket)
1413       g_object_unref (rtp_socket);
1414     if (rtcp_socket)
1415       g_object_unref (rtcp_socket);
1416     return FALSE;
1417   }
1418 }
1419
1420 /**
1421  * gst_rtsp_stream_allocate_udp_sockets:
1422  * @stream: a #GstRTSPStream
1423  * @family: protocol family
1424  * @transport_method: transport method
1425  *
1426  * Allocates RTP and RTCP ports.
1427  *
1428  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1429  * Deprecated: This function shouldn't have been made public
1430  */
1431 gboolean
1432 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1433     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1434 {
1435   g_warn_if_reached ();
1436   return FALSE;
1437 }
1438
1439 /**
1440  * gst_rtsp_stream_set_client_side:
1441  * @stream: a #GstRTSPStream
1442  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1443  * an RTSP connection.
1444  *
1445  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1446  * streams to an RTSP server via RECORD. This has the practical effect
1447  * of changing which UDP port numbers are used when setting up the local
1448  * side of the stream sending to be either the 'server' or 'client' pair
1449  * of a configured UDP transport.
1450  */
1451 void
1452 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1453 {
1454   GstRTSPStreamPrivate *priv;
1455
1456   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1457   priv = stream->priv;
1458   g_mutex_lock (&priv->lock);
1459   priv->client_side = client_side;
1460   g_mutex_unlock (&priv->lock);
1461 }
1462
1463 /**
1464  * gst_rtsp_stream_is_client_side:
1465  * @stream: a #GstRTSPStream
1466  *
1467  * See gst_rtsp_stream_set_client_side()
1468  *
1469  * Returns: TRUE if this #GstRTSPStream is client-side.
1470  */
1471 gboolean
1472 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1473 {
1474   GstRTSPStreamPrivate *priv;
1475   gboolean ret;
1476
1477   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1478
1479   priv = stream->priv;
1480   g_mutex_lock (&priv->lock);
1481   ret = priv->client_side;
1482   g_mutex_unlock (&priv->lock);
1483
1484   return ret;
1485 }
1486
1487 /* must be called with lock */
1488 static gboolean
1489 alloc_ports (GstRTSPStream * stream)
1490 {
1491   GstRTSPStreamPrivate *priv = stream->priv;
1492   gboolean ret = TRUE;
1493
1494   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
1495     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1496         priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
1497
1498     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1499         priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
1500   }
1501
1502   /* FIXME: Maybe actually consider the return values? */
1503   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1504     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1505         priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
1506
1507     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1508         priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
1509   }
1510
1511   return ret;
1512 }
1513
1514 /**
1515  * gst_rtsp_stream_get_server_port:
1516  * @stream: a #GstRTSPStream
1517  * @server_port: (out): result server port
1518  * @family: the port family to get
1519  *
1520  * Fill @server_port with the port pair used by the server. This function can
1521  * only be called when @stream has been joined.
1522  */
1523 void
1524 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1525     GstRTSPRange * server_port, GSocketFamily family)
1526 {
1527   GstRTSPStreamPrivate *priv;
1528
1529   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1530   priv = stream->priv;
1531   g_return_if_fail (priv->joined_bin != NULL);
1532
1533   g_mutex_lock (&priv->lock);
1534   if (family == G_SOCKET_FAMILY_IPV4) {
1535     if (server_port) {
1536       server_port->min = priv->server_addr_v4->port;
1537       server_port->max =
1538           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1539     }
1540   } else {
1541     if (server_port) {
1542       server_port->min = priv->server_addr_v6->port;
1543       server_port->max =
1544           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1545     }
1546   }
1547   g_mutex_unlock (&priv->lock);
1548 }
1549
1550 /**
1551  * gst_rtsp_stream_get_rtpsession:
1552  * @stream: a #GstRTSPStream
1553  *
1554  * Get the RTP session of this stream.
1555  *
1556  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1557  */
1558 GObject *
1559 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1560 {
1561   GstRTSPStreamPrivate *priv;
1562   GObject *session;
1563
1564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1565
1566   priv = stream->priv;
1567
1568   g_mutex_lock (&priv->lock);
1569   if ((session = priv->session))
1570     g_object_ref (session);
1571   g_mutex_unlock (&priv->lock);
1572
1573   return session;
1574 }
1575
1576 /**
1577  * gst_rtsp_stream_get_encoder:
1578  * @stream: a #GstRTSPStream
1579  *
1580  * Get the SRTP encoder for this stream.
1581  *
1582  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1583  */
1584 GstElement *
1585 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1586 {
1587   GstRTSPStreamPrivate *priv;
1588   GstElement *encoder;
1589
1590   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1591
1592   priv = stream->priv;
1593
1594   g_mutex_lock (&priv->lock);
1595   if ((encoder = priv->srtpenc))
1596     g_object_ref (encoder);
1597   g_mutex_unlock (&priv->lock);
1598
1599   return encoder;
1600 }
1601
1602 /**
1603  * gst_rtsp_stream_get_ssrc:
1604  * @stream: a #GstRTSPStream
1605  * @ssrc: (out): result ssrc
1606  *
1607  * Get the SSRC used by the RTP session of this stream. This function can only
1608  * be called when @stream has been joined.
1609  */
1610 void
1611 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1612 {
1613   GstRTSPStreamPrivate *priv;
1614
1615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1616   priv = stream->priv;
1617   g_return_if_fail (priv->joined_bin != NULL);
1618
1619   g_mutex_lock (&priv->lock);
1620   if (ssrc && priv->session)
1621     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1622   g_mutex_unlock (&priv->lock);
1623 }
1624
1625 /**
1626  * gst_rtsp_stream_set_retransmission_time:
1627  * @stream: a #GstRTSPStream
1628  * @time: a #GstClockTime
1629  *
1630  * Set the amount of time to store retransmission packets.
1631  */
1632 void
1633 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1634     GstClockTime time)
1635 {
1636   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1637
1638   g_mutex_lock (&stream->priv->lock);
1639   stream->priv->rtx_time = time;
1640   if (stream->priv->rtxsend)
1641     g_object_set (stream->priv->rtxsend, "max-size-time",
1642         GST_TIME_AS_MSECONDS (time), NULL);
1643   g_mutex_unlock (&stream->priv->lock);
1644 }
1645
1646 /**
1647  * gst_rtsp_stream_get_retransmission_time:
1648  * @stream: a #GstRTSPStream
1649  *
1650  * Get the amount of time to store retransmission data.
1651  *
1652  * Returns: the amount of time to store retransmission data.
1653  */
1654 GstClockTime
1655 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1656 {
1657   GstClockTime ret;
1658
1659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1660
1661   g_mutex_lock (&stream->priv->lock);
1662   ret = stream->priv->rtx_time;
1663   g_mutex_unlock (&stream->priv->lock);
1664
1665   return ret;
1666 }
1667
1668 /**
1669  * gst_rtsp_stream_set_retransmission_pt:
1670  * @stream: a #GstRTSPStream
1671  * @rtx_pt: a #guint
1672  *
1673  * Set the payload type (pt) for retransmission of this stream.
1674  */
1675 void
1676 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1677 {
1678   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1679
1680   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1681
1682   g_mutex_lock (&stream->priv->lock);
1683   stream->priv->rtx_pt = rtx_pt;
1684   if (stream->priv->rtxsend) {
1685     guint pt = gst_rtsp_stream_get_pt (stream);
1686     gchar *pt_s = g_strdup_printf ("%d", pt);
1687     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1688         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1689     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1690     g_free (pt_s);
1691     gst_structure_free (rtx_pt_map);
1692   }
1693   g_mutex_unlock (&stream->priv->lock);
1694 }
1695
1696 /**
1697  * gst_rtsp_stream_get_retransmission_pt:
1698  * @stream: a #GstRTSPStream
1699  *
1700  * Get the payload-type used for retransmission of this stream
1701  *
1702  * Returns: The retransmission PT.
1703  */
1704 guint
1705 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1706 {
1707   guint rtx_pt;
1708
1709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1710
1711   g_mutex_lock (&stream->priv->lock);
1712   rtx_pt = stream->priv->rtx_pt;
1713   g_mutex_unlock (&stream->priv->lock);
1714
1715   return rtx_pt;
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_set_buffer_size:
1720  * @stream: a #GstRTSPStream
1721  * @size: the buffer size
1722  *
1723  * Set the size of the UDP transmission buffer (in bytes)
1724  * Needs to be set before the stream is joined to a bin.
1725  *
1726  * Since: 1.6
1727  */
1728 void
1729 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1730 {
1731   g_mutex_lock (&stream->priv->lock);
1732   stream->priv->buffer_size = size;
1733   g_mutex_unlock (&stream->priv->lock);
1734 }
1735
1736 /**
1737  * gst_rtsp_stream_get_buffer_size:
1738  * @stream: a #GstRTSPStream
1739  *
1740  * Get the size of the UDP transmission buffer (in bytes)
1741  *
1742  * Returns: the size of the UDP TX buffer
1743  *
1744  * Since: 1.6
1745  */
1746 guint
1747 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1748 {
1749   guint buffer_size;
1750
1751   g_mutex_lock (&stream->priv->lock);
1752   buffer_size = stream->priv->buffer_size;
1753   g_mutex_unlock (&stream->priv->lock);
1754
1755   return buffer_size;
1756 }
1757
1758 /* executed from streaming thread */
1759 static void
1760 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1761 {
1762   GstRTSPStreamPrivate *priv = stream->priv;
1763   GstCaps *newcaps, *oldcaps;
1764
1765   newcaps = gst_pad_get_current_caps (pad);
1766
1767   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1768       newcaps);
1769
1770   g_mutex_lock (&priv->lock);
1771   oldcaps = priv->caps;
1772   priv->caps = newcaps;
1773   g_mutex_unlock (&priv->lock);
1774
1775   if (oldcaps)
1776     gst_caps_unref (oldcaps);
1777 }
1778
1779 static void
1780 dump_structure (const GstStructure * s)
1781 {
1782   gchar *sstr;
1783
1784   sstr = gst_structure_to_string (s);
1785   GST_INFO ("structure: %s", sstr);
1786   g_free (sstr);
1787 }
1788
1789 static GstRTSPStreamTransport *
1790 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1791 {
1792   GstRTSPStreamPrivate *priv = stream->priv;
1793   GList *walk;
1794   GstRTSPStreamTransport *result = NULL;
1795   const gchar *tmp;
1796   gchar *dest;
1797   guint port;
1798
1799   if (rtcp_from == NULL)
1800     return NULL;
1801
1802   tmp = g_strrstr (rtcp_from, ":");
1803   if (tmp == NULL)
1804     return NULL;
1805
1806   port = atoi (tmp + 1);
1807   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1808
1809   g_mutex_lock (&priv->lock);
1810   GST_INFO ("finding %s:%d in %d transports", dest, port,
1811       g_list_length (priv->transports));
1812
1813   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1814     GstRTSPStreamTransport *trans = walk->data;
1815     const GstRTSPTransport *tr;
1816     gint min, max;
1817
1818     tr = gst_rtsp_stream_transport_get_transport (trans);
1819
1820     if (priv->client_side) {
1821       /* In client side mode the 'destination' is the RTSP server, so send
1822        * to those ports */
1823       min = tr->server_port.min;
1824       max = tr->server_port.max;
1825     } else {
1826       min = tr->client_port.min;
1827       max = tr->client_port.max;
1828     }
1829
1830     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1831         (min == port || max == port)) {
1832       result = trans;
1833       break;
1834     }
1835   }
1836   if (result)
1837     g_object_ref (result);
1838   g_mutex_unlock (&priv->lock);
1839
1840   g_free (dest);
1841
1842   return result;
1843 }
1844
1845 static GstRTSPStreamTransport *
1846 check_transport (GObject * source, GstRTSPStream * stream)
1847 {
1848   GstStructure *stats;
1849   GstRTSPStreamTransport *trans;
1850
1851   /* see if we have a stream to match with the origin of the RTCP packet */
1852   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1853   if (trans == NULL) {
1854     g_object_get (source, "stats", &stats, NULL);
1855     if (stats) {
1856       const gchar *rtcp_from;
1857
1858       dump_structure (stats);
1859
1860       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1861       if ((trans = find_transport (stream, rtcp_from))) {
1862         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1863             source);
1864         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1865             g_object_unref);
1866       }
1867       gst_structure_free (stats);
1868     }
1869   }
1870   return trans;
1871 }
1872
1873
1874 static void
1875 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1876 {
1877   GstRTSPStreamTransport *trans;
1878
1879   GST_INFO ("%p: new source %p", stream, source);
1880
1881   trans = check_transport (source, stream);
1882
1883   if (trans)
1884     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1885 }
1886
1887 static void
1888 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1889 {
1890   GST_INFO ("%p: new SDES %p", stream, source);
1891 }
1892
1893 static void
1894 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1895 {
1896   GstRTSPStreamTransport *trans;
1897
1898   trans = check_transport (source, stream);
1899
1900   if (trans) {
1901     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1902     gst_rtsp_stream_transport_keep_alive (trans);
1903   }
1904 #ifdef DUMP_STATS
1905   {
1906     GstStructure *stats;
1907     g_object_get (source, "stats", &stats, NULL);
1908     if (stats) {
1909       dump_structure (stats);
1910       gst_structure_free (stats);
1911     }
1912   }
1913 #endif
1914 }
1915
1916 static void
1917 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1918 {
1919   GST_INFO ("%p: source %p bye", stream, source);
1920 }
1921
1922 static void
1923 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1924 {
1925   GstRTSPStreamTransport *trans;
1926
1927   GST_INFO ("%p: source %p bye timeout", stream, source);
1928
1929   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1930     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1931     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1932   }
1933 }
1934
1935 static void
1936 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1937 {
1938   GstRTSPStreamTransport *trans;
1939
1940   GST_INFO ("%p: source %p timeout", stream, source);
1941
1942   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1943     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1944     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1945   }
1946 }
1947
1948 static void
1949 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1950 {
1951   GST_INFO ("%p: new sender source %p", stream, source);
1952 #ifndef DUMP_STATS
1953   {
1954     GstStructure *stats;
1955     g_object_get (source, "stats", &stats, NULL);
1956     if (stats) {
1957       dump_structure (stats);
1958       gst_structure_free (stats);
1959     }
1960   }
1961 #endif
1962 }
1963
1964 static void
1965 on_sender_ssrc_active (GObject * session, GObject * source,
1966     GstRTSPStream * stream)
1967 {
1968 #ifndef DUMP_STATS
1969   {
1970     GstStructure *stats;
1971     g_object_get (source, "stats", &stats, NULL);
1972     if (stats) {
1973       dump_structure (stats);
1974       gst_structure_free (stats);
1975     }
1976   }
1977 #endif
1978 }
1979
1980 static void
1981 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1982 {
1983   if (is_rtp) {
1984     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1985     g_list_free (priv->tr_cache_rtp);
1986     priv->tr_cache_rtp = NULL;
1987   } else {
1988     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1989     g_list_free (priv->tr_cache_rtcp);
1990     priv->tr_cache_rtcp = NULL;
1991   }
1992 }
1993
1994 static GstFlowReturn
1995 handle_new_sample (GstAppSink * sink, gpointer user_data)
1996 {
1997   GstRTSPStreamPrivate *priv;
1998   GList *walk;
1999   GstSample *sample;
2000   GstBuffer *buffer;
2001   GstRTSPStream *stream;
2002   gboolean is_rtp;
2003
2004   sample = gst_app_sink_pull_sample (sink);
2005   if (!sample)
2006     return GST_FLOW_OK;
2007
2008   stream = (GstRTSPStream *) user_data;
2009   priv = stream->priv;
2010   buffer = gst_sample_get_buffer (sample);
2011
2012   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2013
2014   g_mutex_lock (&priv->lock);
2015   if (is_rtp) {
2016     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2017       clear_tr_cache (priv, is_rtp);
2018       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2019         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2020         priv->tr_cache_rtp =
2021             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2022       }
2023       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2024     }
2025   } else {
2026     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2027       clear_tr_cache (priv, is_rtp);
2028       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2029         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2030         priv->tr_cache_rtcp =
2031             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2032       }
2033       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2034     }
2035   }
2036   g_mutex_unlock (&priv->lock);
2037
2038   if (is_rtp) {
2039     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2040       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2041       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2042     }
2043   } else {
2044     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2045       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2046       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2047     }
2048   }
2049   gst_sample_unref (sample);
2050
2051   return GST_FLOW_OK;
2052 }
2053
2054 static GstAppSinkCallbacks sink_cb = {
2055   NULL,                         /* not interested in EOS */
2056   NULL,                         /* not interested in preroll samples */
2057   handle_new_sample,
2058 };
2059
2060 static GstElement *
2061 get_rtp_encoder (GstRTSPStream * stream, guint session)
2062 {
2063   GstRTSPStreamPrivate *priv = stream->priv;
2064
2065   if (priv->srtpenc == NULL) {
2066     gchar *name;
2067
2068     name = g_strdup_printf ("srtpenc_%u", session);
2069     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2070     g_free (name);
2071
2072     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2073   }
2074   return gst_object_ref (priv->srtpenc);
2075 }
2076
2077 static GstElement *
2078 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2079 {
2080   GstRTSPStreamPrivate *priv = stream->priv;
2081   GstElement *oldenc, *enc;
2082   GstPad *pad;
2083   gchar *name;
2084
2085   if (priv->idx != session)
2086     return NULL;
2087
2088   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2089
2090   oldenc = priv->srtpenc;
2091   enc = get_rtp_encoder (stream, session);
2092   name = g_strdup_printf ("rtp_sink_%d", session);
2093   pad = gst_element_get_request_pad (enc, name);
2094   g_free (name);
2095   gst_object_unref (pad);
2096
2097   if (oldenc == NULL)
2098     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2099         enc);
2100
2101   return enc;
2102 }
2103
2104 static GstElement *
2105 request_rtcp_encoder (GstElement * rtpbin, guint session,
2106     GstRTSPStream * stream)
2107 {
2108   GstRTSPStreamPrivate *priv = stream->priv;
2109   GstElement *oldenc, *enc;
2110   GstPad *pad;
2111   gchar *name;
2112
2113   if (priv->idx != session)
2114     return NULL;
2115
2116   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2117
2118   oldenc = priv->srtpenc;
2119   enc = get_rtp_encoder (stream, session);
2120   name = g_strdup_printf ("rtcp_sink_%d", session);
2121   pad = gst_element_get_request_pad (enc, name);
2122   g_free (name);
2123   gst_object_unref (pad);
2124
2125   if (oldenc == NULL)
2126     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2127         enc);
2128
2129   return enc;
2130 }
2131
2132 static GstCaps *
2133 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2134 {
2135   GstRTSPStreamPrivate *priv = stream->priv;
2136   GstCaps *caps;
2137
2138   GST_DEBUG ("request key %08x", ssrc);
2139
2140   g_mutex_lock (&priv->lock);
2141   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2142     gst_caps_ref (caps);
2143   g_mutex_unlock (&priv->lock);
2144
2145   return caps;
2146 }
2147
2148 static GstElement *
2149 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2150     GstRTSPStream * stream)
2151 {
2152   GstRTSPStreamPrivate *priv = stream->priv;
2153
2154   if (priv->idx != session)
2155     return NULL;
2156
2157   if (priv->srtpdec == NULL) {
2158     gchar *name;
2159
2160     name = g_strdup_printf ("srtpdec_%u", session);
2161     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2162     g_free (name);
2163
2164     g_signal_connect (priv->srtpdec, "request-key",
2165         (GCallback) request_key, stream);
2166   }
2167   return gst_object_ref (priv->srtpdec);
2168 }
2169
2170 /**
2171  * gst_rtsp_stream_request_aux_sender:
2172  * @stream: a #GstRTSPStream
2173  * @sessid: the session id
2174  *
2175  * Creating a rtxsend bin
2176  *
2177  * Returns: (transfer full): a #GstElement.
2178  *
2179  * Since: 1.6
2180  */
2181 GstElement *
2182 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2183 {
2184   GstElement *bin;
2185   GstPad *pad;
2186   GstStructure *pt_map;
2187   gchar *name;
2188   guint pt, rtx_pt;
2189   gchar *pt_s;
2190
2191   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2192
2193   pt = gst_rtsp_stream_get_pt (stream);
2194   pt_s = g_strdup_printf ("%u", pt);
2195   rtx_pt = stream->priv->rtx_pt;
2196
2197   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2198
2199   bin = gst_bin_new (NULL);
2200   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2201   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2202       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2203   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2204       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2205   g_free (pt_s);
2206   gst_structure_free (pt_map);
2207   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2208
2209   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2210   name = g_strdup_printf ("src_%u", sessid);
2211   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2212   g_free (name);
2213   gst_object_unref (pad);
2214
2215   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2216   name = g_strdup_printf ("sink_%u", sessid);
2217   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2218   g_free (name);
2219   gst_object_unref (pad);
2220
2221   return bin;
2222 }
2223
2224 /**
2225  * gst_rtsp_stream_set_pt_map:
2226  * @stream: a #GstRTSPStream
2227  * @pt: the pt
2228  * @caps: a #GstCaps
2229  *
2230  * Configure a pt map between @pt and @caps.
2231  */
2232 void
2233 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2234 {
2235   GstRTSPStreamPrivate *priv = stream->priv;
2236
2237   g_mutex_lock (&priv->lock);
2238   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2239   g_mutex_unlock (&priv->lock);
2240 }
2241
2242 /**
2243  * gst_rtsp_stream_set_publish_clock_mode:
2244  * @stream: a #GstRTSPStream
2245  * @mode: the clock publish mode
2246  *
2247  * Sets if and how the stream clock should be published according to RFC7273.
2248  *
2249  * Since: 1.8
2250  */
2251 void
2252 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2253     GstRTSPPublishClockMode mode)
2254 {
2255   GstRTSPStreamPrivate *priv;
2256
2257   priv = stream->priv;
2258   g_mutex_lock (&priv->lock);
2259   priv->publish_clock_mode = mode;
2260   g_mutex_unlock (&priv->lock);
2261 }
2262
2263 /**
2264  * gst_rtsp_stream_get_publish_clock_mode:
2265  * @factory: a #GstRTSPStream
2266  *
2267  * Gets if and how the stream clock should be published according to RFC7273.
2268  *
2269  * Returns: The GstRTSPPublishClockMode
2270  *
2271  * Since: 1.8
2272  */
2273 GstRTSPPublishClockMode
2274 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2275 {
2276   GstRTSPStreamPrivate *priv;
2277   GstRTSPPublishClockMode ret;
2278
2279   priv = stream->priv;
2280   g_mutex_lock (&priv->lock);
2281   ret = priv->publish_clock_mode;
2282   g_mutex_unlock (&priv->lock);
2283
2284   return ret;
2285 }
2286
2287 static GstCaps *
2288 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2289     GstRTSPStream * stream)
2290 {
2291   GstRTSPStreamPrivate *priv = stream->priv;
2292   GstCaps *caps = NULL;
2293
2294   g_mutex_lock (&priv->lock);
2295
2296   if (priv->idx == session) {
2297     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2298     if (caps) {
2299       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2300       gst_caps_ref (caps);
2301     } else {
2302       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2303     }
2304   }
2305
2306   g_mutex_unlock (&priv->lock);
2307
2308   return caps;
2309 }
2310
2311 static void
2312 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2313 {
2314   GstRTSPStreamPrivate *priv = stream->priv;
2315   gchar *name;
2316   GstPadLinkReturn ret;
2317   guint sessid;
2318
2319   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2320       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2321
2322   name = gst_pad_get_name (pad);
2323   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2324     g_free (name);
2325     return;
2326   }
2327   g_free (name);
2328
2329   if (priv->idx != sessid)
2330     return;
2331
2332   if (gst_pad_is_linked (priv->sinkpad)) {
2333     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2334         GST_DEBUG_PAD_NAME (priv->sinkpad));
2335     return;
2336   }
2337
2338   /* link the RTP pad to the session manager, it should not really fail unless
2339    * this is not really an RTP pad */
2340   ret = gst_pad_link (pad, priv->sinkpad);
2341   if (ret != GST_PAD_LINK_OK)
2342     goto link_failed;
2343   priv->recv_rtp_src = gst_object_ref (pad);
2344
2345   return;
2346
2347 /* ERRORS */
2348 link_failed:
2349   {
2350     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2351         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2352   }
2353 }
2354
2355 static void
2356 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2357     GstRTSPStream * stream)
2358 {
2359   /* TODO: What to do here other than this? */
2360   GST_DEBUG ("Stream %p: Got EOS", stream);
2361   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2362 }
2363
2364 static void
2365 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2366     GstElement ** queue_out)
2367 {
2368   GstPad *pad;
2369   GstPad *teepad;
2370   GstPad *queuepad;
2371
2372   gst_bin_add (bin, sink);
2373
2374   *queue_out = gst_element_factory_make ("queue", NULL);
2375   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2376       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2377   gst_bin_add (bin, *queue_out);
2378
2379   /* link tee to queue */
2380   teepad = gst_element_get_request_pad (tee, "src_%u");
2381   pad = gst_element_get_static_pad (*queue_out, "sink");
2382   gst_pad_link (teepad, pad);
2383   gst_object_unref (pad);
2384   gst_object_unref (teepad);
2385
2386   /* link queue to sink */
2387   queuepad = gst_element_get_static_pad (*queue_out, "src");
2388   pad = gst_element_get_static_pad (sink, "sink");
2389   gst_pad_link (queuepad, pad);
2390   gst_object_unref (queuepad);
2391   gst_object_unref (pad);
2392 }
2393
2394 /* must be called with lock */
2395 static void
2396 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2397 {
2398   GstRTSPStreamPrivate *priv;
2399   GstPad *pad;
2400   gboolean is_tcp, is_udp;
2401   gint i;
2402
2403   priv = stream->priv;
2404
2405   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2406   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2407       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2408
2409   for (i = 0; i < 2; i++) {
2410     /* For the sender we create this bit of pipeline for both
2411      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2412      * we need to add a queue before appsink and udpsink to make
2413      * the pipeline not block. For the TCP case, we want to pump
2414      * client as fast as possible anyway. This pipeline is used
2415      * when both TCP and UDP are present.
2416      *
2417      * .--------.      .-----.    .---------.    .---------.
2418      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2419      * |       send->sink   src->sink      src->sink       |
2420      * '--------'      |     |    '---------'    '---------'
2421      *                 |     |    .---------.    .---------.
2422      *                 |     |    |  queue  |    | appsink |
2423      *                 |    src->sink      src->sink       |
2424      *                 '-----'    '---------'    '---------'
2425      *
2426      * When only UDP or only TCP is allowed, we skip the tee and queue
2427      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2428      * the session.
2429      */
2430
2431     /* Only link the RTP send src if we're going to send RTP, link
2432      * the RTCP send src always */
2433     if (!priv->srcpad && i == 0)
2434       continue;
2435
2436     if (is_tcp) {
2437       /* make appsink */
2438       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2439       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2440       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2441           &sink_cb, stream, NULL);
2442     }
2443
2444     /* If we have udp always use a tee because we could have mcast clients
2445      * requesting different ports, in which case we'll have to plug more
2446      * udpsinks. */
2447     if (is_udp) {
2448       /* make tee for RTP/RTCP */
2449       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2450       gst_bin_add (bin, priv->tee[i]);
2451
2452       /* and link to rtpbin send pad */
2453       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2454       gst_pad_link (priv->send_src[i], pad);
2455       gst_object_unref (pad);
2456
2457       if (priv->udpsink[i])
2458         plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2459
2460       if (priv->mcast_udpsink[i])
2461         plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
2462             &priv->mcast_udpqueue[i]);
2463
2464       if (is_tcp) {
2465         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2466         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2467       }
2468     } else if (is_tcp) {
2469       /* only appsink needed, link it to the session */
2470       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2471       gst_pad_link (priv->send_src[i], pad);
2472       gst_object_unref (pad);
2473
2474       /* when its only TCP, we need to set sync and preroll to FALSE
2475        * for the sink to avoid deadlock. And this is only needed for
2476        * sink used for RTCP data, not the RTP data. */
2477       if (i == 1)
2478         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2479     }
2480
2481     /* check if we need to set to a special state */
2482     if (state != GST_STATE_NULL) {
2483       if (priv->udpsink[i])
2484         gst_element_set_state (priv->udpsink[i], state);
2485       if (priv->mcast_udpsink[i])
2486         gst_element_set_state (priv->mcast_udpsink[i], state);
2487       if (priv->appsink[i])
2488         gst_element_set_state (priv->appsink[i], state);
2489       if (priv->appqueue[i])
2490         gst_element_set_state (priv->appqueue[i], state);
2491       if (priv->udpqueue[i])
2492         gst_element_set_state (priv->udpqueue[i], state);
2493       if (priv->mcast_udpqueue[i])
2494         gst_element_set_state (priv->mcast_udpqueue[i], state);
2495       if (priv->tee[i])
2496         gst_element_set_state (priv->tee[i], state);
2497     }
2498   }
2499 }
2500
2501 /* must be called with lock */
2502 static void
2503 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2504     GstElement * funnel)
2505 {
2506   GstRTSPStreamPrivate *priv;
2507   GstPad *pad, *selpad;
2508
2509   priv = stream->priv;
2510
2511   if (priv->srcpad) {
2512     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2513      * values. This is only relevant for PLAY pipelines */
2514     gst_element_set_state (src, GST_STATE_PLAYING);
2515     gst_element_set_locked_state (src, TRUE);
2516   }
2517
2518   /* add src */
2519   gst_bin_add (bin, src);
2520
2521   /* and link to the funnel */
2522   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2523   pad = gst_element_get_static_pad (src, "src");
2524   gst_pad_link (pad, selpad);
2525   gst_object_unref (pad);
2526   gst_object_unref (selpad);
2527 }
2528
2529 /* must be called with lock */
2530 static void
2531 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2532 {
2533   GstRTSPStreamPrivate *priv;
2534   GstPad *pad;
2535   gboolean is_tcp;
2536   gint i;
2537
2538   priv = stream->priv;
2539
2540   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2541
2542   for (i = 0; i < 2; i++) {
2543     /* For the receiver we create this bit of pipeline for both
2544      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2545      * and it is all funneled into the rtpbin receive pad.
2546      *
2547      * .--------.     .--------.    .--------.
2548      * | udpsrc |     | funnel |    | rtpbin |
2549      * |       src->sink      src->sink      |
2550      * '--------'     |        |    '--------'
2551      * .--------.     |        |
2552      * | appsrc |     |        |
2553      * |       src->sink       |
2554      * '--------'     '--------'
2555      */
2556
2557     if (!priv->sinkpad && i == 0) {
2558       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2559        * RTCP sink always */
2560       continue;
2561     }
2562
2563     /* make funnel for the RTP/RTCP receivers */
2564     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2565     gst_bin_add (bin, priv->funnel[i]);
2566
2567     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2568     gst_pad_link (pad, priv->recv_sink[i]);
2569     gst_object_unref (pad);
2570
2571     if (priv->udpsrc_v4[i])
2572       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2573
2574     if (priv->udpsrc_v6[i])
2575       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2576
2577     if (priv->mcast_udpsrc_v4[i])
2578       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
2579
2580     if (priv->mcast_udpsrc_v6[i])
2581       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
2582
2583     if (is_tcp) {
2584       /* make and add appsrc */
2585       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2586       priv->appsrc_base_time[i] = -1;
2587       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2588           TRUE, NULL);
2589       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2590     }
2591
2592     /* check if we need to set to a special state */
2593     if (state != GST_STATE_NULL) {
2594       gst_element_set_state (priv->funnel[i], state);
2595     }
2596   }
2597 }
2598
2599 static gboolean
2600 check_mcast_part_for_transport (GstRTSPStream * stream,
2601     const GstRTSPTransport * tr)
2602 {
2603   GstRTSPStreamPrivate *priv = stream->priv;
2604   GInetAddress *inetaddr;
2605   GSocketFamily family;
2606   GstRTSPAddress *mcast_addr;
2607
2608   /* Check if it's a ipv4 or ipv6 transport */
2609   inetaddr = g_inet_address_new_from_string (tr->destination);
2610   family = g_inet_address_get_family (inetaddr);
2611   g_object_unref (inetaddr);
2612
2613   /* Select fields corresponding to the family */
2614   if (family == G_SOCKET_FAMILY_IPV4) {
2615     mcast_addr = priv->mcast_addr_v4;
2616   } else {
2617     mcast_addr = priv->mcast_addr_v6;
2618   }
2619
2620   /* We support only one mcast group per family, make sure this transport
2621    * matches it. */
2622   if (!mcast_addr)
2623     goto no_addr;
2624
2625   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
2626       tr->port.min != mcast_addr->port ||
2627       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2628       tr->ttl != mcast_addr->ttl)
2629     goto wrong_addr;
2630
2631   return TRUE;
2632
2633 no_addr:
2634   {
2635     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2636         "has been reserved");
2637     return FALSE;
2638   }
2639 wrong_addr:
2640   {
2641     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2642         "the reserved address");
2643     return FALSE;
2644   }
2645 }
2646
2647 /**
2648  * gst_rtsp_stream_join_bin:
2649  * @stream: a #GstRTSPStream
2650  * @bin: (transfer none): a #GstBin to join
2651  * @rtpbin: (transfer none): a rtpbin element in @bin
2652  * @state: the target state of the new elements
2653  *
2654  * Join the #GstBin @bin that contains the element @rtpbin.
2655  *
2656  * @stream will link to @rtpbin, which must be inside @bin. The elements
2657  * added to @bin will be set to the state given in @state.
2658  *
2659  * Returns: %TRUE on success.
2660  */
2661 gboolean
2662 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2663     GstElement * rtpbin, GstState state)
2664 {
2665   GstRTSPStreamPrivate *priv;
2666   guint idx;
2667   gchar *name;
2668   GstPadLinkReturn ret;
2669
2670   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2671   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2672   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2673
2674   priv = stream->priv;
2675
2676   g_mutex_lock (&priv->lock);
2677   if (priv->joined_bin != NULL)
2678     goto was_joined;
2679
2680   /* create a session with the same index as the stream */
2681   idx = priv->idx;
2682
2683   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2684
2685   if (!alloc_ports (stream))
2686     goto no_ports;
2687
2688   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2689       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2690     /* For SRTP */
2691     g_signal_connect (rtpbin, "request-rtp-encoder",
2692         (GCallback) request_rtp_encoder, stream);
2693     g_signal_connect (rtpbin, "request-rtcp-encoder",
2694         (GCallback) request_rtcp_encoder, stream);
2695     g_signal_connect (rtpbin, "request-rtp-decoder",
2696         (GCallback) request_rtp_rtcp_decoder, stream);
2697     g_signal_connect (rtpbin, "request-rtcp-decoder",
2698         (GCallback) request_rtp_rtcp_decoder, stream);
2699   }
2700
2701   if (priv->sinkpad) {
2702     g_signal_connect (rtpbin, "request-pt-map",
2703         (GCallback) request_pt_map, stream);
2704   }
2705
2706   /* get pads from the RTP session element for sending and receiving
2707    * RTP/RTCP*/
2708   if (priv->srcpad) {
2709     /* get a pad for sending RTP */
2710     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2711     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2712     g_free (name);
2713
2714     /* link the RTP pad to the session manager, it should not really fail unless
2715      * this is not really an RTP pad */
2716     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2717     if (ret != GST_PAD_LINK_OK)
2718       goto link_failed;
2719
2720     name = g_strdup_printf ("send_rtp_src_%u", idx);
2721     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2722     g_free (name);
2723   } else {
2724     /* Need to connect our sinkpad from here */
2725     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2726     /* EOS */
2727     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2728
2729     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2730     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2731     g_free (name);
2732   }
2733
2734   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2735   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2736   g_free (name);
2737   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2738   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2739   g_free (name);
2740
2741   /* get the session */
2742   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2743
2744   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2745       stream);
2746   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2747       stream);
2748   g_signal_connect (priv->session, "on-ssrc-active",
2749       (GCallback) on_ssrc_active, stream);
2750   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2751       stream);
2752   g_signal_connect (priv->session, "on-bye-timeout",
2753       (GCallback) on_bye_timeout, stream);
2754   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2755       stream);
2756
2757   /* signal for sender ssrc */
2758   g_signal_connect (priv->session, "on-new-sender-ssrc",
2759       (GCallback) on_new_sender_ssrc, stream);
2760   g_signal_connect (priv->session, "on-sender-ssrc-active",
2761       (GCallback) on_sender_ssrc_active, stream);
2762
2763   create_sender_part (stream, bin, state);
2764   create_receiver_part (stream, bin, state);
2765
2766   if (priv->srcpad) {
2767     /* be notified of caps changes */
2768     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2769         (GCallback) caps_notify, stream);
2770   }
2771
2772   priv->joined_bin = bin;
2773   g_mutex_unlock (&priv->lock);
2774
2775   return TRUE;
2776
2777   /* ERRORS */
2778 was_joined:
2779   {
2780     g_mutex_unlock (&priv->lock);
2781     return TRUE;
2782   }
2783 no_ports:
2784   {
2785     g_mutex_unlock (&priv->lock);
2786     GST_WARNING ("failed to allocate ports %u", idx);
2787     return FALSE;
2788   }
2789 link_failed:
2790   {
2791     GST_WARNING ("failed to link stream %u", idx);
2792     gst_object_unref (priv->send_rtp_sink);
2793     priv->send_rtp_sink = NULL;
2794     g_mutex_unlock (&priv->lock);
2795     return FALSE;
2796   }
2797 }
2798
2799 static void
2800 clear_element (GstBin * bin, GstElement ** elementptr)
2801 {
2802   if (*elementptr) {
2803     gst_element_set_locked_state (*elementptr, FALSE);
2804     gst_element_set_state (*elementptr, GST_STATE_NULL);
2805     if (GST_ELEMENT_PARENT (*elementptr))
2806       gst_bin_remove (bin, *elementptr);
2807     else
2808       gst_object_unref (*elementptr);
2809     *elementptr = NULL;
2810   }
2811 }
2812
2813 /**
2814  * gst_rtsp_stream_leave_bin:
2815  * @stream: a #GstRTSPStream
2816  * @bin: (transfer none): a #GstBin
2817  * @rtpbin: (transfer none): a rtpbin #GstElement
2818  *
2819  * Remove the elements of @stream from @bin.
2820  *
2821  * Return: %TRUE on success.
2822  */
2823 gboolean
2824 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2825     GstElement * rtpbin)
2826 {
2827   GstRTSPStreamPrivate *priv;
2828   gint i;
2829
2830   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2831   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2832   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2833
2834   priv = stream->priv;
2835
2836   g_mutex_lock (&priv->lock);
2837   if (priv->joined_bin == NULL)
2838     goto was_not_joined;
2839   if (priv->joined_bin != bin)
2840     goto wrong_bin;
2841
2842   priv->joined_bin = NULL;
2843
2844   /* all transports must be removed by now */
2845   if (priv->transports != NULL)
2846     goto transports_not_removed;
2847
2848   clear_tr_cache (priv, TRUE);
2849   clear_tr_cache (priv, FALSE);
2850
2851   GST_INFO ("stream %p leaving bin", stream);
2852
2853   if (priv->srcpad) {
2854     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2855
2856     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2857     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2858     gst_object_unref (priv->send_rtp_sink);
2859     priv->send_rtp_sink = NULL;
2860   } else if (priv->recv_rtp_src) {
2861     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2862     gst_object_unref (priv->recv_rtp_src);
2863     priv->recv_rtp_src = NULL;
2864   }
2865
2866   for (i = 0; i < 2; i++) {
2867     clear_element (bin, &priv->udpsrc_v4[i]);
2868     clear_element (bin, &priv->udpsrc_v6[i]);
2869     clear_element (bin, &priv->udpqueue[i]);
2870     clear_element (bin, &priv->udpsink[i]);
2871
2872     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2873     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2874     clear_element (bin, &priv->mcast_udpqueue[i]);
2875     clear_element (bin, &priv->mcast_udpsink[i]);
2876
2877     clear_element (bin, &priv->appsrc[i]);
2878     clear_element (bin, &priv->appqueue[i]);
2879     clear_element (bin, &priv->appsink[i]);
2880
2881     clear_element (bin, &priv->tee[i]);
2882     clear_element (bin, &priv->funnel[i]);
2883
2884     if (priv->sinkpad || i == 1) {
2885       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2886       gst_object_unref (priv->recv_sink[i]);
2887       priv->recv_sink[i] = NULL;
2888     }
2889   }
2890
2891   if (priv->srcpad) {
2892     gst_object_unref (priv->send_src[0]);
2893     priv->send_src[0] = NULL;
2894   }
2895
2896   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2897   gst_object_unref (priv->send_src[1]);
2898   priv->send_src[1] = NULL;
2899
2900   g_object_unref (priv->session);
2901   priv->session = NULL;
2902   if (priv->caps)
2903     gst_caps_unref (priv->caps);
2904   priv->caps = NULL;
2905
2906   if (priv->srtpenc)
2907     gst_object_unref (priv->srtpenc);
2908   if (priv->srtpdec)
2909     gst_object_unref (priv->srtpdec);
2910
2911   if (priv->mcast_addr_v4)
2912     gst_rtsp_address_free (priv->mcast_addr_v4);
2913   priv->mcast_addr_v4 = NULL;
2914   if (priv->mcast_addr_v6)
2915     gst_rtsp_address_free (priv->mcast_addr_v6);
2916   priv->mcast_addr_v6 = NULL;
2917   if (priv->server_addr_v4)
2918     gst_rtsp_address_free (priv->server_addr_v4);
2919   priv->server_addr_v4 = NULL;
2920   if (priv->server_addr_v6)
2921     gst_rtsp_address_free (priv->server_addr_v6);
2922   priv->server_addr_v6 = NULL;
2923
2924   g_mutex_unlock (&priv->lock);
2925
2926   return TRUE;
2927
2928 was_not_joined:
2929   {
2930     g_mutex_unlock (&priv->lock);
2931     return TRUE;
2932   }
2933 transports_not_removed:
2934   {
2935     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2936     g_mutex_unlock (&priv->lock);
2937     return FALSE;
2938   }
2939 wrong_bin:
2940   {
2941     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2942     g_mutex_unlock (&priv->lock);
2943     return FALSE;
2944   }
2945 }
2946
2947 /**
2948  * gst_rtsp_stream_get_joined_bin:
2949  * @stream: a #GstRTSPStream
2950  *
2951  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2952  *
2953  * Return: (transfer full): the joined bin or NULL.
2954  */
2955 GstBin *
2956 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2957 {
2958   GstRTSPStreamPrivate *priv;
2959   GstBin *bin = NULL;
2960
2961   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2962
2963   priv = stream->priv;
2964
2965   g_mutex_lock (&priv->lock);
2966   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
2967   g_mutex_unlock (&priv->lock);
2968
2969   return bin;
2970 }
2971
2972 /**
2973  * gst_rtsp_stream_get_rtpinfo:
2974  * @stream: a #GstRTSPStream
2975  * @rtptime: (allow-none): result RTP timestamp
2976  * @seq: (allow-none): result RTP seqnum
2977  * @clock_rate: (allow-none): the clock rate
2978  * @running_time: (allow-none): result running-time
2979  *
2980  * Retrieve the current rtptime, seq and running-time. This is used to
2981  * construct a RTPInfo reply header.
2982  *
2983  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2984  */
2985 gboolean
2986 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2987     guint * rtptime, guint * seq, guint * clock_rate,
2988     GstClockTime * running_time)
2989 {
2990   GstRTSPStreamPrivate *priv;
2991   GstStructure *stats;
2992   GObjectClass *payobjclass;
2993
2994   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2995
2996   priv = stream->priv;
2997
2998   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2999
3000   g_mutex_lock (&priv->lock);
3001
3002   /* First try to extract the information from the last buffer on the sinks.
3003    * This will have a more accurate sequence number and timestamp, as between
3004    * the payloader and the sink there can be some queues
3005    */
3006   if (priv->udpsink[0] || priv->appsink[0]) {
3007     GstSample *last_sample;
3008
3009     if (priv->udpsink[0])
3010       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3011     else
3012       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3013
3014     if (last_sample) {
3015       GstCaps *caps;
3016       GstBuffer *buffer;
3017       GstSegment *segment;
3018       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3019
3020       caps = gst_sample_get_caps (last_sample);
3021       buffer = gst_sample_get_buffer (last_sample);
3022       segment = gst_sample_get_segment (last_sample);
3023
3024       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3025         if (seq) {
3026           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3027         }
3028
3029         if (rtptime) {
3030           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3031         }
3032
3033         gst_rtp_buffer_unmap (&rtp_buffer);
3034
3035         if (running_time) {
3036           *running_time =
3037               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3038               GST_BUFFER_TIMESTAMP (buffer));
3039         }
3040
3041         if (clock_rate) {
3042           GstStructure *s = gst_caps_get_structure (caps, 0);
3043
3044           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3045
3046           if (*clock_rate == 0 && running_time)
3047             *running_time = GST_CLOCK_TIME_NONE;
3048         }
3049         gst_sample_unref (last_sample);
3050
3051         goto done;
3052       } else {
3053         gst_sample_unref (last_sample);
3054       }
3055     }
3056   }
3057
3058   if (g_object_class_find_property (payobjclass, "stats")) {
3059     g_object_get (priv->payloader, "stats", &stats, NULL);
3060     if (stats == NULL)
3061       goto no_stats;
3062
3063     if (seq)
3064       gst_structure_get_uint (stats, "seqnum", seq);
3065
3066     if (rtptime)
3067       gst_structure_get_uint (stats, "timestamp", rtptime);
3068
3069     if (running_time)
3070       gst_structure_get_clock_time (stats, "running-time", running_time);
3071
3072     if (clock_rate) {
3073       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3074       if (*clock_rate == 0 && running_time)
3075         *running_time = GST_CLOCK_TIME_NONE;
3076     }
3077     gst_structure_free (stats);
3078   } else {
3079     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3080         !g_object_class_find_property (payobjclass, "timestamp"))
3081       goto no_stats;
3082
3083     if (seq)
3084       g_object_get (priv->payloader, "seqnum", seq, NULL);
3085
3086     if (rtptime)
3087       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3088
3089     if (running_time)
3090       *running_time = GST_CLOCK_TIME_NONE;
3091   }
3092
3093 done:
3094   g_mutex_unlock (&priv->lock);
3095
3096   return TRUE;
3097
3098   /* ERRORS */
3099 no_stats:
3100   {
3101     GST_WARNING ("Could not get payloader stats");
3102     g_mutex_unlock (&priv->lock);
3103     return FALSE;
3104   }
3105 }
3106
3107 /**
3108  * gst_rtsp_stream_get_caps:
3109  * @stream: a #GstRTSPStream
3110  *
3111  * Retrieve the current caps of @stream.
3112  *
3113  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3114  * after usage.
3115  */
3116 GstCaps *
3117 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3118 {
3119   GstRTSPStreamPrivate *priv;
3120   GstCaps *result;
3121
3122   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3123
3124   priv = stream->priv;
3125
3126   g_mutex_lock (&priv->lock);
3127   if ((result = priv->caps))
3128     gst_caps_ref (result);
3129   g_mutex_unlock (&priv->lock);
3130
3131   return result;
3132 }
3133
3134 /**
3135  * gst_rtsp_stream_recv_rtp:
3136  * @stream: a #GstRTSPStream
3137  * @buffer: (transfer full): a #GstBuffer
3138  *
3139  * Handle an RTP buffer for the stream. This method is usually called when a
3140  * message has been received from a client using the TCP transport.
3141  *
3142  * This function takes ownership of @buffer.
3143  *
3144  * Returns: a GstFlowReturn.
3145  */
3146 GstFlowReturn
3147 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3148 {
3149   GstRTSPStreamPrivate *priv;
3150   GstFlowReturn ret;
3151   GstElement *element;
3152
3153   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3154   priv = stream->priv;
3155   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3156   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3157
3158   g_mutex_lock (&priv->lock);
3159   if (priv->appsrc[0])
3160     element = gst_object_ref (priv->appsrc[0]);
3161   else
3162     element = NULL;
3163   g_mutex_unlock (&priv->lock);
3164
3165   if (element) {
3166     if (priv->appsrc_base_time[0] == -1) {
3167       /* Take current running_time. This timestamp will be put on
3168        * the first buffer of each stream because we are a live source and so we
3169        * timestamp with the running_time. When we are dealing with TCP, we also
3170        * only timestamp the first buffer (using the DISCONT flag) because a server
3171        * typically bursts data, for which we don't want to compensate by speeding
3172        * up the media. The other timestamps will be interpollated from this one
3173        * using the RTP timestamps. */
3174       GST_OBJECT_LOCK (element);
3175       if (GST_ELEMENT_CLOCK (element)) {
3176         GstClockTime now;
3177         GstClockTime base_time;
3178
3179         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3180         base_time = GST_ELEMENT_CAST (element)->base_time;
3181
3182         priv->appsrc_base_time[0] = now - base_time;
3183         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3184         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3185             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3186             GST_TIME_ARGS (base_time));
3187       }
3188       GST_OBJECT_UNLOCK (element);
3189     }
3190
3191     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3192     gst_object_unref (element);
3193   } else {
3194     ret = GST_FLOW_OK;
3195   }
3196   return ret;
3197 }
3198
3199 /**
3200  * gst_rtsp_stream_recv_rtcp:
3201  * @stream: a #GstRTSPStream
3202  * @buffer: (transfer full): a #GstBuffer
3203  *
3204  * Handle an RTCP buffer for the stream. This method is usually called when a
3205  * message has been received from a client using the TCP transport.
3206  *
3207  * This function takes ownership of @buffer.
3208  *
3209  * Returns: a GstFlowReturn.
3210  */
3211 GstFlowReturn
3212 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3213 {
3214   GstRTSPStreamPrivate *priv;
3215   GstFlowReturn ret;
3216   GstElement *element;
3217
3218   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3219   priv = stream->priv;
3220   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3221
3222   if (priv->joined_bin == NULL) {
3223     gst_buffer_unref (buffer);
3224     return GST_FLOW_NOT_LINKED;
3225   }
3226   g_mutex_lock (&priv->lock);
3227   if (priv->appsrc[1])
3228     element = gst_object_ref (priv->appsrc[1]);
3229   else
3230     element = NULL;
3231   g_mutex_unlock (&priv->lock);
3232
3233   if (element) {
3234     if (priv->appsrc_base_time[1] == -1) {
3235       /* Take current running_time. This timestamp will be put on
3236        * the first buffer of each stream because we are a live source and so we
3237        * timestamp with the running_time. When we are dealing with TCP, we also
3238        * only timestamp the first buffer (using the DISCONT flag) because a server
3239        * typically bursts data, for which we don't want to compensate by speeding
3240        * up the media. The other timestamps will be interpollated from this one
3241        * using the RTP timestamps. */
3242       GST_OBJECT_LOCK (element);
3243       if (GST_ELEMENT_CLOCK (element)) {
3244         GstClockTime now;
3245         GstClockTime base_time;
3246
3247         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3248         base_time = GST_ELEMENT_CAST (element)->base_time;
3249
3250         priv->appsrc_base_time[1] = now - base_time;
3251         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3252         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3253             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3254             GST_TIME_ARGS (base_time));
3255       }
3256       GST_OBJECT_UNLOCK (element);
3257     }
3258
3259     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3260     gst_object_unref (element);
3261   } else {
3262     ret = GST_FLOW_OK;
3263     gst_buffer_unref (buffer);
3264   }
3265   return ret;
3266 }
3267
3268 /* must be called with lock */
3269 static gboolean
3270 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3271     gboolean add)
3272 {
3273   GstRTSPStreamPrivate *priv = stream->priv;
3274   const GstRTSPTransport *tr;
3275
3276   tr = gst_rtsp_stream_transport_get_transport (trans);
3277
3278   switch (tr->lower_transport) {
3279     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3280     {
3281       if (add) {
3282         if (!check_mcast_part_for_transport (stream, tr))
3283           goto mcast_error;
3284         priv->transports = g_list_prepend (priv->transports, trans);
3285       } else {
3286         priv->transports = g_list_remove (priv->transports, trans);
3287       }
3288       break;
3289     }
3290     case GST_RTSP_LOWER_TRANS_UDP:
3291     {
3292       gchar *dest;
3293       gint min, max;
3294       guint ttl = 0;
3295
3296       dest = tr->destination;
3297       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3298         min = tr->port.min;
3299         max = tr->port.max;
3300         ttl = tr->ttl;
3301       } else if (priv->client_side) {
3302         /* In client side mode the 'destination' is the RTSP server, so send
3303          * to those ports */
3304         min = tr->server_port.min;
3305         max = tr->server_port.max;
3306       } else {
3307         min = tr->client_port.min;
3308         max = tr->client_port.max;
3309       }
3310
3311       if (add) {
3312         if (ttl > 0) {
3313           GST_INFO ("setting ttl-mc %d", ttl);
3314           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3315           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3316         }
3317         GST_INFO ("adding %s:%d-%d", dest, min, max);
3318         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3319         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3320         priv->transports = g_list_prepend (priv->transports, trans);
3321       } else {
3322         GST_INFO ("removing %s:%d-%d", dest, min, max);
3323         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3324         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3325         priv->transports = g_list_remove (priv->transports, trans);
3326       }
3327       priv->transports_cookie++;
3328       break;
3329     }
3330     case GST_RTSP_LOWER_TRANS_TCP:
3331       if (add) {
3332         GST_INFO ("adding TCP %s", tr->destination);
3333         priv->transports = g_list_prepend (priv->transports, trans);
3334       } else {
3335         GST_INFO ("removing TCP %s", tr->destination);
3336         priv->transports = g_list_remove (priv->transports, trans);
3337       }
3338       priv->transports_cookie++;
3339       break;
3340     default:
3341       goto unknown_transport;
3342   }
3343   return TRUE;
3344
3345   /* ERRORS */
3346 unknown_transport:
3347   {
3348     GST_INFO ("Unknown transport %d", tr->lower_transport);
3349     return FALSE;
3350   }
3351 mcast_error:
3352   {
3353     return FALSE;
3354   }
3355 }
3356
3357
3358 /**
3359  * gst_rtsp_stream_add_transport:
3360  * @stream: a #GstRTSPStream
3361  * @trans: (transfer none): a #GstRTSPStreamTransport
3362  *
3363  * Add the transport in @trans to @stream. The media of @stream will
3364  * then also be send to the values configured in @trans.
3365  *
3366  * @stream must be joined to a bin.
3367  *
3368  * @trans must contain a valid #GstRTSPTransport.
3369  *
3370  * Returns: %TRUE if @trans was added
3371  */
3372 gboolean
3373 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3374     GstRTSPStreamTransport * trans)
3375 {
3376   GstRTSPStreamPrivate *priv;
3377   gboolean res;
3378
3379   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3380   priv = stream->priv;
3381   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3382   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3383
3384   g_mutex_lock (&priv->lock);
3385   res = update_transport (stream, trans, TRUE);
3386   g_mutex_unlock (&priv->lock);
3387
3388   return res;
3389 }
3390
3391 /**
3392  * gst_rtsp_stream_remove_transport:
3393  * @stream: a #GstRTSPStream
3394  * @trans: (transfer none): a #GstRTSPStreamTransport
3395  *
3396  * Remove the transport in @trans from @stream. The media of @stream will
3397  * not be sent to the values configured in @trans.
3398  *
3399  * @stream must be joined to a bin.
3400  *
3401  * @trans must contain a valid #GstRTSPTransport.
3402  *
3403  * Returns: %TRUE if @trans was removed
3404  */
3405 gboolean
3406 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3407     GstRTSPStreamTransport * trans)
3408 {
3409   GstRTSPStreamPrivate *priv;
3410   gboolean res;
3411
3412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3413   priv = stream->priv;
3414   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3415   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3416
3417   g_mutex_lock (&priv->lock);
3418   res = update_transport (stream, trans, FALSE);
3419   g_mutex_unlock (&priv->lock);
3420
3421   return res;
3422 }
3423
3424 /**
3425  * gst_rtsp_stream_update_crypto:
3426  * @stream: a #GstRTSPStream
3427  * @ssrc: the SSRC
3428  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3429  *
3430  * Update the new crypto information for @ssrc in @stream. If information
3431  * for @ssrc did not exist, it will be added. If information
3432  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3433  * be removed from @stream.
3434  *
3435  * Returns: %TRUE if @crypto could be updated
3436  */
3437 gboolean
3438 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3439     guint ssrc, GstCaps * crypto)
3440 {
3441   GstRTSPStreamPrivate *priv;
3442
3443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3444   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3445
3446   priv = stream->priv;
3447
3448   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3449
3450   g_mutex_lock (&priv->lock);
3451   if (crypto)
3452     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3453         gst_caps_ref (crypto));
3454   else
3455     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3456   g_mutex_unlock (&priv->lock);
3457
3458   return TRUE;
3459 }
3460
3461 /**
3462  * gst_rtsp_stream_get_rtp_socket:
3463  * @stream: a #GstRTSPStream
3464  * @family: the socket family
3465  *
3466  * Get the RTP socket from @stream for a @family.
3467  *
3468  * @stream must be joined to a bin.
3469  *
3470  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3471  * socket could be allocated for @family. Unref after usage
3472  */
3473 GSocket *
3474 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3475 {
3476   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3477   GSocket *socket;
3478   const gchar *name;
3479
3480   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3481   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3482       family == G_SOCKET_FAMILY_IPV6, NULL);
3483   g_return_val_if_fail (priv->udpsink[0], NULL);
3484
3485   if (family == G_SOCKET_FAMILY_IPV6)
3486     name = "socket-v6";
3487   else
3488     name = "socket";
3489
3490   g_object_get (priv->udpsink[0], name, &socket, NULL);
3491
3492   return socket;
3493 }
3494
3495 /**
3496  * gst_rtsp_stream_get_rtcp_socket:
3497  * @stream: a #GstRTSPStream
3498  * @family: the socket family
3499  *
3500  * Get the RTCP socket from @stream for a @family.
3501  *
3502  * @stream must be joined to a bin.
3503  *
3504  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3505  * socket could be allocated for @family. Unref after usage
3506  */
3507 GSocket *
3508 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3509 {
3510   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3511   GSocket *socket;
3512   const gchar *name;
3513
3514   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3515   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3516       family == G_SOCKET_FAMILY_IPV6, NULL);
3517   g_return_val_if_fail (priv->udpsink[1], NULL);
3518
3519   if (family == G_SOCKET_FAMILY_IPV6)
3520     name = "socket-v6";
3521   else
3522     name = "socket";
3523
3524   g_object_get (priv->udpsink[1], name, &socket, NULL);
3525
3526   return socket;
3527 }
3528
3529 /**
3530  * gst_rtsp_stream_set_seqnum:
3531  * @stream: a #GstRTSPStream
3532  * @seqnum: a new sequence number
3533  *
3534  * Configure the sequence number in the payloader of @stream to @seqnum.
3535  */
3536 void
3537 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3538 {
3539   GstRTSPStreamPrivate *priv;
3540
3541   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3542
3543   priv = stream->priv;
3544
3545   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3546 }
3547
3548 /**
3549  * gst_rtsp_stream_get_seqnum:
3550  * @stream: a #GstRTSPStream
3551  *
3552  * Get the configured sequence number in the payloader of @stream.
3553  *
3554  * Returns: the sequence number of the payloader.
3555  */
3556 guint16
3557 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3558 {
3559   GstRTSPStreamPrivate *priv;
3560   guint seqnum;
3561
3562   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3563
3564   priv = stream->priv;
3565
3566   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3567
3568   return seqnum;
3569 }
3570
3571 /**
3572  * gst_rtsp_stream_transport_filter:
3573  * @stream: a #GstRTSPStream
3574  * @func: (scope call) (allow-none): a callback
3575  * @user_data: (closure): user data passed to @func
3576  *
3577  * Call @func for each transport managed by @stream. The result value of @func
3578  * determines what happens to the transport. @func will be called with @stream
3579  * locked so no further actions on @stream can be performed from @func.
3580  *
3581  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3582  * @stream.
3583  *
3584  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3585  *
3586  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3587  * will also be added with an additional ref to the result #GList of this
3588  * function..
3589  *
3590  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3591  *
3592  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3593  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3594  * element in the #GList should be unreffed before the list is freed.
3595  */
3596 GList *
3597 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3598     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3599 {
3600   GstRTSPStreamPrivate *priv;
3601   GList *result, *walk, *next;
3602   GHashTable *visited = NULL;
3603   guint cookie;
3604
3605   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3606
3607   priv = stream->priv;
3608
3609   result = NULL;
3610   if (func)
3611     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3612
3613   g_mutex_lock (&priv->lock);
3614 restart:
3615   cookie = priv->transports_cookie;
3616   for (walk = priv->transports; walk; walk = next) {
3617     GstRTSPStreamTransport *trans = walk->data;
3618     GstRTSPFilterResult res;
3619     gboolean changed;
3620
3621     next = g_list_next (walk);
3622
3623     if (func) {
3624       /* only visit each transport once */
3625       if (g_hash_table_contains (visited, trans))
3626         continue;
3627
3628       g_hash_table_add (visited, g_object_ref (trans));
3629       g_mutex_unlock (&priv->lock);
3630
3631       res = func (stream, trans, user_data);
3632
3633       g_mutex_lock (&priv->lock);
3634     } else
3635       res = GST_RTSP_FILTER_REF;
3636
3637     changed = (cookie != priv->transports_cookie);
3638
3639     switch (res) {
3640       case GST_RTSP_FILTER_REMOVE:
3641         update_transport (stream, trans, FALSE);
3642         break;
3643       case GST_RTSP_FILTER_REF:
3644         result = g_list_prepend (result, g_object_ref (trans));
3645         break;
3646       case GST_RTSP_FILTER_KEEP:
3647       default:
3648         break;
3649     }
3650     if (changed)
3651       goto restart;
3652   }
3653   g_mutex_unlock (&priv->lock);
3654
3655   if (func)
3656     g_hash_table_unref (visited);
3657
3658   return result;
3659 }
3660
3661 static GstPadProbeReturn
3662 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3663 {
3664   GstRTSPStreamPrivate *priv;
3665   GstRTSPStream *stream;
3666
3667   stream = user_data;
3668   priv = stream->priv;
3669
3670   GST_DEBUG_OBJECT (pad, "now blocking");
3671
3672   g_mutex_lock (&priv->lock);
3673   priv->blocking = TRUE;
3674   g_mutex_unlock (&priv->lock);
3675
3676   gst_element_post_message (priv->payloader,
3677       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3678           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3679
3680   return GST_PAD_PROBE_OK;
3681 }
3682
3683 /**
3684  * gst_rtsp_stream_set_blocked:
3685  * @stream: a #GstRTSPStream
3686  * @blocked: boolean indicating we should block or unblock
3687  *
3688  * Blocks or unblocks the dataflow on @stream.
3689  *
3690  * Returns: %TRUE on success
3691  */
3692 gboolean
3693 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3694 {
3695   GstRTSPStreamPrivate *priv;
3696
3697   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3698
3699   priv = stream->priv;
3700
3701   g_mutex_lock (&priv->lock);
3702   if (blocked) {
3703     priv->blocking = FALSE;
3704     if (priv->blocked_id == 0) {
3705       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3706           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3707           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3708           g_object_ref (stream), g_object_unref);
3709     }
3710   } else {
3711     if (priv->blocked_id != 0) {
3712       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3713       priv->blocked_id = 0;
3714       priv->blocking = FALSE;
3715     }
3716   }
3717   g_mutex_unlock (&priv->lock);
3718
3719   return TRUE;
3720 }
3721
3722 /**
3723  * gst_rtsp_stream_is_blocking:
3724  * @stream: a #GstRTSPStream
3725  *
3726  * Check if @stream is blocking on a #GstBuffer.
3727  *
3728  * Returns: %TRUE if @stream is blocking
3729  */
3730 gboolean
3731 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3732 {
3733   GstRTSPStreamPrivate *priv;
3734   gboolean result;
3735
3736   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3737
3738   priv = stream->priv;
3739
3740   g_mutex_lock (&priv->lock);
3741   result = priv->blocking;
3742   g_mutex_unlock (&priv->lock);
3743
3744   return result;
3745 }
3746
3747 /**
3748  * gst_rtsp_stream_query_position:
3749  * @stream: a #GstRTSPStream
3750  *
3751  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3752  * the RTP parts of the pipeline and not the RTCP parts.
3753  *
3754  * Returns: %TRUE if the position could be queried
3755  */
3756 gboolean
3757 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3758 {
3759   GstRTSPStreamPrivate *priv;
3760   GstElement *sink;
3761   gboolean ret;
3762
3763   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3764
3765   priv = stream->priv;
3766
3767   g_mutex_lock (&priv->lock);
3768   /* depending on the transport type, it should query corresponding sink */
3769   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3770       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3771     sink = priv->udpsink[0];
3772   else
3773     sink = priv->appsink[0];
3774
3775   if (sink)
3776     gst_object_ref (sink);
3777   g_mutex_unlock (&priv->lock);
3778
3779   if (!sink)
3780     return FALSE;
3781
3782   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3783   gst_object_unref (sink);
3784
3785   return ret;
3786 }
3787
3788 /**
3789  * gst_rtsp_stream_query_stop:
3790  * @stream: a #GstRTSPStream
3791  *
3792  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3793  * the RTP parts of the pipeline and not the RTCP parts.
3794  *
3795  * Returns: %TRUE if the stop could be queried
3796  */
3797 gboolean
3798 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3799 {
3800   GstRTSPStreamPrivate *priv;
3801   GstElement *sink;
3802   GstQuery *query;
3803   gboolean ret;
3804
3805   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3806
3807   priv = stream->priv;
3808
3809   g_mutex_lock (&priv->lock);
3810   /* depending on the transport type, it should query corresponding sink */
3811   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3812       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3813     sink = priv->udpsink[0];
3814   else
3815     sink = priv->appsink[0];
3816
3817   if (sink)
3818     gst_object_ref (sink);
3819   g_mutex_unlock (&priv->lock);
3820
3821   if (!sink)
3822     return FALSE;
3823
3824   query = gst_query_new_segment (GST_FORMAT_TIME);
3825   if ((ret = gst_element_query (sink, query))) {
3826     GstFormat format;
3827
3828     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3829     if (format != GST_FORMAT_TIME)
3830       *stop = -1;
3831   }
3832   gst_query_unref (query);
3833   gst_object_unref (sink);
3834
3835   return ret;
3836
3837 }