5773fa60c91ebe0d8a5c105a37b8f4a85d44c10b
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPAddress *server_addr_v4;
128   GstRTSPAddress *server_addr_v6;
129
130   /* multicast addresses */
131   GstRTSPAddress *mcast_addr_v4;
132   GstRTSPAddress *mcast_addr_v6;
133
134   gchar *multicast_iface;
135
136   /* the caps of the stream */
137   gulong caps_sig;
138   GstCaps *caps;
139
140   /* transports we stream to */
141   guint n_active;
142   GList *transports;
143   guint transports_cookie;
144   GList *tr_cache_rtp;
145   GList *tr_cache_rtcp;
146   guint tr_cache_cookie_rtp;
147   guint tr_cache_cookie_rtcp;
148
149   gint dscp_qos;
150
151   /* stream blocking */
152   gulong blocked_id;
153   gboolean blocking;
154
155   /* pt->caps map for RECORD streams */
156   GHashTable *ptmap;
157
158   GstRTSPPublishClockMode publish_clock_mode;
159 };
160
161 #define DEFAULT_CONTROL         NULL
162 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
163 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
164                                         GST_RTSP_LOWER_TRANS_TCP
165
166 enum
167 {
168   PROP_0,
169   PROP_CONTROL,
170   PROP_PROFILES,
171   PROP_PROTOCOLS,
172   PROP_LAST
173 };
174
175 enum
176 {
177   SIGNAL_NEW_RTP_ENCODER,
178   SIGNAL_NEW_RTCP_ENCODER,
179   SIGNAL_LAST
180 };
181
182 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
183 #define GST_CAT_DEFAULT rtsp_stream_debug
184
185 static GQuark ssrc_stream_map_key;
186
187 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
188     GValue * value, GParamSpec * pspec);
189 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
190     const GValue * value, GParamSpec * pspec);
191
192 static void gst_rtsp_stream_finalize (GObject * obj);
193
194 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
195
196 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
197
198 static void
199 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
200 {
201   GObjectClass *gobject_class;
202
203   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
204
205   gobject_class = G_OBJECT_CLASS (klass);
206
207   gobject_class->get_property = gst_rtsp_stream_get_property;
208   gobject_class->set_property = gst_rtsp_stream_set_property;
209   gobject_class->finalize = gst_rtsp_stream_finalize;
210
211   g_object_class_install_property (gobject_class, PROP_CONTROL,
212       g_param_spec_string ("control", "Control",
213           "The control string for this stream", DEFAULT_CONTROL,
214           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
215
216   g_object_class_install_property (gobject_class, PROP_PROFILES,
217       g_param_spec_flags ("profiles", "Profiles",
218           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
219           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
222       g_param_spec_flags ("protocols", "Protocols",
223           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
224           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
227       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
228       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
229       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
230
231   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
232       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
234       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
235
236   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
237
238   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
239 }
240
241 static void
242 gst_rtsp_stream_init (GstRTSPStream * stream)
243 {
244   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
245
246   GST_DEBUG ("new stream %p", stream);
247
248   stream->priv = priv;
249
250   priv->dscp_qos = -1;
251   priv->control = g_strdup (DEFAULT_CONTROL);
252   priv->profiles = DEFAULT_PROFILES;
253   priv->protocols = DEFAULT_PROTOCOLS;
254   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
255
256   g_mutex_init (&priv->lock);
257
258   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
259       NULL, (GDestroyNotify) gst_caps_unref);
260   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
261       (GDestroyNotify) gst_caps_unref);
262 }
263
264 static void
265 gst_rtsp_stream_finalize (GObject * obj)
266 {
267   GstRTSPStream *stream;
268   GstRTSPStreamPrivate *priv;
269
270   stream = GST_RTSP_STREAM (obj);
271   priv = stream->priv;
272
273   GST_DEBUG ("finalize stream %p", stream);
274
275   /* we really need to be unjoined now */
276   g_return_if_fail (priv->joined_bin == NULL);
277
278   if (priv->mcast_addr_v4)
279     gst_rtsp_address_free (priv->mcast_addr_v4);
280   if (priv->mcast_addr_v6)
281     gst_rtsp_address_free (priv->mcast_addr_v6);
282   if (priv->server_addr_v4)
283     gst_rtsp_address_free (priv->server_addr_v4);
284   if (priv->server_addr_v6)
285     gst_rtsp_address_free (priv->server_addr_v6);
286   if (priv->pool)
287     g_object_unref (priv->pool);
288   if (priv->rtxsend)
289     g_object_unref (priv->rtxsend);
290
291   g_free (priv->multicast_iface);
292
293   gst_object_unref (priv->payloader);
294   if (priv->srcpad)
295     gst_object_unref (priv->srcpad);
296   if (priv->sinkpad)
297     gst_object_unref (priv->sinkpad);
298   g_free (priv->control);
299   g_mutex_clear (&priv->lock);
300
301   g_hash_table_unref (priv->keys);
302   g_hash_table_destroy (priv->ptmap);
303
304   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
305 }
306
307 static void
308 gst_rtsp_stream_get_property (GObject * object, guint propid,
309     GValue * value, GParamSpec * pspec)
310 {
311   GstRTSPStream *stream = GST_RTSP_STREAM (object);
312
313   switch (propid) {
314     case PROP_CONTROL:
315       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
316       break;
317     case PROP_PROFILES:
318       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
319       break;
320     case PROP_PROTOCOLS:
321       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
325   }
326 }
327
328 static void
329 gst_rtsp_stream_set_property (GObject * object, guint propid,
330     const GValue * value, GParamSpec * pspec)
331 {
332   GstRTSPStream *stream = GST_RTSP_STREAM (object);
333
334   switch (propid) {
335     case PROP_CONTROL:
336       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
337       break;
338     case PROP_PROFILES:
339       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
340       break;
341     case PROP_PROTOCOLS:
342       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
343       break;
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
346   }
347 }
348
349 /**
350  * gst_rtsp_stream_new:
351  * @idx: an index
352  * @pad: a #GstPad
353  * @payloader: a #GstElement
354  *
355  * Create a new media stream with index @idx that handles RTP data on
356  * @pad and has a payloader element @payloader if @pad is a source pad
357  * or a depayloader element @payloader if @pad is a sink pad.
358  *
359  * Returns: (transfer full): a new #GstRTSPStream
360  */
361 GstRTSPStream *
362 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
363 {
364   GstRTSPStreamPrivate *priv;
365   GstRTSPStream *stream;
366
367   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
368   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
369
370   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
371   priv = stream->priv;
372   priv->idx = idx;
373   priv->payloader = gst_object_ref (payloader);
374   if (GST_PAD_IS_SRC (pad))
375     priv->srcpad = gst_object_ref (pad);
376   else
377     priv->sinkpad = gst_object_ref (pad);
378
379   return stream;
380 }
381
382 /**
383  * gst_rtsp_stream_get_index:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the stream index.
387  *
388  * Return: the stream index.
389  */
390 guint
391 gst_rtsp_stream_get_index (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
394
395   return stream->priv->idx;
396 }
397
398 /**
399  * gst_rtsp_stream_get_pt:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the stream payload type.
403  *
404  * Return: the stream payload type.
405  */
406 guint
407 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   guint pt;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
413
414   priv = stream->priv;
415
416   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
417
418   return pt;
419 }
420
421 /**
422  * gst_rtsp_stream_get_srcpad:
423  * @stream: a #GstRTSPStream
424  *
425  * Get the srcpad associated with @stream.
426  *
427  * Returns: (transfer full): the srcpad. Unref after usage.
428  */
429 GstPad *
430 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
431 {
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
433
434   if (!stream->priv->srcpad)
435     return NULL;
436
437   return gst_object_ref (stream->priv->srcpad);
438 }
439
440 /**
441  * gst_rtsp_stream_get_sinkpad:
442  * @stream: a #GstRTSPStream
443  *
444  * Get the sinkpad associated with @stream.
445  *
446  * Returns: (transfer full): the sinkpad. Unref after usage.
447  */
448 GstPad *
449 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
450 {
451   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
452
453   if (!stream->priv->sinkpad)
454     return NULL;
455
456   return gst_object_ref (stream->priv->sinkpad);
457 }
458
459 /**
460  * gst_rtsp_stream_get_control:
461  * @stream: a #GstRTSPStream
462  *
463  * Get the control string to identify this stream.
464  *
465  * Returns: (transfer full): the control string. g_free() after usage.
466  */
467 gchar *
468 gst_rtsp_stream_get_control (GstRTSPStream * stream)
469 {
470   GstRTSPStreamPrivate *priv;
471   gchar *result;
472
473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
474
475   priv = stream->priv;
476
477   g_mutex_lock (&priv->lock);
478   if ((result = g_strdup (priv->control)) == NULL)
479     result = g_strdup_printf ("stream=%u", priv->idx);
480   g_mutex_unlock (&priv->lock);
481
482   return result;
483 }
484
485 /**
486  * gst_rtsp_stream_set_control:
487  * @stream: a #GstRTSPStream
488  * @control: a control string
489  *
490  * Set the control string in @stream.
491  */
492 void
493 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
494 {
495   GstRTSPStreamPrivate *priv;
496
497   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
498
499   priv = stream->priv;
500
501   g_mutex_lock (&priv->lock);
502   g_free (priv->control);
503   priv->control = g_strdup (control);
504   g_mutex_unlock (&priv->lock);
505 }
506
507 /**
508  * gst_rtsp_stream_has_control:
509  * @stream: a #GstRTSPStream
510  * @control: a control string
511  *
512  * Check if @stream has the control string @control.
513  *
514  * Returns: %TRUE is @stream has @control as the control string
515  */
516 gboolean
517 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
518 {
519   GstRTSPStreamPrivate *priv;
520   gboolean res;
521
522   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
523
524   priv = stream->priv;
525
526   g_mutex_lock (&priv->lock);
527   if (priv->control)
528     res = (g_strcmp0 (priv->control, control) == 0);
529   else {
530     guint streamid;
531
532     if (sscanf (control, "stream=%u", &streamid) > 0)
533       res = (streamid == priv->idx);
534     else
535       res = FALSE;
536   }
537   g_mutex_unlock (&priv->lock);
538
539   return res;
540 }
541
542 /**
543  * gst_rtsp_stream_set_mtu:
544  * @stream: a #GstRTSPStream
545  * @mtu: a new MTU
546  *
547  * Configure the mtu in the payloader of @stream to @mtu.
548  */
549 void
550 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
551 {
552   GstRTSPStreamPrivate *priv;
553
554   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
555
556   priv = stream->priv;
557
558   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
559
560   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
561 }
562
563 /**
564  * gst_rtsp_stream_get_mtu:
565  * @stream: a #GstRTSPStream
566  *
567  * Get the configured MTU in the payloader of @stream.
568  *
569  * Returns: the MTU of the payloader.
570  */
571 guint
572 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
573 {
574   GstRTSPStreamPrivate *priv;
575   guint mtu;
576
577   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
578
579   priv = stream->priv;
580
581   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
582
583   return mtu;
584 }
585
586 /* Update the dscp qos property on the udp sinks */
587 static void
588 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
589 {
590   GstRTSPStreamPrivate *priv;
591
592   priv = stream->priv;
593
594   if (udpsink[0]) {
595     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
596   }
597
598   if (udpsink[1]) {
599     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
600   }
601 }
602
603 /**
604  * gst_rtsp_stream_set_dscp_qos:
605  * @stream: a #GstRTSPStream
606  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
607  *
608  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
609  */
610 void
611 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
612 {
613   GstRTSPStreamPrivate *priv;
614
615   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
616
617   priv = stream->priv;
618
619   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
620
621   if (dscp_qos < -1 || dscp_qos > 63) {
622     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
623     return;
624   }
625
626   priv->dscp_qos = dscp_qos;
627
628   update_dscp_qos (stream, priv->udpsink);
629 }
630
631 /**
632  * gst_rtsp_stream_get_dscp_qos:
633  * @stream: a #GstRTSPStream
634  *
635  * Get the configured DSCP QoS in of the outgoing sockets.
636  *
637  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
638  */
639 gint
640 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
641 {
642   GstRTSPStreamPrivate *priv;
643
644   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
645
646   priv = stream->priv;
647
648   return priv->dscp_qos;
649 }
650
651 /**
652  * gst_rtsp_stream_is_transport_supported:
653  * @stream: a #GstRTSPStream
654  * @transport: (transfer none): a #GstRTSPTransport
655  *
656  * Check if @transport can be handled by stream
657  *
658  * Returns: %TRUE if @transport can be handled by @stream.
659  */
660 gboolean
661 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
662     GstRTSPTransport * transport)
663 {
664   GstRTSPStreamPrivate *priv;
665
666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
667
668   priv = stream->priv;
669
670   g_mutex_lock (&priv->lock);
671   if (transport->trans != GST_RTSP_TRANS_RTP)
672     goto unsupported_transmode;
673
674   if (!(transport->profile & priv->profiles))
675     goto unsupported_profile;
676
677   if (!(transport->lower_transport & priv->protocols))
678     goto unsupported_ltrans;
679
680   g_mutex_unlock (&priv->lock);
681
682   return TRUE;
683
684   /* ERRORS */
685 unsupported_transmode:
686   {
687     GST_DEBUG ("unsupported transport mode %d", transport->trans);
688     g_mutex_unlock (&priv->lock);
689     return FALSE;
690   }
691 unsupported_profile:
692   {
693     GST_DEBUG ("unsupported profile %d", transport->profile);
694     g_mutex_unlock (&priv->lock);
695     return FALSE;
696   }
697 unsupported_ltrans:
698   {
699     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
700     g_mutex_unlock (&priv->lock);
701     return FALSE;
702   }
703 }
704
705 /**
706  * gst_rtsp_stream_set_profiles:
707  * @stream: a #GstRTSPStream
708  * @profiles: the new profiles
709  *
710  * Configure the allowed profiles for @stream.
711  */
712 void
713 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
714 {
715   GstRTSPStreamPrivate *priv;
716
717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
718
719   priv = stream->priv;
720
721   g_mutex_lock (&priv->lock);
722   priv->profiles = profiles;
723   g_mutex_unlock (&priv->lock);
724 }
725
726 /**
727  * gst_rtsp_stream_get_profiles:
728  * @stream: a #GstRTSPStream
729  *
730  * Get the allowed profiles of @stream.
731  *
732  * Returns: a #GstRTSPProfile
733  */
734 GstRTSPProfile
735 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
736 {
737   GstRTSPStreamPrivate *priv;
738   GstRTSPProfile res;
739
740   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
741
742   priv = stream->priv;
743
744   g_mutex_lock (&priv->lock);
745   res = priv->profiles;
746   g_mutex_unlock (&priv->lock);
747
748   return res;
749 }
750
751 /**
752  * gst_rtsp_stream_set_protocols:
753  * @stream: a #GstRTSPStream
754  * @protocols: the new flags
755  *
756  * Configure the allowed lower transport for @stream.
757  */
758 void
759 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
760     GstRTSPLowerTrans protocols)
761 {
762   GstRTSPStreamPrivate *priv;
763
764   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
765
766   priv = stream->priv;
767
768   g_mutex_lock (&priv->lock);
769   priv->protocols = protocols;
770   g_mutex_unlock (&priv->lock);
771 }
772
773 /**
774  * gst_rtsp_stream_get_protocols:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the allowed protocols of @stream.
778  *
779  * Returns: a #GstRTSPLowerTrans
780  */
781 GstRTSPLowerTrans
782 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
783 {
784   GstRTSPStreamPrivate *priv;
785   GstRTSPLowerTrans res;
786
787   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
788       GST_RTSP_LOWER_TRANS_UNKNOWN);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   res = priv->protocols;
794   g_mutex_unlock (&priv->lock);
795
796   return res;
797 }
798
799 /**
800  * gst_rtsp_stream_set_address_pool:
801  * @stream: a #GstRTSPStream
802  * @pool: (transfer none): a #GstRTSPAddressPool
803  *
804  * configure @pool to be used as the address pool of @stream.
805  */
806 void
807 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
808     GstRTSPAddressPool * pool)
809 {
810   GstRTSPStreamPrivate *priv;
811   GstRTSPAddressPool *old;
812
813   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
814
815   priv = stream->priv;
816
817   GST_LOG_OBJECT (stream, "set address pool %p", pool);
818
819   g_mutex_lock (&priv->lock);
820   if ((old = priv->pool) != pool)
821     priv->pool = pool ? g_object_ref (pool) : NULL;
822   else
823     old = NULL;
824   g_mutex_unlock (&priv->lock);
825
826   if (old)
827     g_object_unref (old);
828 }
829
830 /**
831  * gst_rtsp_stream_get_address_pool:
832  * @stream: a #GstRTSPStream
833  *
834  * Get the #GstRTSPAddressPool used as the address pool of @stream.
835  *
836  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
837  * usage.
838  */
839 GstRTSPAddressPool *
840 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
841 {
842   GstRTSPStreamPrivate *priv;
843   GstRTSPAddressPool *result;
844
845   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
846
847   priv = stream->priv;
848
849   g_mutex_lock (&priv->lock);
850   if ((result = priv->pool))
851     g_object_ref (result);
852   g_mutex_unlock (&priv->lock);
853
854   return result;
855 }
856
857 /**
858  * gst_rtsp_stream_set_multicast_iface:
859  * @stream: a #GstRTSPStream
860  * @multicast_iface: (transfer none): a multicast interface
861  *
862  * configure @multicast_iface to be used for @stream.
863  */
864 void
865 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
866     const gchar * multicast_iface)
867 {
868   GstRTSPStreamPrivate *priv;
869   gchar *old;
870
871   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
872
873   priv = stream->priv;
874
875   GST_LOG_OBJECT (stream, "set multicast iface %s",
876       GST_STR_NULL (multicast_iface));
877
878   g_mutex_lock (&priv->lock);
879   if ((old = priv->multicast_iface) != multicast_iface)
880     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
881   else
882     old = NULL;
883   g_mutex_unlock (&priv->lock);
884
885   if (old)
886     g_free (old);
887 }
888
889 /**
890  * gst_rtsp_stream_get_multicast_iface:
891  * @stream: a #GstRTSPStream
892  *
893  * Get the multicast interface used for @stream.
894  *
895  * Returns: (transfer full): the multicast interface for @stream. g_free() after
896  * usage.
897  */
898 gchar *
899 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
900 {
901   GstRTSPStreamPrivate *priv;
902   gchar *result;
903
904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
905
906   priv = stream->priv;
907
908   g_mutex_lock (&priv->lock);
909   if ((result = priv->multicast_iface))
910     result = g_strdup (result);
911   g_mutex_unlock (&priv->lock);
912
913   return result;
914 }
915
916
917 static GstRTSPAddress *
918 gst_rtsp_stream_get_multicast_address_locked (GstRTSPStream * stream,
919     GSocketFamily family)
920 {
921   GstRTSPStreamPrivate *priv;
922   GstRTSPAddress *result;
923   GstRTSPAddress **addrp;
924   GstRTSPAddressFlags flags;
925
926   priv = stream->priv;
927
928   if (family == G_SOCKET_FAMILY_IPV6) {
929     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
930     addrp = &priv->mcast_addr_v6;
931   } else {
932     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
933     addrp = &priv->mcast_addr_v4;
934   }
935
936   if (*addrp == NULL) {
937     if (priv->pool == NULL)
938       goto no_pool;
939
940     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
941
942     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
943     if (*addrp == NULL)
944       goto no_address;
945
946     /* FIXME: Also reserve the same port with unicast ANY address, since that's
947      * where we are going to bind our socket. Probably loop until we find a port
948      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
949      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
950      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
951   }
952   result = gst_rtsp_address_copy (*addrp);
953
954   return result;
955
956   /* ERRORS */
957 no_pool:
958   {
959     GST_ERROR_OBJECT (stream, "no address pool specified");
960     return NULL;
961   }
962 no_address:
963   {
964     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
965     return NULL;
966   }
967 }
968
969 /**
970  * gst_rtsp_stream_get_multicast_address:
971  * @stream: a #GstRTSPStream
972  * @family: the #GSocketFamily
973  *
974  * Get the multicast address of @stream for @family. The original
975  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
976  * won't release the address from the pool.
977  *
978  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
979  * or %NULL when no address could be allocated. gst_rtsp_address_free()
980  * after usage.
981  */
982 GstRTSPAddress *
983 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
984     GSocketFamily family)
985 {
986   GstRTSPAddress *result;
987
988   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
989
990   g_mutex_lock (&stream->priv->lock);
991   result = gst_rtsp_stream_get_multicast_address_locked (stream, family);
992   g_mutex_unlock (&stream->priv->lock);
993
994   return result;
995 }
996
997 /**
998  * gst_rtsp_stream_reserve_address:
999  * @stream: a #GstRTSPStream
1000  * @address: an address
1001  * @port: a port
1002  * @n_ports: n_ports
1003  * @ttl: a TTL
1004  *
1005  * Reserve @address and @port as the address and port of @stream. The original
1006  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1007  * won't release the address from the pool.
1008  *
1009  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1010  * the address could be reserved. gst_rtsp_address_free() after usage.
1011  */
1012 GstRTSPAddress *
1013 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1014     const gchar * address, guint port, guint n_ports, guint ttl)
1015 {
1016   GstRTSPStreamPrivate *priv;
1017   GstRTSPAddress *result;
1018   GInetAddress *addr;
1019   GSocketFamily family;
1020   GstRTSPAddress **addrp;
1021
1022   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1023   g_return_val_if_fail (address != NULL, NULL);
1024   g_return_val_if_fail (port > 0, NULL);
1025   g_return_val_if_fail (n_ports > 0, NULL);
1026   g_return_val_if_fail (ttl > 0, NULL);
1027
1028   priv = stream->priv;
1029
1030   addr = g_inet_address_new_from_string (address);
1031   if (!addr) {
1032     GST_ERROR ("failed to get inet addr from %s", address);
1033     family = G_SOCKET_FAMILY_IPV4;
1034   } else {
1035     family = g_inet_address_get_family (addr);
1036     g_object_unref (addr);
1037   }
1038
1039   if (family == G_SOCKET_FAMILY_IPV6)
1040     addrp = &priv->mcast_addr_v6;
1041   else
1042     addrp = &priv->mcast_addr_v4;
1043
1044   g_mutex_lock (&priv->lock);
1045   if (*addrp == NULL) {
1046     GstRTSPAddressPoolResult res;
1047
1048     if (priv->pool == NULL)
1049       goto no_pool;
1050
1051     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1052         port, n_ports, ttl, addrp);
1053     if (res != GST_RTSP_ADDRESS_POOL_OK)
1054       goto no_address;
1055
1056     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1057      * where we are going to bind our socket. */
1058   } else {
1059     if (strcmp ((*addrp)->address, address) ||
1060         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1061         (*addrp)->ttl != ttl)
1062       goto different_address;
1063   }
1064   result = gst_rtsp_address_copy (*addrp);
1065   g_mutex_unlock (&priv->lock);
1066
1067   return result;
1068
1069   /* ERRORS */
1070 no_pool:
1071   {
1072     GST_ERROR_OBJECT (stream, "no address pool specified");
1073     g_mutex_unlock (&priv->lock);
1074     return NULL;
1075   }
1076 no_address:
1077   {
1078     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1079         address);
1080     g_mutex_unlock (&priv->lock);
1081     return NULL;
1082   }
1083 different_address:
1084   {
1085     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1086         " reserved", address);
1087     g_mutex_unlock (&priv->lock);
1088     return NULL;
1089   }
1090 }
1091
1092 /* must be called with lock */
1093 static void
1094 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1095     GSocket * rtcp_socket, GSocketFamily family)
1096 {
1097   const gchar *multisink_socket;
1098
1099   if (family == G_SOCKET_FAMILY_IPV6)
1100     multisink_socket = "socket-v6";
1101   else
1102     multisink_socket = "socket";
1103
1104   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1105   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1106 }
1107
1108 static gboolean
1109 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
1110 {
1111   GstRTSPStreamPrivate *priv = stream->priv;
1112   GstElement *udpsink0, *udpsink1;
1113
1114   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1115   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1116
1117   if (!udpsink0 || !udpsink1)
1118     goto no_udp_protocol;
1119
1120   /* configure sinks */
1121
1122   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1123   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1124
1125   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1126   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1127
1128   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1129
1130   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1131   /* Needs to be async for RECORD streams, otherwise we will never go to
1132    * PLAYING because the sinks will wait for data while the udpsrc can't
1133    * provide data with timestamps in PAUSED. */
1134   if (priv->sinkpad)
1135     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1136   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1137
1138   /* join multicast group when adding clients, so we'll start receiving from it.
1139    * We cannot rely on the udpsrc to join the group since its socket is always a
1140    * local unicast one. */
1141   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1142   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1143
1144   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1145   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1146
1147   udpsink[0] = udpsink0;
1148   udpsink[1] = udpsink1;
1149
1150   /* update the dscp qos field in the sinks */
1151   update_dscp_qos (stream, udpsink);
1152
1153   return TRUE;
1154
1155   /* ERRORS */
1156 no_udp_protocol:
1157   {
1158     return FALSE;
1159   }
1160 }
1161
1162 /* must be called with lock */
1163 static gboolean
1164 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1165     GSocket * rtp_socket, GSocket * rtcp_socket)
1166 {
1167   GstStateChangeReturn ret;
1168
1169   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1170   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1171
1172   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1173     goto error;
1174
1175   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1176   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1177
1178   /* The udpsrc cannot do the join because its socket is always a local unicast
1179    * one. The udpsink sharing the same socket will do it for us. */
1180   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1181   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1182
1183   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1184   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1185
1186   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1187   if (ret == GST_STATE_CHANGE_FAILURE)
1188     goto error;
1189   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1190   if (ret == GST_STATE_CHANGE_FAILURE)
1191     goto error;
1192
1193   return TRUE;
1194
1195   /* ERRORS */
1196 error:
1197   {
1198     if (udpsrc_out[0]) {
1199       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1200       g_clear_object (&udpsrc_out[0]);
1201     }
1202     if (udpsrc_out[1]) {
1203       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1204       g_clear_object (&udpsrc_out[1]);
1205     }
1206     return FALSE;
1207   }
1208 }
1209
1210 static gboolean
1211 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1212     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1213     GstRTSPAddress ** server_addr_out, gboolean multicast)
1214 {
1215   GstRTSPStreamPrivate *priv = stream->priv;
1216   GSocket *rtp_socket = NULL;
1217   GSocket *rtcp_socket;
1218   gint tmp_rtp, tmp_rtcp;
1219   guint count;
1220   gint rtpport, rtcpport;
1221   GList *rejected_addresses = NULL;
1222   GstRTSPAddress *addr = NULL;
1223   GInetAddress *inetaddr = NULL;
1224   gchar *addr_str;
1225   GSocketAddress *rtp_sockaddr = NULL;
1226   GSocketAddress *rtcp_sockaddr = NULL;
1227   GstRTSPAddressPool *pool;
1228
1229   g_assert (!udpsrc_out[0]);
1230   g_assert (!udpsrc_out[1]);
1231   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1232       (udpsink_out[0] && udpsink_out[1]));
1233   g_assert (*server_addr_out == NULL);
1234
1235   pool = priv->pool;
1236   count = 0;
1237
1238   /* Start with random port */
1239   tmp_rtp = 0;
1240
1241   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1242       G_SOCKET_PROTOCOL_UDP, NULL);
1243   if (!rtcp_socket)
1244     goto no_udp_protocol;
1245   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1246
1247   /* try to allocate 2 UDP ports, the RTP port should be an even
1248    * number and the RTCP port should be the next (uneven) port */
1249 again:
1250
1251   if (rtp_socket == NULL) {
1252     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1253         G_SOCKET_PROTOCOL_UDP, NULL);
1254     if (!rtp_socket)
1255       goto no_udp_protocol;
1256     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1257   }
1258
1259   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1260     GstRTSPAddressFlags flags;
1261
1262     if (addr)
1263       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1264
1265     if (!pool)
1266       goto no_ports;
1267
1268     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1269     if (multicast)
1270       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1271     else
1272       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1273
1274     if (family == G_SOCKET_FAMILY_IPV6)
1275       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1276     else
1277       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1278
1279     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1280
1281     if (addr == NULL)
1282       goto no_ports;
1283
1284     tmp_rtp = addr->port;
1285
1286     g_clear_object (&inetaddr);
1287     inetaddr = g_inet_address_new_from_string (addr->address);
1288   } else {
1289     if (tmp_rtp != 0) {
1290       tmp_rtp += 2;
1291       if (++count > 20)
1292         goto no_ports;
1293     }
1294
1295     if (inetaddr == NULL)
1296       inetaddr = g_inet_address_new_any (family);
1297   }
1298
1299   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1300   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1301     g_object_unref (rtp_sockaddr);
1302     goto again;
1303   }
1304   g_object_unref (rtp_sockaddr);
1305
1306   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1307   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1308     g_clear_object (&rtp_sockaddr);
1309     goto socket_error;
1310   }
1311
1312   tmp_rtp =
1313       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1314   g_object_unref (rtp_sockaddr);
1315
1316   /* check if port is even */
1317   if ((tmp_rtp & 1) != 0) {
1318     /* port not even, close and allocate another */
1319     tmp_rtp++;
1320     g_clear_object (&rtp_socket);
1321     goto again;
1322   }
1323
1324   /* set port */
1325   tmp_rtcp = tmp_rtp + 1;
1326
1327   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1328   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1329     g_object_unref (rtcp_sockaddr);
1330     g_clear_object (&rtp_socket);
1331     goto again;
1332   }
1333   g_object_unref (rtcp_sockaddr);
1334
1335   if (!addr) {
1336     addr = g_slice_new0 (GstRTSPAddress);
1337     addr->address = g_inet_address_to_string (inetaddr);
1338     addr->port = tmp_rtp;
1339     addr->n_ports = 2;
1340   }
1341
1342   addr_str = addr->address;
1343   g_clear_object (&inetaddr);
1344
1345   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1346     goto no_udp_protocol;
1347   }
1348
1349   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1350   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1351
1352   /* this should not happen... */
1353   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1354     goto port_error;
1355
1356   /* This function is called twice (for v4 and v6) but we create only one pair
1357    * of udpsinks. */
1358   if (!udpsink_out[0]
1359       && !create_and_configure_udpsinks (stream, udpsink_out))
1360     goto no_udp_protocol;
1361
1362   if (multicast) {
1363     g_object_set (G_OBJECT (udpsink_out[0]), "multicast-iface",
1364         priv->multicast_iface, NULL);
1365     g_object_set (G_OBJECT (udpsink_out[1]), "multicast-iface",
1366         priv->multicast_iface, NULL);
1367
1368     g_signal_emit_by_name (udpsink_out[0], "add", addr_str, rtpport, NULL);
1369     g_signal_emit_by_name (udpsink_out[1], "add", addr_str, rtcpport, NULL);
1370   }
1371
1372   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1373
1374   *server_addr_out = addr;
1375
1376   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1377
1378   g_object_unref (rtp_socket);
1379   g_object_unref (rtcp_socket);
1380
1381   return TRUE;
1382
1383   /* ERRORS */
1384 no_udp_protocol:
1385   {
1386     goto cleanup;
1387   }
1388 no_ports:
1389   {
1390     goto cleanup;
1391   }
1392 port_error:
1393   {
1394     goto cleanup;
1395   }
1396 socket_error:
1397   {
1398     goto cleanup;
1399   }
1400 cleanup:
1401   {
1402     if (inetaddr)
1403       g_object_unref (inetaddr);
1404     g_list_free_full (rejected_addresses,
1405         (GDestroyNotify) gst_rtsp_address_free);
1406     if (addr)
1407       gst_rtsp_address_free (addr);
1408     if (rtp_socket)
1409       g_object_unref (rtp_socket);
1410     if (rtcp_socket)
1411       g_object_unref (rtcp_socket);
1412     return FALSE;
1413   }
1414 }
1415
1416 /**
1417  * gst_rtsp_stream_allocate_udp_sockets:
1418  * @stream: a #GstRTSPStream
1419  * @family: protocol family
1420  * @transport_method: transport method
1421  *
1422  * Allocates RTP and RTCP ports.
1423  *
1424  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1425  * Deprecated: This function shouldn't have been made public
1426  */
1427 gboolean
1428 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1429     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1430 {
1431   g_warn_if_reached ();
1432   return FALSE;
1433 }
1434
1435 /**
1436  * gst_rtsp_stream_set_client_side:
1437  * @stream: a #GstRTSPStream
1438  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1439  * an RTSP connection.
1440  *
1441  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1442  * streams to an RTSP server via RECORD. This has the practical effect
1443  * of changing which UDP port numbers are used when setting up the local
1444  * side of the stream sending to be either the 'server' or 'client' pair
1445  * of a configured UDP transport.
1446  */
1447 void
1448 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1449 {
1450   GstRTSPStreamPrivate *priv;
1451
1452   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1453   priv = stream->priv;
1454   g_mutex_lock (&priv->lock);
1455   priv->client_side = client_side;
1456   g_mutex_unlock (&priv->lock);
1457 }
1458
1459 /**
1460  * gst_rtsp_stream_is_client_side:
1461  * @stream: a #GstRTSPStream
1462  *
1463  * See gst_rtsp_stream_set_client_side()
1464  *
1465  * Returns: TRUE if this #GstRTSPStream is client-side.
1466  */
1467 gboolean
1468 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1469 {
1470   GstRTSPStreamPrivate *priv;
1471   gboolean ret;
1472
1473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1474
1475   priv = stream->priv;
1476   g_mutex_lock (&priv->lock);
1477   ret = priv->client_side;
1478   g_mutex_unlock (&priv->lock);
1479
1480   return ret;
1481 }
1482
1483 /* must be called with lock */
1484 static gboolean
1485 alloc_ports (GstRTSPStream * stream)
1486 {
1487   GstRTSPStreamPrivate *priv = stream->priv;
1488   gboolean ret = TRUE;
1489
1490   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP) {
1491     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1492         priv->udpsrc_v4, priv->udpsink, &priv->server_addr_v4, FALSE);
1493
1494     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1495         priv->udpsrc_v6, priv->udpsink, &priv->server_addr_v6, FALSE);
1496   }
1497
1498   /* FIXME: Maybe actually consider the return values? */
1499   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1500     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1501         priv->mcast_udpsrc_v4, priv->mcast_udpsink, &priv->mcast_addr_v4, TRUE);
1502
1503     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1504         priv->mcast_udpsrc_v6, priv->mcast_udpsink, &priv->mcast_addr_v6, TRUE);
1505   }
1506
1507   return ret;
1508 }
1509
1510 /**
1511  * gst_rtsp_stream_get_server_port:
1512  * @stream: a #GstRTSPStream
1513  * @server_port: (out): result server port
1514  * @family: the port family to get
1515  *
1516  * Fill @server_port with the port pair used by the server. This function can
1517  * only be called when @stream has been joined.
1518  */
1519 void
1520 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1521     GstRTSPRange * server_port, GSocketFamily family)
1522 {
1523   GstRTSPStreamPrivate *priv;
1524
1525   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1526   priv = stream->priv;
1527   g_return_if_fail (priv->joined_bin != NULL);
1528
1529   g_mutex_lock (&priv->lock);
1530   if (family == G_SOCKET_FAMILY_IPV4) {
1531     if (server_port) {
1532       server_port->min = priv->server_addr_v4->port;
1533       server_port->max =
1534           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1535     }
1536   } else {
1537     if (server_port) {
1538       server_port->min = priv->server_addr_v6->port;
1539       server_port->max =
1540           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1541     }
1542   }
1543   g_mutex_unlock (&priv->lock);
1544 }
1545
1546 /**
1547  * gst_rtsp_stream_get_rtpsession:
1548  * @stream: a #GstRTSPStream
1549  *
1550  * Get the RTP session of this stream.
1551  *
1552  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1553  */
1554 GObject *
1555 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1556 {
1557   GstRTSPStreamPrivate *priv;
1558   GObject *session;
1559
1560   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1561
1562   priv = stream->priv;
1563
1564   g_mutex_lock (&priv->lock);
1565   if ((session = priv->session))
1566     g_object_ref (session);
1567   g_mutex_unlock (&priv->lock);
1568
1569   return session;
1570 }
1571
1572 /**
1573  * gst_rtsp_stream_get_encoder:
1574  * @stream: a #GstRTSPStream
1575  *
1576  * Get the SRTP encoder for this stream.
1577  *
1578  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1579  */
1580 GstElement *
1581 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1582 {
1583   GstRTSPStreamPrivate *priv;
1584   GstElement *encoder;
1585
1586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1587
1588   priv = stream->priv;
1589
1590   g_mutex_lock (&priv->lock);
1591   if ((encoder = priv->srtpenc))
1592     g_object_ref (encoder);
1593   g_mutex_unlock (&priv->lock);
1594
1595   return encoder;
1596 }
1597
1598 /**
1599  * gst_rtsp_stream_get_ssrc:
1600  * @stream: a #GstRTSPStream
1601  * @ssrc: (out): result ssrc
1602  *
1603  * Get the SSRC used by the RTP session of this stream. This function can only
1604  * be called when @stream has been joined.
1605  */
1606 void
1607 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1608 {
1609   GstRTSPStreamPrivate *priv;
1610
1611   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1612   priv = stream->priv;
1613   g_return_if_fail (priv->joined_bin != NULL);
1614
1615   g_mutex_lock (&priv->lock);
1616   if (ssrc && priv->session)
1617     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1618   g_mutex_unlock (&priv->lock);
1619 }
1620
1621 /**
1622  * gst_rtsp_stream_set_retransmission_time:
1623  * @stream: a #GstRTSPStream
1624  * @time: a #GstClockTime
1625  *
1626  * Set the amount of time to store retransmission packets.
1627  */
1628 void
1629 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1630     GstClockTime time)
1631 {
1632   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1633
1634   g_mutex_lock (&stream->priv->lock);
1635   stream->priv->rtx_time = time;
1636   if (stream->priv->rtxsend)
1637     g_object_set (stream->priv->rtxsend, "max-size-time",
1638         GST_TIME_AS_MSECONDS (time), NULL);
1639   g_mutex_unlock (&stream->priv->lock);
1640 }
1641
1642 /**
1643  * gst_rtsp_stream_get_retransmission_time:
1644  * @stream: a #GstRTSPStream
1645  *
1646  * Get the amount of time to store retransmission data.
1647  *
1648  * Returns: the amount of time to store retransmission data.
1649  */
1650 GstClockTime
1651 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1652 {
1653   GstClockTime ret;
1654
1655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1656
1657   g_mutex_lock (&stream->priv->lock);
1658   ret = stream->priv->rtx_time;
1659   g_mutex_unlock (&stream->priv->lock);
1660
1661   return ret;
1662 }
1663
1664 /**
1665  * gst_rtsp_stream_set_retransmission_pt:
1666  * @stream: a #GstRTSPStream
1667  * @rtx_pt: a #guint
1668  *
1669  * Set the payload type (pt) for retransmission of this stream.
1670  */
1671 void
1672 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1673 {
1674   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1675
1676   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1677
1678   g_mutex_lock (&stream->priv->lock);
1679   stream->priv->rtx_pt = rtx_pt;
1680   if (stream->priv->rtxsend) {
1681     guint pt = gst_rtsp_stream_get_pt (stream);
1682     gchar *pt_s = g_strdup_printf ("%d", pt);
1683     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1684         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1685     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1686     g_free (pt_s);
1687     gst_structure_free (rtx_pt_map);
1688   }
1689   g_mutex_unlock (&stream->priv->lock);
1690 }
1691
1692 /**
1693  * gst_rtsp_stream_get_retransmission_pt:
1694  * @stream: a #GstRTSPStream
1695  *
1696  * Get the payload-type used for retransmission of this stream
1697  *
1698  * Returns: The retransmission PT.
1699  */
1700 guint
1701 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1702 {
1703   guint rtx_pt;
1704
1705   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1706
1707   g_mutex_lock (&stream->priv->lock);
1708   rtx_pt = stream->priv->rtx_pt;
1709   g_mutex_unlock (&stream->priv->lock);
1710
1711   return rtx_pt;
1712 }
1713
1714 /**
1715  * gst_rtsp_stream_set_buffer_size:
1716  * @stream: a #GstRTSPStream
1717  * @size: the buffer size
1718  *
1719  * Set the size of the UDP transmission buffer (in bytes)
1720  * Needs to be set before the stream is joined to a bin.
1721  *
1722  * Since: 1.6
1723  */
1724 void
1725 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1726 {
1727   g_mutex_lock (&stream->priv->lock);
1728   stream->priv->buffer_size = size;
1729   g_mutex_unlock (&stream->priv->lock);
1730 }
1731
1732 /**
1733  * gst_rtsp_stream_get_buffer_size:
1734  * @stream: a #GstRTSPStream
1735  *
1736  * Get the size of the UDP transmission buffer (in bytes)
1737  *
1738  * Returns: the size of the UDP TX buffer
1739  *
1740  * Since: 1.6
1741  */
1742 guint
1743 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1744 {
1745   guint buffer_size;
1746
1747   g_mutex_lock (&stream->priv->lock);
1748   buffer_size = stream->priv->buffer_size;
1749   g_mutex_unlock (&stream->priv->lock);
1750
1751   return buffer_size;
1752 }
1753
1754 /* executed from streaming thread */
1755 static void
1756 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1757 {
1758   GstRTSPStreamPrivate *priv = stream->priv;
1759   GstCaps *newcaps, *oldcaps;
1760
1761   newcaps = gst_pad_get_current_caps (pad);
1762
1763   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1764       newcaps);
1765
1766   g_mutex_lock (&priv->lock);
1767   oldcaps = priv->caps;
1768   priv->caps = newcaps;
1769   g_mutex_unlock (&priv->lock);
1770
1771   if (oldcaps)
1772     gst_caps_unref (oldcaps);
1773 }
1774
1775 static void
1776 dump_structure (const GstStructure * s)
1777 {
1778   gchar *sstr;
1779
1780   sstr = gst_structure_to_string (s);
1781   GST_INFO ("structure: %s", sstr);
1782   g_free (sstr);
1783 }
1784
1785 static GstRTSPStreamTransport *
1786 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1787 {
1788   GstRTSPStreamPrivate *priv = stream->priv;
1789   GList *walk;
1790   GstRTSPStreamTransport *result = NULL;
1791   const gchar *tmp;
1792   gchar *dest;
1793   guint port;
1794
1795   if (rtcp_from == NULL)
1796     return NULL;
1797
1798   tmp = g_strrstr (rtcp_from, ":");
1799   if (tmp == NULL)
1800     return NULL;
1801
1802   port = atoi (tmp + 1);
1803   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1804
1805   g_mutex_lock (&priv->lock);
1806   GST_INFO ("finding %s:%d in %d transports", dest, port,
1807       g_list_length (priv->transports));
1808
1809   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1810     GstRTSPStreamTransport *trans = walk->data;
1811     const GstRTSPTransport *tr;
1812     gint min, max;
1813
1814     tr = gst_rtsp_stream_transport_get_transport (trans);
1815
1816     if (priv->client_side) {
1817       /* In client side mode the 'destination' is the RTSP server, so send
1818        * to those ports */
1819       min = tr->server_port.min;
1820       max = tr->server_port.max;
1821     } else {
1822       min = tr->client_port.min;
1823       max = tr->client_port.max;
1824     }
1825
1826     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1827       result = trans;
1828       break;
1829     }
1830   }
1831   if (result)
1832     g_object_ref (result);
1833   g_mutex_unlock (&priv->lock);
1834
1835   g_free (dest);
1836
1837   return result;
1838 }
1839
1840 static GstRTSPStreamTransport *
1841 check_transport (GObject * source, GstRTSPStream * stream)
1842 {
1843   GstStructure *stats;
1844   GstRTSPStreamTransport *trans;
1845
1846   /* see if we have a stream to match with the origin of the RTCP packet */
1847   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1848   if (trans == NULL) {
1849     g_object_get (source, "stats", &stats, NULL);
1850     if (stats) {
1851       const gchar *rtcp_from;
1852
1853       dump_structure (stats);
1854
1855       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1856       if ((trans = find_transport (stream, rtcp_from))) {
1857         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1858             source);
1859         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1860             g_object_unref);
1861       }
1862       gst_structure_free (stats);
1863     }
1864   }
1865   return trans;
1866 }
1867
1868
1869 static void
1870 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1871 {
1872   GstRTSPStreamTransport *trans;
1873
1874   GST_INFO ("%p: new source %p", stream, source);
1875
1876   trans = check_transport (source, stream);
1877
1878   if (trans)
1879     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1880 }
1881
1882 static void
1883 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1884 {
1885   GST_INFO ("%p: new SDES %p", stream, source);
1886 }
1887
1888 static void
1889 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1890 {
1891   GstRTSPStreamTransport *trans;
1892
1893   trans = check_transport (source, stream);
1894
1895   if (trans) {
1896     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1897     gst_rtsp_stream_transport_keep_alive (trans);
1898   }
1899 #ifdef DUMP_STATS
1900   {
1901     GstStructure *stats;
1902     g_object_get (source, "stats", &stats, NULL);
1903     if (stats) {
1904       dump_structure (stats);
1905       gst_structure_free (stats);
1906     }
1907   }
1908 #endif
1909 }
1910
1911 static void
1912 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1913 {
1914   GST_INFO ("%p: source %p bye", stream, source);
1915 }
1916
1917 static void
1918 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1919 {
1920   GstRTSPStreamTransport *trans;
1921
1922   GST_INFO ("%p: source %p bye timeout", stream, source);
1923
1924   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1925     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1926     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1927   }
1928 }
1929
1930 static void
1931 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1932 {
1933   GstRTSPStreamTransport *trans;
1934
1935   GST_INFO ("%p: source %p timeout", stream, source);
1936
1937   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1938     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1939     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1940   }
1941 }
1942
1943 static void
1944 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1945 {
1946   GST_INFO ("%p: new sender source %p", stream, source);
1947 #ifndef DUMP_STATS
1948   {
1949     GstStructure *stats;
1950     g_object_get (source, "stats", &stats, NULL);
1951     if (stats) {
1952       dump_structure (stats);
1953       gst_structure_free (stats);
1954     }
1955   }
1956 #endif
1957 }
1958
1959 static void
1960 on_sender_ssrc_active (GObject * session, GObject * source,
1961     GstRTSPStream * stream)
1962 {
1963 #ifndef DUMP_STATS
1964   {
1965     GstStructure *stats;
1966     g_object_get (source, "stats", &stats, NULL);
1967     if (stats) {
1968       dump_structure (stats);
1969       gst_structure_free (stats);
1970     }
1971   }
1972 #endif
1973 }
1974
1975 static void
1976 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1977 {
1978   if (is_rtp) {
1979     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1980     g_list_free (priv->tr_cache_rtp);
1981     priv->tr_cache_rtp = NULL;
1982   } else {
1983     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1984     g_list_free (priv->tr_cache_rtcp);
1985     priv->tr_cache_rtcp = NULL;
1986   }
1987 }
1988
1989 static GstFlowReturn
1990 handle_new_sample (GstAppSink * sink, gpointer user_data)
1991 {
1992   GstRTSPStreamPrivate *priv;
1993   GList *walk;
1994   GstSample *sample;
1995   GstBuffer *buffer;
1996   GstRTSPStream *stream;
1997   gboolean is_rtp;
1998
1999   sample = gst_app_sink_pull_sample (sink);
2000   if (!sample)
2001     return GST_FLOW_OK;
2002
2003   stream = (GstRTSPStream *) user_data;
2004   priv = stream->priv;
2005   buffer = gst_sample_get_buffer (sample);
2006
2007   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2008
2009   g_mutex_lock (&priv->lock);
2010   if (is_rtp) {
2011     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2012       clear_tr_cache (priv, is_rtp);
2013       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2014         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2015         priv->tr_cache_rtp =
2016             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2017       }
2018       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2019     }
2020   } else {
2021     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2022       clear_tr_cache (priv, is_rtp);
2023       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2024         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2025         priv->tr_cache_rtcp =
2026             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2027       }
2028       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2029     }
2030   }
2031   g_mutex_unlock (&priv->lock);
2032
2033   if (is_rtp) {
2034     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2035       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2036       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2037     }
2038   } else {
2039     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2040       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2041       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2042     }
2043   }
2044   gst_sample_unref (sample);
2045
2046   return GST_FLOW_OK;
2047 }
2048
2049 static GstAppSinkCallbacks sink_cb = {
2050   NULL,                         /* not interested in EOS */
2051   NULL,                         /* not interested in preroll samples */
2052   handle_new_sample,
2053 };
2054
2055 static GstElement *
2056 get_rtp_encoder (GstRTSPStream * stream, guint session)
2057 {
2058   GstRTSPStreamPrivate *priv = stream->priv;
2059
2060   if (priv->srtpenc == NULL) {
2061     gchar *name;
2062
2063     name = g_strdup_printf ("srtpenc_%u", session);
2064     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2065     g_free (name);
2066
2067     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2068   }
2069   return gst_object_ref (priv->srtpenc);
2070 }
2071
2072 static GstElement *
2073 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2074 {
2075   GstRTSPStreamPrivate *priv = stream->priv;
2076   GstElement *oldenc, *enc;
2077   GstPad *pad;
2078   gchar *name;
2079
2080   if (priv->idx != session)
2081     return NULL;
2082
2083   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2084
2085   oldenc = priv->srtpenc;
2086   enc = get_rtp_encoder (stream, session);
2087   name = g_strdup_printf ("rtp_sink_%d", session);
2088   pad = gst_element_get_request_pad (enc, name);
2089   g_free (name);
2090   gst_object_unref (pad);
2091
2092   if (oldenc == NULL)
2093     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2094         enc);
2095
2096   return enc;
2097 }
2098
2099 static GstElement *
2100 request_rtcp_encoder (GstElement * rtpbin, guint session,
2101     GstRTSPStream * stream)
2102 {
2103   GstRTSPStreamPrivate *priv = stream->priv;
2104   GstElement *oldenc, *enc;
2105   GstPad *pad;
2106   gchar *name;
2107
2108   if (priv->idx != session)
2109     return NULL;
2110
2111   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2112
2113   oldenc = priv->srtpenc;
2114   enc = get_rtp_encoder (stream, session);
2115   name = g_strdup_printf ("rtcp_sink_%d", session);
2116   pad = gst_element_get_request_pad (enc, name);
2117   g_free (name);
2118   gst_object_unref (pad);
2119
2120   if (oldenc == NULL)
2121     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2122         enc);
2123
2124   return enc;
2125 }
2126
2127 static GstCaps *
2128 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2129 {
2130   GstRTSPStreamPrivate *priv = stream->priv;
2131   GstCaps *caps;
2132
2133   GST_DEBUG ("request key %08x", ssrc);
2134
2135   g_mutex_lock (&priv->lock);
2136   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2137     gst_caps_ref (caps);
2138   g_mutex_unlock (&priv->lock);
2139
2140   return caps;
2141 }
2142
2143 static GstElement *
2144 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2145     GstRTSPStream * stream)
2146 {
2147   GstRTSPStreamPrivate *priv = stream->priv;
2148
2149   if (priv->idx != session)
2150     return NULL;
2151
2152   if (priv->srtpdec == NULL) {
2153     gchar *name;
2154
2155     name = g_strdup_printf ("srtpdec_%u", session);
2156     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2157     g_free (name);
2158
2159     g_signal_connect (priv->srtpdec, "request-key",
2160         (GCallback) request_key, stream);
2161   }
2162   return gst_object_ref (priv->srtpdec);
2163 }
2164
2165 /**
2166  * gst_rtsp_stream_request_aux_sender:
2167  * @stream: a #GstRTSPStream
2168  * @sessid: the session id
2169  *
2170  * Creating a rtxsend bin
2171  *
2172  * Returns: (transfer full): a #GstElement.
2173  *
2174  * Since: 1.6
2175  */
2176 GstElement *
2177 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2178 {
2179   GstElement *bin;
2180   GstPad *pad;
2181   GstStructure *pt_map;
2182   gchar *name;
2183   guint pt, rtx_pt;
2184   gchar *pt_s;
2185
2186   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2187
2188   pt = gst_rtsp_stream_get_pt (stream);
2189   pt_s = g_strdup_printf ("%u", pt);
2190   rtx_pt = stream->priv->rtx_pt;
2191
2192   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2193
2194   bin = gst_bin_new (NULL);
2195   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2196   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2197       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2198   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2199       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2200   g_free (pt_s);
2201   gst_structure_free (pt_map);
2202   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2203
2204   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2205   name = g_strdup_printf ("src_%u", sessid);
2206   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2207   g_free (name);
2208   gst_object_unref (pad);
2209
2210   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2211   name = g_strdup_printf ("sink_%u", sessid);
2212   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2213   g_free (name);
2214   gst_object_unref (pad);
2215
2216   return bin;
2217 }
2218
2219 /**
2220  * gst_rtsp_stream_set_pt_map:
2221  * @stream: a #GstRTSPStream
2222  * @pt: the pt
2223  * @caps: a #GstCaps
2224  *
2225  * Configure a pt map between @pt and @caps.
2226  */
2227 void
2228 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2229 {
2230   GstRTSPStreamPrivate *priv = stream->priv;
2231
2232   g_mutex_lock (&priv->lock);
2233   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2234   g_mutex_unlock (&priv->lock);
2235 }
2236
2237 /**
2238  * gst_rtsp_stream_set_publish_clock_mode:
2239  * @stream: a #GstRTSPStream
2240  * @mode: the clock publish mode
2241  *
2242  * Sets if and how the stream clock should be published according to RFC7273.
2243  *
2244  * Since: 1.8
2245  */
2246 void
2247 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2248     GstRTSPPublishClockMode mode)
2249 {
2250   GstRTSPStreamPrivate *priv;
2251
2252   priv = stream->priv;
2253   g_mutex_lock (&priv->lock);
2254   priv->publish_clock_mode = mode;
2255   g_mutex_unlock (&priv->lock);
2256 }
2257
2258 /**
2259  * gst_rtsp_stream_get_publish_clock_mode:
2260  * @factory: a #GstRTSPStream
2261  *
2262  * Gets if and how the stream clock should be published according to RFC7273.
2263  *
2264  * Returns: The GstRTSPPublishClockMode
2265  *
2266  * Since: 1.8
2267  */
2268 GstRTSPPublishClockMode
2269 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2270 {
2271   GstRTSPStreamPrivate *priv;
2272   GstRTSPPublishClockMode ret;
2273
2274   priv = stream->priv;
2275   g_mutex_lock (&priv->lock);
2276   ret = priv->publish_clock_mode;
2277   g_mutex_unlock (&priv->lock);
2278
2279   return ret;
2280 }
2281
2282 static GstCaps *
2283 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2284     GstRTSPStream * stream)
2285 {
2286   GstRTSPStreamPrivate *priv = stream->priv;
2287   GstCaps *caps = NULL;
2288
2289   g_mutex_lock (&priv->lock);
2290
2291   if (priv->idx == session) {
2292     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2293     if (caps) {
2294       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2295       gst_caps_ref (caps);
2296     } else {
2297       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2298     }
2299   }
2300
2301   g_mutex_unlock (&priv->lock);
2302
2303   return caps;
2304 }
2305
2306 static void
2307 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2308 {
2309   GstRTSPStreamPrivate *priv = stream->priv;
2310   gchar *name;
2311   GstPadLinkReturn ret;
2312   guint sessid;
2313
2314   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2315       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2316
2317   name = gst_pad_get_name (pad);
2318   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2319     g_free (name);
2320     return;
2321   }
2322   g_free (name);
2323
2324   if (priv->idx != sessid)
2325     return;
2326
2327   if (gst_pad_is_linked (priv->sinkpad)) {
2328     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2329         GST_DEBUG_PAD_NAME (priv->sinkpad));
2330     return;
2331   }
2332
2333   /* link the RTP pad to the session manager, it should not really fail unless
2334    * this is not really an RTP pad */
2335   ret = gst_pad_link (pad, priv->sinkpad);
2336   if (ret != GST_PAD_LINK_OK)
2337     goto link_failed;
2338   priv->recv_rtp_src = gst_object_ref (pad);
2339
2340   return;
2341
2342 /* ERRORS */
2343 link_failed:
2344   {
2345     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2346         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2347   }
2348 }
2349
2350 static void
2351 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2352     GstRTSPStream * stream)
2353 {
2354   /* TODO: What to do here other than this? */
2355   GST_DEBUG ("Stream %p: Got EOS", stream);
2356   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2357 }
2358
2359 static void
2360 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2361     GstElement ** queue_out)
2362 {
2363   GstPad *pad;
2364   GstPad *teepad;
2365   GstPad *queuepad;
2366
2367   gst_bin_add (bin, sink);
2368
2369   *queue_out = gst_element_factory_make ("queue", NULL);
2370   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2371       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2372   gst_bin_add (bin, *queue_out);
2373
2374   /* link tee to queue */
2375   teepad = gst_element_get_request_pad (tee, "src_%u");
2376   pad = gst_element_get_static_pad (*queue_out, "sink");
2377   gst_pad_link (teepad, pad);
2378   gst_object_unref (pad);
2379   gst_object_unref (teepad);
2380
2381   /* link queue to sink */
2382   queuepad = gst_element_get_static_pad (*queue_out, "src");
2383   pad = gst_element_get_static_pad (sink, "sink");
2384   gst_pad_link (queuepad, pad);
2385   gst_object_unref (queuepad);
2386   gst_object_unref (pad);
2387 }
2388
2389 /* must be called with lock */
2390 static void
2391 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2392 {
2393   GstRTSPStreamPrivate *priv;
2394   GstPad *pad;
2395   gboolean is_tcp, is_udp;
2396   gint i;
2397
2398   priv = stream->priv;
2399
2400   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2401   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2402       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2403
2404   for (i = 0; i < 2; i++) {
2405     /* For the sender we create this bit of pipeline for both
2406      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2407      * we need to add a queue before appsink and udpsink to make
2408      * the pipeline not block. For the TCP case, we want to pump
2409      * client as fast as possible anyway. This pipeline is used
2410      * when both TCP and UDP are present.
2411      *
2412      * .--------.      .-----.    .---------.    .---------.
2413      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2414      * |       send->sink   src->sink      src->sink       |
2415      * '--------'      |     |    '---------'    '---------'
2416      *                 |     |    .---------.    .---------.
2417      *                 |     |    |  queue  |    | appsink |
2418      *                 |    src->sink      src->sink       |
2419      *                 '-----'    '---------'    '---------'
2420      *
2421      * When only UDP or only TCP is allowed, we skip the tee and queue
2422      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2423      * the session.
2424      */
2425
2426     /* Only link the RTP send src if we're going to send RTP, link
2427      * the RTCP send src always */
2428     if (!priv->srcpad && i == 0)
2429       continue;
2430
2431     if (is_tcp) {
2432       /* make appsink */
2433       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2434       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2435       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2436           &sink_cb, stream, NULL);
2437     }
2438
2439     /* If we have udp always use a tee because we could have mcast clients
2440      * requesting different ports, in which case we'll have to plug more
2441      * udpsinks. */
2442     if (is_udp) {
2443       /* make tee for RTP/RTCP */
2444       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2445       gst_bin_add (bin, priv->tee[i]);
2446
2447       /* and link to rtpbin send pad */
2448       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2449       gst_pad_link (priv->send_src[i], pad);
2450       gst_object_unref (pad);
2451
2452       if (priv->udpsink[i])
2453         plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2454
2455       if (priv->mcast_udpsink[i])
2456         plug_sink (bin, priv->tee[i], priv->mcast_udpsink[i],
2457             &priv->mcast_udpqueue[i]);
2458
2459       if (is_tcp) {
2460         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2461         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2462       }
2463     } else if (is_tcp) {
2464       /* only appsink needed, link it to the session */
2465       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2466       gst_pad_link (priv->send_src[i], pad);
2467       gst_object_unref (pad);
2468
2469       /* when its only TCP, we need to set sync and preroll to FALSE
2470        * for the sink to avoid deadlock. And this is only needed for
2471        * sink used for RTCP data, not the RTP data. */
2472       if (i == 1)
2473         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2474     }
2475
2476     /* check if we need to set to a special state */
2477     if (state != GST_STATE_NULL) {
2478       if (priv->udpsink[i])
2479         gst_element_set_state (priv->udpsink[i], state);
2480       if (priv->mcast_udpsink[i])
2481         gst_element_set_state (priv->mcast_udpsink[i], state);
2482       if (priv->appsink[i])
2483         gst_element_set_state (priv->appsink[i], state);
2484       if (priv->appqueue[i])
2485         gst_element_set_state (priv->appqueue[i], state);
2486       if (priv->udpqueue[i])
2487         gst_element_set_state (priv->udpqueue[i], state);
2488       if (priv->mcast_udpqueue[i])
2489         gst_element_set_state (priv->mcast_udpqueue[i], state);
2490       if (priv->tee[i])
2491         gst_element_set_state (priv->tee[i], state);
2492     }
2493   }
2494 }
2495
2496 /* must be called with lock */
2497 static void
2498 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2499     GstElement * funnel)
2500 {
2501   GstRTSPStreamPrivate *priv;
2502   GstPad *pad, *selpad;
2503
2504   priv = stream->priv;
2505
2506   if (priv->srcpad) {
2507     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2508      * values. This is only relevant for PLAY pipelines */
2509     gst_element_set_state (src, GST_STATE_PLAYING);
2510     gst_element_set_locked_state (src, TRUE);
2511   }
2512
2513   /* add src */
2514   gst_bin_add (bin, src);
2515
2516   /* and link to the funnel */
2517   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2518   pad = gst_element_get_static_pad (src, "src");
2519   gst_pad_link (pad, selpad);
2520   gst_object_unref (pad);
2521   gst_object_unref (selpad);
2522 }
2523
2524 /* must be called with lock */
2525 static void
2526 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2527 {
2528   GstRTSPStreamPrivate *priv;
2529   GstPad *pad;
2530   gboolean is_tcp;
2531   gint i;
2532
2533   priv = stream->priv;
2534
2535   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2536
2537   for (i = 0; i < 2; i++) {
2538     /* For the receiver we create this bit of pipeline for both
2539      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2540      * and it is all funneled into the rtpbin receive pad.
2541      *
2542      * .--------.     .--------.    .--------.
2543      * | udpsrc |     | funnel |    | rtpbin |
2544      * |       src->sink      src->sink      |
2545      * '--------'     |        |    '--------'
2546      * .--------.     |        |
2547      * | appsrc |     |        |
2548      * |       src->sink       |
2549      * '--------'     '--------'
2550      */
2551
2552     if (!priv->sinkpad && i == 0) {
2553       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2554        * RTCP sink always */
2555       continue;
2556     }
2557
2558     /* make funnel for the RTP/RTCP receivers */
2559     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2560     gst_bin_add (bin, priv->funnel[i]);
2561
2562     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2563     gst_pad_link (pad, priv->recv_sink[i]);
2564     gst_object_unref (pad);
2565
2566     if (priv->udpsrc_v4[i])
2567       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2568
2569     if (priv->udpsrc_v6[i])
2570       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2571
2572     if (priv->mcast_udpsrc_v4[i])
2573       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
2574
2575     if (priv->mcast_udpsrc_v6[i])
2576       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
2577
2578     if (is_tcp) {
2579       /* make and add appsrc */
2580       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2581       priv->appsrc_base_time[i] = -1;
2582       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2583           TRUE, NULL);
2584       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2585     }
2586
2587     /* check if we need to set to a special state */
2588     if (state != GST_STATE_NULL) {
2589       gst_element_set_state (priv->funnel[i], state);
2590     }
2591   }
2592 }
2593
2594 static gboolean
2595 check_mcast_part_for_transport (GstRTSPStream * stream,
2596     const GstRTSPTransport * tr)
2597 {
2598   GstRTSPStreamPrivate *priv = stream->priv;
2599   GInetAddress *inetaddr;
2600   GSocketFamily family;
2601   GstRTSPAddress *mcast_addr;
2602
2603   /* Check if it's a ipv4 or ipv6 transport */
2604   inetaddr = g_inet_address_new_from_string (tr->destination);
2605   family = g_inet_address_get_family (inetaddr);
2606   g_object_unref (inetaddr);
2607
2608   /* Select fields corresponding to the family */
2609   if (family == G_SOCKET_FAMILY_IPV4) {
2610     mcast_addr = priv->mcast_addr_v4;
2611   } else {
2612     mcast_addr = priv->mcast_addr_v6;
2613   }
2614
2615   /* We support only one mcast group per family, make sure this transport
2616    * matches it. */
2617   if (!mcast_addr)
2618     goto no_addr;
2619
2620   if (!g_str_equal (tr->destination, mcast_addr->address) ||
2621       tr->port.min != mcast_addr->port ||
2622       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2623       tr->ttl != mcast_addr->ttl)
2624     goto wrong_addr;
2625
2626   return TRUE;
2627
2628 no_addr:
2629   {
2630     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2631         "has been reserved");
2632     return FALSE;
2633   }
2634 wrong_addr:
2635   {
2636     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2637         "the reserved address");
2638     return FALSE;
2639   }
2640 }
2641
2642 /**
2643  * gst_rtsp_stream_join_bin:
2644  * @stream: a #GstRTSPStream
2645  * @bin: (transfer none): a #GstBin to join
2646  * @rtpbin: (transfer none): a rtpbin element in @bin
2647  * @state: the target state of the new elements
2648  *
2649  * Join the #GstBin @bin that contains the element @rtpbin.
2650  *
2651  * @stream will link to @rtpbin, which must be inside @bin. The elements
2652  * added to @bin will be set to the state given in @state.
2653  *
2654  * Returns: %TRUE on success.
2655  */
2656 gboolean
2657 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2658     GstElement * rtpbin, GstState state)
2659 {
2660   GstRTSPStreamPrivate *priv;
2661   guint idx;
2662   gchar *name;
2663   GstPadLinkReturn ret;
2664
2665   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2666   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2667   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2668
2669   priv = stream->priv;
2670
2671   g_mutex_lock (&priv->lock);
2672   if (priv->joined_bin != NULL)
2673     goto was_joined;
2674
2675   /* create a session with the same index as the stream */
2676   idx = priv->idx;
2677
2678   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2679
2680   if (!alloc_ports (stream))
2681     goto no_ports;
2682
2683   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2684       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2685     /* For SRTP */
2686     g_signal_connect (rtpbin, "request-rtp-encoder",
2687         (GCallback) request_rtp_encoder, stream);
2688     g_signal_connect (rtpbin, "request-rtcp-encoder",
2689         (GCallback) request_rtcp_encoder, stream);
2690     g_signal_connect (rtpbin, "request-rtp-decoder",
2691         (GCallback) request_rtp_rtcp_decoder, stream);
2692     g_signal_connect (rtpbin, "request-rtcp-decoder",
2693         (GCallback) request_rtp_rtcp_decoder, stream);
2694   }
2695
2696   if (priv->sinkpad) {
2697     g_signal_connect (rtpbin, "request-pt-map",
2698         (GCallback) request_pt_map, stream);
2699   }
2700
2701   /* get pads from the RTP session element for sending and receiving
2702    * RTP/RTCP*/
2703   if (priv->srcpad) {
2704     /* get a pad for sending RTP */
2705     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2706     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2707     g_free (name);
2708
2709     /* link the RTP pad to the session manager, it should not really fail unless
2710      * this is not really an RTP pad */
2711     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2712     if (ret != GST_PAD_LINK_OK)
2713       goto link_failed;
2714
2715     name = g_strdup_printf ("send_rtp_src_%u", idx);
2716     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2717     g_free (name);
2718   } else {
2719     /* Need to connect our sinkpad from here */
2720     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2721     /* EOS */
2722     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2723
2724     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2725     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2726     g_free (name);
2727   }
2728
2729   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2730   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2731   g_free (name);
2732   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2733   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2734   g_free (name);
2735
2736   /* get the session */
2737   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2738
2739   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2740       stream);
2741   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2742       stream);
2743   g_signal_connect (priv->session, "on-ssrc-active",
2744       (GCallback) on_ssrc_active, stream);
2745   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2746       stream);
2747   g_signal_connect (priv->session, "on-bye-timeout",
2748       (GCallback) on_bye_timeout, stream);
2749   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2750       stream);
2751
2752   /* signal for sender ssrc */
2753   g_signal_connect (priv->session, "on-new-sender-ssrc",
2754       (GCallback) on_new_sender_ssrc, stream);
2755   g_signal_connect (priv->session, "on-sender-ssrc-active",
2756       (GCallback) on_sender_ssrc_active, stream);
2757
2758   create_sender_part (stream, bin, state);
2759   create_receiver_part (stream, bin, state);
2760
2761   if (priv->srcpad) {
2762     /* be notified of caps changes */
2763     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2764         (GCallback) caps_notify, stream);
2765   }
2766
2767   priv->joined_bin = gst_object_ref (bin);
2768   g_mutex_unlock (&priv->lock);
2769
2770   return TRUE;
2771
2772   /* ERRORS */
2773 was_joined:
2774   {
2775     g_mutex_unlock (&priv->lock);
2776     return TRUE;
2777   }
2778 no_ports:
2779   {
2780     g_mutex_unlock (&priv->lock);
2781     GST_WARNING ("failed to allocate ports %u", idx);
2782     return FALSE;
2783   }
2784 link_failed:
2785   {
2786     GST_WARNING ("failed to link stream %u", idx);
2787     gst_object_unref (priv->send_rtp_sink);
2788     priv->send_rtp_sink = NULL;
2789     g_mutex_unlock (&priv->lock);
2790     return FALSE;
2791   }
2792 }
2793
2794 static void
2795 clear_element (GstBin * bin, GstElement ** elementptr)
2796 {
2797   if (*elementptr) {
2798     gst_element_set_locked_state (*elementptr, FALSE);
2799     gst_element_set_state (*elementptr, GST_STATE_NULL);
2800     if (GST_ELEMENT_PARENT (*elementptr))
2801       gst_bin_remove (bin, *elementptr);
2802     else
2803       gst_object_unref (*elementptr);
2804     *elementptr = NULL;
2805   }
2806 }
2807
2808 /**
2809  * gst_rtsp_stream_leave_bin:
2810  * @stream: a #GstRTSPStream
2811  * @bin: (transfer none): a #GstBin
2812  * @rtpbin: (transfer none): a rtpbin #GstElement
2813  *
2814  * Remove the elements of @stream from @bin.
2815  *
2816  * Return: %TRUE on success.
2817  */
2818 gboolean
2819 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2820     GstElement * rtpbin)
2821 {
2822   GstRTSPStreamPrivate *priv;
2823   gint i;
2824
2825   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2826   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2827   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2828
2829   priv = stream->priv;
2830
2831   g_mutex_lock (&priv->lock);
2832   if (priv->joined_bin == NULL)
2833     goto was_not_joined;
2834   if (priv->joined_bin != bin)
2835     goto wrong_bin;
2836
2837   priv->joined_bin = NULL;
2838
2839   /* all transports must be removed by now */
2840   if (priv->transports != NULL)
2841     goto transports_not_removed;
2842
2843   clear_tr_cache (priv, TRUE);
2844   clear_tr_cache (priv, FALSE);
2845
2846   GST_INFO ("stream %p leaving bin", stream);
2847
2848   if (priv->srcpad) {
2849     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2850
2851     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2852     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2853     gst_object_unref (priv->send_rtp_sink);
2854     priv->send_rtp_sink = NULL;
2855   } else if (priv->recv_rtp_src) {
2856     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2857     gst_object_unref (priv->recv_rtp_src);
2858     priv->recv_rtp_src = NULL;
2859   }
2860
2861   for (i = 0; i < 2; i++) {
2862     clear_element (bin, &priv->udpsrc_v4[i]);
2863     clear_element (bin, &priv->udpsrc_v6[i]);
2864     clear_element (bin, &priv->udpqueue[i]);
2865     clear_element (bin, &priv->udpsink[i]);
2866
2867     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2868     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2869     clear_element (bin, &priv->mcast_udpqueue[i]);
2870     clear_element (bin, &priv->mcast_udpsink[i]);
2871
2872     clear_element (bin, &priv->appsrc[i]);
2873     clear_element (bin, &priv->appqueue[i]);
2874     clear_element (bin, &priv->appsink[i]);
2875
2876     clear_element (bin, &priv->tee[i]);
2877     clear_element (bin, &priv->funnel[i]);
2878
2879     if (priv->sinkpad || i == 1) {
2880       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2881       gst_object_unref (priv->recv_sink[i]);
2882       priv->recv_sink[i] = NULL;
2883     }
2884   }
2885
2886   if (priv->srcpad) {
2887     gst_object_unref (priv->send_src[0]);
2888     priv->send_src[0] = NULL;
2889   }
2890
2891   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2892   gst_object_unref (priv->send_src[1]);
2893   priv->send_src[1] = NULL;
2894
2895   g_object_unref (priv->session);
2896   priv->session = NULL;
2897   if (priv->caps)
2898     gst_caps_unref (priv->caps);
2899   priv->caps = NULL;
2900
2901   if (priv->srtpenc)
2902     gst_object_unref (priv->srtpenc);
2903   if (priv->srtpdec)
2904     gst_object_unref (priv->srtpdec);
2905
2906   if (priv->mcast_addr_v4)
2907     gst_rtsp_address_free (priv->mcast_addr_v4);
2908   priv->mcast_addr_v4 = NULL;
2909   if (priv->mcast_addr_v6)
2910     gst_rtsp_address_free (priv->mcast_addr_v6);
2911   priv->mcast_addr_v6 = NULL;
2912   if (priv->server_addr_v4)
2913     gst_rtsp_address_free (priv->server_addr_v4);
2914   priv->server_addr_v4 = NULL;
2915   if (priv->server_addr_v6)
2916     gst_rtsp_address_free (priv->server_addr_v6);
2917   priv->server_addr_v6 = NULL;
2918
2919   g_clear_object (&priv->joined_bin);
2920   g_mutex_unlock (&priv->lock);
2921
2922   return TRUE;
2923
2924 was_not_joined:
2925   {
2926     g_mutex_unlock (&priv->lock);
2927     return TRUE;
2928   }
2929 transports_not_removed:
2930   {
2931     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2932     g_mutex_unlock (&priv->lock);
2933     return FALSE;
2934   }
2935 wrong_bin:
2936   {
2937     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2938     g_mutex_unlock (&priv->lock);
2939     return FALSE;
2940   }
2941 }
2942
2943 /**
2944  * gst_rtsp_stream_get_joined_bin:
2945  * @stream: a #GstRTSPStream
2946  *
2947  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2948  *
2949  * Return: (transfer full): the joined bin or NULL.
2950  */
2951 GstBin *
2952 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2953 {
2954   GstRTSPStreamPrivate *priv;
2955   GstBin *bin = NULL;
2956
2957   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2958
2959   priv = stream->priv;
2960
2961   g_mutex_lock (&priv->lock);
2962   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
2963   g_mutex_unlock (&priv->lock);
2964
2965   return bin;
2966 }
2967
2968 /**
2969  * gst_rtsp_stream_get_rtpinfo:
2970  * @stream: a #GstRTSPStream
2971  * @rtptime: (allow-none): result RTP timestamp
2972  * @seq: (allow-none): result RTP seqnum
2973  * @clock_rate: (allow-none): the clock rate
2974  * @running_time: (allow-none): result running-time
2975  *
2976  * Retrieve the current rtptime, seq and running-time. This is used to
2977  * construct a RTPInfo reply header.
2978  *
2979  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2980  */
2981 gboolean
2982 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2983     guint * rtptime, guint * seq, guint * clock_rate,
2984     GstClockTime * running_time)
2985 {
2986   GstRTSPStreamPrivate *priv;
2987   GstStructure *stats;
2988   GObjectClass *payobjclass;
2989
2990   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2991
2992   priv = stream->priv;
2993
2994   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2995
2996   g_mutex_lock (&priv->lock);
2997
2998   /* First try to extract the information from the last buffer on the sinks.
2999    * This will have a more accurate sequence number and timestamp, as between
3000    * the payloader and the sink there can be some queues
3001    */
3002   if (priv->udpsink[0] || priv->appsink[0]) {
3003     GstSample *last_sample;
3004
3005     if (priv->udpsink[0])
3006       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3007     else
3008       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3009
3010     if (last_sample) {
3011       GstCaps *caps;
3012       GstBuffer *buffer;
3013       GstSegment *segment;
3014       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3015
3016       caps = gst_sample_get_caps (last_sample);
3017       buffer = gst_sample_get_buffer (last_sample);
3018       segment = gst_sample_get_segment (last_sample);
3019
3020       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3021         if (seq) {
3022           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3023         }
3024
3025         if (rtptime) {
3026           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3027         }
3028
3029         gst_rtp_buffer_unmap (&rtp_buffer);
3030
3031         if (running_time) {
3032           *running_time =
3033               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3034               GST_BUFFER_TIMESTAMP (buffer));
3035         }
3036
3037         if (clock_rate) {
3038           GstStructure *s = gst_caps_get_structure (caps, 0);
3039
3040           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3041
3042           if (*clock_rate == 0 && running_time)
3043             *running_time = GST_CLOCK_TIME_NONE;
3044         }
3045         gst_sample_unref (last_sample);
3046
3047         goto done;
3048       } else {
3049         gst_sample_unref (last_sample);
3050       }
3051     }
3052   }
3053
3054   if (g_object_class_find_property (payobjclass, "stats")) {
3055     g_object_get (priv->payloader, "stats", &stats, NULL);
3056     if (stats == NULL)
3057       goto no_stats;
3058
3059     if (seq)
3060       gst_structure_get_uint (stats, "seqnum", seq);
3061
3062     if (rtptime)
3063       gst_structure_get_uint (stats, "timestamp", rtptime);
3064
3065     if (running_time)
3066       gst_structure_get_clock_time (stats, "running-time", running_time);
3067
3068     if (clock_rate) {
3069       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3070       if (*clock_rate == 0 && running_time)
3071         *running_time = GST_CLOCK_TIME_NONE;
3072     }
3073     gst_structure_free (stats);
3074   } else {
3075     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3076         !g_object_class_find_property (payobjclass, "timestamp"))
3077       goto no_stats;
3078
3079     if (seq)
3080       g_object_get (priv->payloader, "seqnum", seq, NULL);
3081
3082     if (rtptime)
3083       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3084
3085     if (running_time)
3086       *running_time = GST_CLOCK_TIME_NONE;
3087   }
3088
3089 done:
3090   g_mutex_unlock (&priv->lock);
3091
3092   return TRUE;
3093
3094   /* ERRORS */
3095 no_stats:
3096   {
3097     GST_WARNING ("Could not get payloader stats");
3098     g_mutex_unlock (&priv->lock);
3099     return FALSE;
3100   }
3101 }
3102
3103 /**
3104  * gst_rtsp_stream_get_caps:
3105  * @stream: a #GstRTSPStream
3106  *
3107  * Retrieve the current caps of @stream.
3108  *
3109  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3110  * after usage.
3111  */
3112 GstCaps *
3113 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3114 {
3115   GstRTSPStreamPrivate *priv;
3116   GstCaps *result;
3117
3118   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3119
3120   priv = stream->priv;
3121
3122   g_mutex_lock (&priv->lock);
3123   if ((result = priv->caps))
3124     gst_caps_ref (result);
3125   g_mutex_unlock (&priv->lock);
3126
3127   return result;
3128 }
3129
3130 /**
3131  * gst_rtsp_stream_recv_rtp:
3132  * @stream: a #GstRTSPStream
3133  * @buffer: (transfer full): a #GstBuffer
3134  *
3135  * Handle an RTP buffer for the stream. This method is usually called when a
3136  * message has been received from a client using the TCP transport.
3137  *
3138  * This function takes ownership of @buffer.
3139  *
3140  * Returns: a GstFlowReturn.
3141  */
3142 GstFlowReturn
3143 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3144 {
3145   GstRTSPStreamPrivate *priv;
3146   GstFlowReturn ret;
3147   GstElement *element;
3148
3149   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3150   priv = stream->priv;
3151   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3152   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3153
3154   g_mutex_lock (&priv->lock);
3155   if (priv->appsrc[0])
3156     element = gst_object_ref (priv->appsrc[0]);
3157   else
3158     element = NULL;
3159   g_mutex_unlock (&priv->lock);
3160
3161   if (element) {
3162     if (priv->appsrc_base_time[0] == -1) {
3163       /* Take current running_time. This timestamp will be put on
3164        * the first buffer of each stream because we are a live source and so we
3165        * timestamp with the running_time. When we are dealing with TCP, we also
3166        * only timestamp the first buffer (using the DISCONT flag) because a server
3167        * typically bursts data, for which we don't want to compensate by speeding
3168        * up the media. The other timestamps will be interpollated from this one
3169        * using the RTP timestamps. */
3170       GST_OBJECT_LOCK (element);
3171       if (GST_ELEMENT_CLOCK (element)) {
3172         GstClockTime now;
3173         GstClockTime base_time;
3174
3175         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3176         base_time = GST_ELEMENT_CAST (element)->base_time;
3177
3178         priv->appsrc_base_time[0] = now - base_time;
3179         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3180         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3181             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3182             GST_TIME_ARGS (base_time));
3183       }
3184       GST_OBJECT_UNLOCK (element);
3185     }
3186
3187     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3188     gst_object_unref (element);
3189   } else {
3190     ret = GST_FLOW_OK;
3191   }
3192   return ret;
3193 }
3194
3195 /**
3196  * gst_rtsp_stream_recv_rtcp:
3197  * @stream: a #GstRTSPStream
3198  * @buffer: (transfer full): a #GstBuffer
3199  *
3200  * Handle an RTCP buffer for the stream. This method is usually called when a
3201  * message has been received from a client using the TCP transport.
3202  *
3203  * This function takes ownership of @buffer.
3204  *
3205  * Returns: a GstFlowReturn.
3206  */
3207 GstFlowReturn
3208 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3209 {
3210   GstRTSPStreamPrivate *priv;
3211   GstFlowReturn ret;
3212   GstElement *element;
3213
3214   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3215   priv = stream->priv;
3216   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3217
3218   if (priv->joined_bin == NULL) {
3219     gst_buffer_unref (buffer);
3220     return GST_FLOW_NOT_LINKED;
3221   }
3222   g_mutex_lock (&priv->lock);
3223   if (priv->appsrc[1])
3224     element = gst_object_ref (priv->appsrc[1]);
3225   else
3226     element = NULL;
3227   g_mutex_unlock (&priv->lock);
3228
3229   if (element) {
3230     if (priv->appsrc_base_time[1] == -1) {
3231       /* Take current running_time. This timestamp will be put on
3232        * the first buffer of each stream because we are a live source and so we
3233        * timestamp with the running_time. When we are dealing with TCP, we also
3234        * only timestamp the first buffer (using the DISCONT flag) because a server
3235        * typically bursts data, for which we don't want to compensate by speeding
3236        * up the media. The other timestamps will be interpollated from this one
3237        * using the RTP timestamps. */
3238       GST_OBJECT_LOCK (element);
3239       if (GST_ELEMENT_CLOCK (element)) {
3240         GstClockTime now;
3241         GstClockTime base_time;
3242
3243         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3244         base_time = GST_ELEMENT_CAST (element)->base_time;
3245
3246         priv->appsrc_base_time[1] = now - base_time;
3247         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3248         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3249             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3250             GST_TIME_ARGS (base_time));
3251       }
3252       GST_OBJECT_UNLOCK (element);
3253     }
3254
3255     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3256     gst_object_unref (element);
3257   } else {
3258     ret = GST_FLOW_OK;
3259     gst_buffer_unref (buffer);
3260   }
3261   return ret;
3262 }
3263
3264 /* must be called with lock */
3265 static gboolean
3266 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3267     gboolean add)
3268 {
3269   GstRTSPStreamPrivate *priv = stream->priv;
3270   const GstRTSPTransport *tr;
3271
3272   tr = gst_rtsp_stream_transport_get_transport (trans);
3273
3274   switch (tr->lower_transport) {
3275     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3276     {
3277       if (add) {
3278         if (!check_mcast_part_for_transport (stream, tr))
3279           goto mcast_error;
3280         priv->transports = g_list_prepend (priv->transports, trans);
3281       } else {
3282         priv->transports = g_list_remove (priv->transports, trans);
3283       }
3284       break;
3285     }
3286     case GST_RTSP_LOWER_TRANS_UDP:
3287     {
3288       gchar *dest;
3289       gint min, max;
3290       guint ttl = 0;
3291
3292       dest = tr->destination;
3293       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3294         min = tr->port.min;
3295         max = tr->port.max;
3296         ttl = tr->ttl;
3297       } else if (priv->client_side) {
3298         /* In client side mode the 'destination' is the RTSP server, so send
3299          * to those ports */
3300         min = tr->server_port.min;
3301         max = tr->server_port.max;
3302       } else {
3303         min = tr->client_port.min;
3304         max = tr->client_port.max;
3305       }
3306
3307       if (add) {
3308         if (ttl > 0) {
3309           GST_INFO ("setting ttl-mc %d", ttl);
3310           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3311           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3312         }
3313         GST_INFO ("adding %s:%d-%d", dest, min, max);
3314         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3315         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3316         priv->transports = g_list_prepend (priv->transports, trans);
3317       } else {
3318         GST_INFO ("removing %s:%d-%d", dest, min, max);
3319         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3320         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3321         priv->transports = g_list_remove (priv->transports, trans);
3322       }
3323       priv->transports_cookie++;
3324       break;
3325     }
3326     case GST_RTSP_LOWER_TRANS_TCP:
3327       if (add) {
3328         GST_INFO ("adding TCP %s", tr->destination);
3329         priv->transports = g_list_prepend (priv->transports, trans);
3330       } else {
3331         GST_INFO ("removing TCP %s", tr->destination);
3332         priv->transports = g_list_remove (priv->transports, trans);
3333       }
3334       priv->transports_cookie++;
3335       break;
3336     default:
3337       goto unknown_transport;
3338   }
3339   return TRUE;
3340
3341   /* ERRORS */
3342 unknown_transport:
3343   {
3344     GST_INFO ("Unknown transport %d", tr->lower_transport);
3345     return FALSE;
3346   }
3347 mcast_error:
3348   {
3349     return FALSE;
3350   }
3351 }
3352
3353
3354 /**
3355  * gst_rtsp_stream_add_transport:
3356  * @stream: a #GstRTSPStream
3357  * @trans: (transfer none): a #GstRTSPStreamTransport
3358  *
3359  * Add the transport in @trans to @stream. The media of @stream will
3360  * then also be send to the values configured in @trans.
3361  *
3362  * @stream must be joined to a bin.
3363  *
3364  * @trans must contain a valid #GstRTSPTransport.
3365  *
3366  * Returns: %TRUE if @trans was added
3367  */
3368 gboolean
3369 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3370     GstRTSPStreamTransport * trans)
3371 {
3372   GstRTSPStreamPrivate *priv;
3373   gboolean res;
3374
3375   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3376   priv = stream->priv;
3377   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3378   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3379
3380   g_mutex_lock (&priv->lock);
3381   res = update_transport (stream, trans, TRUE);
3382   g_mutex_unlock (&priv->lock);
3383
3384   return res;
3385 }
3386
3387 /**
3388  * gst_rtsp_stream_remove_transport:
3389  * @stream: a #GstRTSPStream
3390  * @trans: (transfer none): a #GstRTSPStreamTransport
3391  *
3392  * Remove the transport in @trans from @stream. The media of @stream will
3393  * not be sent to the values configured in @trans.
3394  *
3395  * @stream must be joined to a bin.
3396  *
3397  * @trans must contain a valid #GstRTSPTransport.
3398  *
3399  * Returns: %TRUE if @trans was removed
3400  */
3401 gboolean
3402 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3403     GstRTSPStreamTransport * trans)
3404 {
3405   GstRTSPStreamPrivate *priv;
3406   gboolean res;
3407
3408   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3409   priv = stream->priv;
3410   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3411   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3412
3413   g_mutex_lock (&priv->lock);
3414   res = update_transport (stream, trans, FALSE);
3415   g_mutex_unlock (&priv->lock);
3416
3417   return res;
3418 }
3419
3420 /**
3421  * gst_rtsp_stream_update_crypto:
3422  * @stream: a #GstRTSPStream
3423  * @ssrc: the SSRC
3424  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3425  *
3426  * Update the new crypto information for @ssrc in @stream. If information
3427  * for @ssrc did not exist, it will be added. If information
3428  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3429  * be removed from @stream.
3430  *
3431  * Returns: %TRUE if @crypto could be updated
3432  */
3433 gboolean
3434 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3435     guint ssrc, GstCaps * crypto)
3436 {
3437   GstRTSPStreamPrivate *priv;
3438
3439   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3440   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3441
3442   priv = stream->priv;
3443
3444   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3445
3446   g_mutex_lock (&priv->lock);
3447   if (crypto)
3448     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3449         gst_caps_ref (crypto));
3450   else
3451     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3452   g_mutex_unlock (&priv->lock);
3453
3454   return TRUE;
3455 }
3456
3457 /**
3458  * gst_rtsp_stream_get_rtp_socket:
3459  * @stream: a #GstRTSPStream
3460  * @family: the socket family
3461  *
3462  * Get the RTP socket from @stream for a @family.
3463  *
3464  * @stream must be joined to a bin.
3465  *
3466  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3467  * socket could be allocated for @family. Unref after usage
3468  */
3469 GSocket *
3470 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3471 {
3472   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3473   GSocket *socket;
3474   const gchar *name;
3475
3476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3477   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3478       family == G_SOCKET_FAMILY_IPV6, NULL);
3479   g_return_val_if_fail (priv->udpsink[0], NULL);
3480
3481   if (family == G_SOCKET_FAMILY_IPV6)
3482     name = "socket-v6";
3483   else
3484     name = "socket";
3485
3486   g_object_get (priv->udpsink[0], name, &socket, NULL);
3487
3488   return socket;
3489 }
3490
3491 /**
3492  * gst_rtsp_stream_get_rtcp_socket:
3493  * @stream: a #GstRTSPStream
3494  * @family: the socket family
3495  *
3496  * Get the RTCP socket from @stream for a @family.
3497  *
3498  * @stream must be joined to a bin.
3499  *
3500  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3501  * socket could be allocated for @family. Unref after usage
3502  */
3503 GSocket *
3504 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3505 {
3506   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3507   GSocket *socket;
3508   const gchar *name;
3509
3510   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3511   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3512       family == G_SOCKET_FAMILY_IPV6, NULL);
3513   g_return_val_if_fail (priv->udpsink[1], NULL);
3514
3515   if (family == G_SOCKET_FAMILY_IPV6)
3516     name = "socket-v6";
3517   else
3518     name = "socket";
3519
3520   g_object_get (priv->udpsink[1], name, &socket, NULL);
3521
3522   return socket;
3523 }
3524
3525 /**
3526  * gst_rtsp_stream_set_seqnum:
3527  * @stream: a #GstRTSPStream
3528  * @seqnum: a new sequence number
3529  *
3530  * Configure the sequence number in the payloader of @stream to @seqnum.
3531  */
3532 void
3533 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3534 {
3535   GstRTSPStreamPrivate *priv;
3536
3537   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3538
3539   priv = stream->priv;
3540
3541   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3542 }
3543
3544 /**
3545  * gst_rtsp_stream_get_seqnum:
3546  * @stream: a #GstRTSPStream
3547  *
3548  * Get the configured sequence number in the payloader of @stream.
3549  *
3550  * Returns: the sequence number of the payloader.
3551  */
3552 guint16
3553 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3554 {
3555   GstRTSPStreamPrivate *priv;
3556   guint seqnum;
3557
3558   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3559
3560   priv = stream->priv;
3561
3562   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3563
3564   return seqnum;
3565 }
3566
3567 /**
3568  * gst_rtsp_stream_transport_filter:
3569  * @stream: a #GstRTSPStream
3570  * @func: (scope call) (allow-none): a callback
3571  * @user_data: (closure): user data passed to @func
3572  *
3573  * Call @func for each transport managed by @stream. The result value of @func
3574  * determines what happens to the transport. @func will be called with @stream
3575  * locked so no further actions on @stream can be performed from @func.
3576  *
3577  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3578  * @stream.
3579  *
3580  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3581  *
3582  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3583  * will also be added with an additional ref to the result #GList of this
3584  * function..
3585  *
3586  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3587  *
3588  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3589  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3590  * element in the #GList should be unreffed before the list is freed.
3591  */
3592 GList *
3593 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3594     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3595 {
3596   GstRTSPStreamPrivate *priv;
3597   GList *result, *walk, *next;
3598   GHashTable *visited = NULL;
3599   guint cookie;
3600
3601   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3602
3603   priv = stream->priv;
3604
3605   result = NULL;
3606   if (func)
3607     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3608
3609   g_mutex_lock (&priv->lock);
3610 restart:
3611   cookie = priv->transports_cookie;
3612   for (walk = priv->transports; walk; walk = next) {
3613     GstRTSPStreamTransport *trans = walk->data;
3614     GstRTSPFilterResult res;
3615     gboolean changed;
3616
3617     next = g_list_next (walk);
3618
3619     if (func) {
3620       /* only visit each transport once */
3621       if (g_hash_table_contains (visited, trans))
3622         continue;
3623
3624       g_hash_table_add (visited, g_object_ref (trans));
3625       g_mutex_unlock (&priv->lock);
3626
3627       res = func (stream, trans, user_data);
3628
3629       g_mutex_lock (&priv->lock);
3630     } else
3631       res = GST_RTSP_FILTER_REF;
3632
3633     changed = (cookie != priv->transports_cookie);
3634
3635     switch (res) {
3636       case GST_RTSP_FILTER_REMOVE:
3637         update_transport (stream, trans, FALSE);
3638         break;
3639       case GST_RTSP_FILTER_REF:
3640         result = g_list_prepend (result, g_object_ref (trans));
3641         break;
3642       case GST_RTSP_FILTER_KEEP:
3643       default:
3644         break;
3645     }
3646     if (changed)
3647       goto restart;
3648   }
3649   g_mutex_unlock (&priv->lock);
3650
3651   if (func)
3652     g_hash_table_unref (visited);
3653
3654   return result;
3655 }
3656
3657 static GstPadProbeReturn
3658 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3659 {
3660   GstRTSPStreamPrivate *priv;
3661   GstRTSPStream *stream;
3662
3663   stream = user_data;
3664   priv = stream->priv;
3665
3666   GST_DEBUG_OBJECT (pad, "now blocking");
3667
3668   g_mutex_lock (&priv->lock);
3669   priv->blocking = TRUE;
3670   g_mutex_unlock (&priv->lock);
3671
3672   gst_element_post_message (priv->payloader,
3673       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3674           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3675
3676   return GST_PAD_PROBE_OK;
3677 }
3678
3679 /**
3680  * gst_rtsp_stream_set_blocked:
3681  * @stream: a #GstRTSPStream
3682  * @blocked: boolean indicating we should block or unblock
3683  *
3684  * Blocks or unblocks the dataflow on @stream.
3685  *
3686  * Returns: %TRUE on success
3687  */
3688 gboolean
3689 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3690 {
3691   GstRTSPStreamPrivate *priv;
3692
3693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3694
3695   priv = stream->priv;
3696
3697   g_mutex_lock (&priv->lock);
3698   if (blocked) {
3699     priv->blocking = FALSE;
3700     if (priv->blocked_id == 0) {
3701       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3702           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3703           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3704           g_object_ref (stream), g_object_unref);
3705     }
3706   } else {
3707     if (priv->blocked_id != 0) {
3708       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3709       priv->blocked_id = 0;
3710       priv->blocking = FALSE;
3711     }
3712   }
3713   g_mutex_unlock (&priv->lock);
3714
3715   return TRUE;
3716 }
3717
3718 /**
3719  * gst_rtsp_stream_is_blocking:
3720  * @stream: a #GstRTSPStream
3721  *
3722  * Check if @stream is blocking on a #GstBuffer.
3723  *
3724  * Returns: %TRUE if @stream is blocking
3725  */
3726 gboolean
3727 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3728 {
3729   GstRTSPStreamPrivate *priv;
3730   gboolean result;
3731
3732   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3733
3734   priv = stream->priv;
3735
3736   g_mutex_lock (&priv->lock);
3737   result = priv->blocking;
3738   g_mutex_unlock (&priv->lock);
3739
3740   return result;
3741 }
3742
3743 /**
3744  * gst_rtsp_stream_query_position:
3745  * @stream: a #GstRTSPStream
3746  *
3747  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3748  * the RTP parts of the pipeline and not the RTCP parts.
3749  *
3750  * Returns: %TRUE if the position could be queried
3751  */
3752 gboolean
3753 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3754 {
3755   GstRTSPStreamPrivate *priv;
3756   GstElement *sink;
3757   gboolean ret;
3758
3759   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3760
3761   priv = stream->priv;
3762
3763   g_mutex_lock (&priv->lock);
3764   /* depending on the transport type, it should query corresponding sink */
3765   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3766       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3767     sink = priv->udpsink[0];
3768   else
3769     sink = priv->appsink[0];
3770
3771   if (sink)
3772     gst_object_ref (sink);
3773   g_mutex_unlock (&priv->lock);
3774
3775   if (!sink)
3776     return FALSE;
3777
3778   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3779   gst_object_unref (sink);
3780
3781   return ret;
3782 }
3783
3784 /**
3785  * gst_rtsp_stream_query_stop:
3786  * @stream: a #GstRTSPStream
3787  *
3788  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3789  * the RTP parts of the pipeline and not the RTCP parts.
3790  *
3791  * Returns: %TRUE if the stop could be queried
3792  */
3793 gboolean
3794 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3795 {
3796   GstRTSPStreamPrivate *priv;
3797   GstElement *sink;
3798   GstQuery *query;
3799   gboolean ret;
3800
3801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3802
3803   priv = stream->priv;
3804
3805   g_mutex_lock (&priv->lock);
3806   /* depending on the transport type, it should query corresponding sink */
3807   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3808       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3809     sink = priv->udpsink[0];
3810   else
3811     sink = priv->appsink[0];
3812
3813   if (sink)
3814     gst_object_ref (sink);
3815   g_mutex_unlock (&priv->lock);
3816
3817   if (!sink)
3818     return FALSE;
3819
3820   query = gst_query_new_segment (GST_FORMAT_TIME);
3821   if ((ret = gst_element_query (sink, query))) {
3822     GstFormat format;
3823
3824     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3825     if (format != GST_FORMAT_TIME)
3826       *stop = -1;
3827   }
3828   gst_query_unref (query);
3829   gst_object_unref (sink);
3830
3831   return ret;
3832
3833 }