9a8f1c21c4fac84e37a0b3649e4f8a3283b54d7a
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102   GSocket *socket_v4[2];
103   GSocket *socket_v6[2];
104
105   /* for UDP multicast */
106   GstElement *mcast_udpsrc_v4[2];
107   GstElement *mcast_udpsrc_v6[2];
108   GstElement *mcast_udpqueue[2];
109   GstElement *mcast_udpsink[2];
110   GSocket *mcast_socket_v4[2];
111   GSocket *mcast_socket_v6[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* pool used to manage unicast and multicast addresses */
128   GstRTSPAddressPool *pool;
129
130   /* unicast server addr/port */
131   GstRTSPAddress *server_addr_v4;
132   GstRTSPAddress *server_addr_v6;
133
134   /* multicast addresses */
135   GstRTSPAddress *mcast_addr_v4;
136   GstRTSPAddress *mcast_addr_v6;
137
138   gchar *multicast_iface;
139
140   /* the caps of the stream */
141   gulong caps_sig;
142   GstCaps *caps;
143
144   /* transports we stream to */
145   guint n_active;
146   GList *transports;
147   guint transports_cookie;
148   GList *tr_cache_rtp;
149   GList *tr_cache_rtcp;
150   guint tr_cache_cookie_rtp;
151   guint tr_cache_cookie_rtcp;
152
153   gint dscp_qos;
154
155   /* stream blocking */
156   gulong blocked_id[2];
157   gboolean blocking;
158
159   /* current stream postion */
160   GstClockTime position;
161
162   /* pt->caps map for RECORD streams */
163   GHashTable *ptmap;
164
165   GstRTSPPublishClockMode publish_clock_mode;
166 };
167
168 #define DEFAULT_CONTROL         NULL
169 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
170 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
171                                         GST_RTSP_LOWER_TRANS_TCP
172
173 enum
174 {
175   PROP_0,
176   PROP_CONTROL,
177   PROP_PROFILES,
178   PROP_PROTOCOLS,
179   PROP_LAST
180 };
181
182 enum
183 {
184   SIGNAL_NEW_RTP_ENCODER,
185   SIGNAL_NEW_RTCP_ENCODER,
186   SIGNAL_LAST
187 };
188
189 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
190 #define GST_CAT_DEFAULT rtsp_stream_debug
191
192 static GQuark ssrc_stream_map_key;
193
194 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
195     GValue * value, GParamSpec * pspec);
196 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
197     const GValue * value, GParamSpec * pspec);
198
199 static void gst_rtsp_stream_finalize (GObject * obj);
200
201 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
202
203 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
204
205 static void
206 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
207 {
208   GObjectClass *gobject_class;
209
210   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
211
212   gobject_class = G_OBJECT_CLASS (klass);
213
214   gobject_class->get_property = gst_rtsp_stream_get_property;
215   gobject_class->set_property = gst_rtsp_stream_set_property;
216   gobject_class->finalize = gst_rtsp_stream_finalize;
217
218   g_object_class_install_property (gobject_class, PROP_CONTROL,
219       g_param_spec_string ("control", "Control",
220           "The control string for this stream", DEFAULT_CONTROL,
221           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222
223   g_object_class_install_property (gobject_class, PROP_PROFILES,
224       g_param_spec_flags ("profiles", "Profiles",
225           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
226           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
229       g_param_spec_flags ("protocols", "Protocols",
230           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
231           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
234       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
235       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
236       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
239       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
244
245   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
246 }
247
248 static void
249 gst_rtsp_stream_init (GstRTSPStream * stream)
250 {
251   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
252
253   GST_DEBUG ("new stream %p", stream);
254
255   stream->priv = priv;
256
257   priv->dscp_qos = -1;
258   priv->control = g_strdup (DEFAULT_CONTROL);
259   priv->profiles = DEFAULT_PROFILES;
260   priv->protocols = DEFAULT_PROTOCOLS;
261   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
262
263   g_mutex_init (&priv->lock);
264
265   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
266       NULL, (GDestroyNotify) gst_caps_unref);
267   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
268       (GDestroyNotify) gst_caps_unref);
269 }
270
271 static void
272 gst_rtsp_stream_finalize (GObject * obj)
273 {
274   GstRTSPStream *stream;
275   GstRTSPStreamPrivate *priv;
276
277   stream = GST_RTSP_STREAM (obj);
278   priv = stream->priv;
279
280   GST_DEBUG ("finalize stream %p", stream);
281
282   /* we really need to be unjoined now */
283   g_return_if_fail (priv->joined_bin == NULL);
284
285   if (priv->mcast_addr_v4)
286     gst_rtsp_address_free (priv->mcast_addr_v4);
287   if (priv->mcast_addr_v6)
288     gst_rtsp_address_free (priv->mcast_addr_v6);
289   if (priv->server_addr_v4)
290     gst_rtsp_address_free (priv->server_addr_v4);
291   if (priv->server_addr_v6)
292     gst_rtsp_address_free (priv->server_addr_v6);
293   if (priv->pool)
294     g_object_unref (priv->pool);
295   if (priv->rtxsend)
296     g_object_unref (priv->rtxsend);
297
298   if (priv->socket_v4[0])
299     g_object_unref (priv->socket_v4[0]);
300   if (priv->socket_v4[1])
301     g_object_unref (priv->socket_v4[1]);
302   if (priv->socket_v6[0])
303     g_object_unref (priv->socket_v6[0]);
304   if (priv->socket_v6[1])
305     g_object_unref (priv->socket_v6[1]);
306   if (priv->mcast_socket_v4[0])
307     g_object_unref (priv->mcast_socket_v4[0]);
308   if (priv->mcast_socket_v4[1])
309     g_object_unref (priv->mcast_socket_v4[1]);
310   if (priv->mcast_socket_v6[0])
311     g_object_unref (priv->mcast_socket_v6[0]);
312   if (priv->mcast_socket_v6[1])
313     g_object_unref (priv->mcast_socket_v6[1]);
314
315   g_free (priv->multicast_iface);
316
317   gst_object_unref (priv->payloader);
318   if (priv->srcpad)
319     gst_object_unref (priv->srcpad);
320   if (priv->sinkpad)
321     gst_object_unref (priv->sinkpad);
322   g_free (priv->control);
323   g_mutex_clear (&priv->lock);
324
325   g_hash_table_unref (priv->keys);
326   g_hash_table_destroy (priv->ptmap);
327
328   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
329 }
330
331 static void
332 gst_rtsp_stream_get_property (GObject * object, guint propid,
333     GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
340       break;
341     case PROP_PROFILES:
342       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
343       break;
344     case PROP_PROTOCOLS:
345       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 static void
353 gst_rtsp_stream_set_property (GObject * object, guint propid,
354     const GValue * value, GParamSpec * pspec)
355 {
356   GstRTSPStream *stream = GST_RTSP_STREAM (object);
357
358   switch (propid) {
359     case PROP_CONTROL:
360       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
361       break;
362     case PROP_PROFILES:
363       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
364       break;
365     case PROP_PROTOCOLS:
366       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
367       break;
368     default:
369       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
370   }
371 }
372
373 /**
374  * gst_rtsp_stream_new:
375  * @idx: an index
376  * @pad: a #GstPad
377  * @payloader: a #GstElement
378  *
379  * Create a new media stream with index @idx that handles RTP data on
380  * @pad and has a payloader element @payloader if @pad is a source pad
381  * or a depayloader element @payloader if @pad is a sink pad.
382  *
383  * Returns: (transfer full): a new #GstRTSPStream
384  */
385 GstRTSPStream *
386 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
387 {
388   GstRTSPStreamPrivate *priv;
389   GstRTSPStream *stream;
390
391   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
392   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
393
394   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
395   priv = stream->priv;
396   priv->idx = idx;
397   priv->payloader = gst_object_ref (payloader);
398   if (GST_PAD_IS_SRC (pad))
399     priv->srcpad = gst_object_ref (pad);
400   else
401     priv->sinkpad = gst_object_ref (pad);
402
403   return stream;
404 }
405
406 /**
407  * gst_rtsp_stream_get_index:
408  * @stream: a #GstRTSPStream
409  *
410  * Get the stream index.
411  *
412  * Return: the stream index.
413  */
414 guint
415 gst_rtsp_stream_get_index (GstRTSPStream * stream)
416 {
417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
418
419   return stream->priv->idx;
420 }
421
422 /**
423  * gst_rtsp_stream_get_pt:
424  * @stream: a #GstRTSPStream
425  *
426  * Get the stream payload type.
427  *
428  * Return: the stream payload type.
429  */
430 guint
431 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
432 {
433   GstRTSPStreamPrivate *priv;
434   guint pt;
435
436   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
437
438   priv = stream->priv;
439
440   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
441
442   return pt;
443 }
444
445 /**
446  * gst_rtsp_stream_get_srcpad:
447  * @stream: a #GstRTSPStream
448  *
449  * Get the srcpad associated with @stream.
450  *
451  * Returns: (transfer full): the srcpad. Unref after usage.
452  */
453 GstPad *
454 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
455 {
456   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
457
458   if (!stream->priv->srcpad)
459     return NULL;
460
461   return gst_object_ref (stream->priv->srcpad);
462 }
463
464 /**
465  * gst_rtsp_stream_get_sinkpad:
466  * @stream: a #GstRTSPStream
467  *
468  * Get the sinkpad associated with @stream.
469  *
470  * Returns: (transfer full): the sinkpad. Unref after usage.
471  */
472 GstPad *
473 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
474 {
475   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
476
477   if (!stream->priv->sinkpad)
478     return NULL;
479
480   return gst_object_ref (stream->priv->sinkpad);
481 }
482
483 /**
484  * gst_rtsp_stream_get_control:
485  * @stream: a #GstRTSPStream
486  *
487  * Get the control string to identify this stream.
488  *
489  * Returns: (transfer full): the control string. g_free() after usage.
490  */
491 gchar *
492 gst_rtsp_stream_get_control (GstRTSPStream * stream)
493 {
494   GstRTSPStreamPrivate *priv;
495   gchar *result;
496
497   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
498
499   priv = stream->priv;
500
501   g_mutex_lock (&priv->lock);
502   if ((result = g_strdup (priv->control)) == NULL)
503     result = g_strdup_printf ("stream=%u", priv->idx);
504   g_mutex_unlock (&priv->lock);
505
506   return result;
507 }
508
509 /**
510  * gst_rtsp_stream_set_control:
511  * @stream: a #GstRTSPStream
512  * @control: a control string
513  *
514  * Set the control string in @stream.
515  */
516 void
517 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
518 {
519   GstRTSPStreamPrivate *priv;
520
521   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
522
523   priv = stream->priv;
524
525   g_mutex_lock (&priv->lock);
526   g_free (priv->control);
527   priv->control = g_strdup (control);
528   g_mutex_unlock (&priv->lock);
529 }
530
531 /**
532  * gst_rtsp_stream_has_control:
533  * @stream: a #GstRTSPStream
534  * @control: a control string
535  *
536  * Check if @stream has the control string @control.
537  *
538  * Returns: %TRUE is @stream has @control as the control string
539  */
540 gboolean
541 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
542 {
543   GstRTSPStreamPrivate *priv;
544   gboolean res;
545
546   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
547
548   priv = stream->priv;
549
550   g_mutex_lock (&priv->lock);
551   if (priv->control)
552     res = (g_strcmp0 (priv->control, control) == 0);
553   else {
554     guint streamid;
555
556     if (sscanf (control, "stream=%u", &streamid) > 0)
557       res = (streamid == priv->idx);
558     else
559       res = FALSE;
560   }
561   g_mutex_unlock (&priv->lock);
562
563   return res;
564 }
565
566 /**
567  * gst_rtsp_stream_set_mtu:
568  * @stream: a #GstRTSPStream
569  * @mtu: a new MTU
570  *
571  * Configure the mtu in the payloader of @stream to @mtu.
572  */
573 void
574 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
575 {
576   GstRTSPStreamPrivate *priv;
577
578   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
579
580   priv = stream->priv;
581
582   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
583
584   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
585 }
586
587 /**
588  * gst_rtsp_stream_get_mtu:
589  * @stream: a #GstRTSPStream
590  *
591  * Get the configured MTU in the payloader of @stream.
592  *
593  * Returns: the MTU of the payloader.
594  */
595 guint
596 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
597 {
598   GstRTSPStreamPrivate *priv;
599   guint mtu;
600
601   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
602
603   priv = stream->priv;
604
605   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
606
607   return mtu;
608 }
609
610 /* Update the dscp qos property on the udp sinks */
611 static void
612 update_dscp_qos (GstRTSPStream * stream, GstElement ** udpsink)
613 {
614   GstRTSPStreamPrivate *priv;
615
616   priv = stream->priv;
617
618   if (*udpsink) {
619     g_object_set (G_OBJECT (*udpsink), "qos-dscp", priv->dscp_qos, NULL);
620   }
621 }
622
623 /**
624  * gst_rtsp_stream_set_dscp_qos:
625  * @stream: a #GstRTSPStream
626  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
627  *
628  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
629  */
630 void
631 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
632 {
633   GstRTSPStreamPrivate *priv;
634
635   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
636
637   priv = stream->priv;
638
639   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
640
641   if (dscp_qos < -1 || dscp_qos > 63) {
642     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
643     return;
644   }
645
646   priv->dscp_qos = dscp_qos;
647
648   update_dscp_qos (stream, priv->udpsink);
649 }
650
651 /**
652  * gst_rtsp_stream_get_dscp_qos:
653  * @stream: a #GstRTSPStream
654  *
655  * Get the configured DSCP QoS in of the outgoing sockets.
656  *
657  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
658  */
659 gint
660 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
661 {
662   GstRTSPStreamPrivate *priv;
663
664   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
665
666   priv = stream->priv;
667
668   return priv->dscp_qos;
669 }
670
671 /**
672  * gst_rtsp_stream_is_transport_supported:
673  * @stream: a #GstRTSPStream
674  * @transport: (transfer none): a #GstRTSPTransport
675  *
676  * Check if @transport can be handled by stream
677  *
678  * Returns: %TRUE if @transport can be handled by @stream.
679  */
680 gboolean
681 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
682     GstRTSPTransport * transport)
683 {
684   GstRTSPStreamPrivate *priv;
685
686   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
687
688   priv = stream->priv;
689
690   g_mutex_lock (&priv->lock);
691   if (transport->trans != GST_RTSP_TRANS_RTP)
692     goto unsupported_transmode;
693
694   if (!(transport->profile & priv->profiles))
695     goto unsupported_profile;
696
697   if (!(transport->lower_transport & priv->protocols))
698     goto unsupported_ltrans;
699
700   g_mutex_unlock (&priv->lock);
701
702   return TRUE;
703
704   /* ERRORS */
705 unsupported_transmode:
706   {
707     GST_DEBUG ("unsupported transport mode %d", transport->trans);
708     g_mutex_unlock (&priv->lock);
709     return FALSE;
710   }
711 unsupported_profile:
712   {
713     GST_DEBUG ("unsupported profile %d", transport->profile);
714     g_mutex_unlock (&priv->lock);
715     return FALSE;
716   }
717 unsupported_ltrans:
718   {
719     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
720     g_mutex_unlock (&priv->lock);
721     return FALSE;
722   }
723 }
724
725 /**
726  * gst_rtsp_stream_set_profiles:
727  * @stream: a #GstRTSPStream
728  * @profiles: the new profiles
729  *
730  * Configure the allowed profiles for @stream.
731  */
732 void
733 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
734 {
735   GstRTSPStreamPrivate *priv;
736
737   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
738
739   priv = stream->priv;
740
741   g_mutex_lock (&priv->lock);
742   priv->profiles = profiles;
743   g_mutex_unlock (&priv->lock);
744 }
745
746 /**
747  * gst_rtsp_stream_get_profiles:
748  * @stream: a #GstRTSPStream
749  *
750  * Get the allowed profiles of @stream.
751  *
752  * Returns: a #GstRTSPProfile
753  */
754 GstRTSPProfile
755 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
756 {
757   GstRTSPStreamPrivate *priv;
758   GstRTSPProfile res;
759
760   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
761
762   priv = stream->priv;
763
764   g_mutex_lock (&priv->lock);
765   res = priv->profiles;
766   g_mutex_unlock (&priv->lock);
767
768   return res;
769 }
770
771 /**
772  * gst_rtsp_stream_set_protocols:
773  * @stream: a #GstRTSPStream
774  * @protocols: the new flags
775  *
776  * Configure the allowed lower transport for @stream.
777  */
778 void
779 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
780     GstRTSPLowerTrans protocols)
781 {
782   GstRTSPStreamPrivate *priv;
783
784   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
785
786   priv = stream->priv;
787
788   g_mutex_lock (&priv->lock);
789   priv->protocols = protocols;
790   g_mutex_unlock (&priv->lock);
791 }
792
793 /**
794  * gst_rtsp_stream_get_protocols:
795  * @stream: a #GstRTSPStream
796  *
797  * Get the allowed protocols of @stream.
798  *
799  * Returns: a #GstRTSPLowerTrans
800  */
801 GstRTSPLowerTrans
802 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
803 {
804   GstRTSPStreamPrivate *priv;
805   GstRTSPLowerTrans res;
806
807   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
808       GST_RTSP_LOWER_TRANS_UNKNOWN);
809
810   priv = stream->priv;
811
812   g_mutex_lock (&priv->lock);
813   res = priv->protocols;
814   g_mutex_unlock (&priv->lock);
815
816   return res;
817 }
818
819 /**
820  * gst_rtsp_stream_set_address_pool:
821  * @stream: a #GstRTSPStream
822  * @pool: (transfer none): a #GstRTSPAddressPool
823  *
824  * configure @pool to be used as the address pool of @stream.
825  */
826 void
827 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
828     GstRTSPAddressPool * pool)
829 {
830   GstRTSPStreamPrivate *priv;
831   GstRTSPAddressPool *old;
832
833   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
834
835   priv = stream->priv;
836
837   GST_LOG_OBJECT (stream, "set address pool %p", pool);
838
839   g_mutex_lock (&priv->lock);
840   if ((old = priv->pool) != pool)
841     priv->pool = pool ? g_object_ref (pool) : NULL;
842   else
843     old = NULL;
844   g_mutex_unlock (&priv->lock);
845
846   if (old)
847     g_object_unref (old);
848 }
849
850 /**
851  * gst_rtsp_stream_get_address_pool:
852  * @stream: a #GstRTSPStream
853  *
854  * Get the #GstRTSPAddressPool used as the address pool of @stream.
855  *
856  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
857  * usage.
858  */
859 GstRTSPAddressPool *
860 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
861 {
862   GstRTSPStreamPrivate *priv;
863   GstRTSPAddressPool *result;
864
865   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
866
867   priv = stream->priv;
868
869   g_mutex_lock (&priv->lock);
870   if ((result = priv->pool))
871     g_object_ref (result);
872   g_mutex_unlock (&priv->lock);
873
874   return result;
875 }
876
877 /**
878  * gst_rtsp_stream_set_multicast_iface:
879  * @stream: a #GstRTSPStream
880  * @multicast_iface: (transfer none): a multicast interface name
881  *
882  * configure @multicast_iface to be used for @stream.
883  */
884 void
885 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
886     const gchar * multicast_iface)
887 {
888   GstRTSPStreamPrivate *priv;
889   gchar *old;
890
891   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
892
893   priv = stream->priv;
894
895   GST_LOG_OBJECT (stream, "set multicast iface %s",
896       GST_STR_NULL (multicast_iface));
897
898   g_mutex_lock (&priv->lock);
899   if ((old = priv->multicast_iface) != multicast_iface)
900     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
901   else
902     old = NULL;
903   g_mutex_unlock (&priv->lock);
904
905   if (old)
906     g_free (old);
907 }
908
909 /**
910  * gst_rtsp_stream_get_multicast_iface:
911  * @stream: a #GstRTSPStream
912  *
913  * Get the multicast interface used for @stream.
914  *
915  * Returns: (transfer full): the multicast interface for @stream. g_free() after
916  * usage.
917  */
918 gchar *
919 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
920 {
921   GstRTSPStreamPrivate *priv;
922   gchar *result;
923
924   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
925
926   priv = stream->priv;
927
928   g_mutex_lock (&priv->lock);
929   if ((result = priv->multicast_iface))
930     result = g_strdup (result);
931   g_mutex_unlock (&priv->lock);
932
933   return result;
934 }
935
936 /**
937  * gst_rtsp_stream_get_multicast_address:
938  * @stream: a #GstRTSPStream
939  * @family: the #GSocketFamily
940  *
941  * Get the multicast address of @stream for @family. The original
942  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
943  * won't release the address from the pool.
944  *
945  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
946  * or %NULL when no address could be allocated. gst_rtsp_address_free()
947  * after usage.
948  */
949 GstRTSPAddress *
950 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
951     GSocketFamily family)
952 {
953   GstRTSPStreamPrivate *priv;
954   GstRTSPAddress *result;
955   GstRTSPAddress **addrp;
956   GstRTSPAddressFlags flags;
957
958   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
959
960   priv = stream->priv;
961
962   g_mutex_lock (&stream->priv->lock);
963
964   if (family == G_SOCKET_FAMILY_IPV6) {
965     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
966     addrp = &priv->mcast_addr_v6;
967   } else {
968     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
969     addrp = &priv->mcast_addr_v4;
970   }
971
972   if (*addrp == NULL) {
973     if (priv->pool == NULL)
974       goto no_pool;
975
976     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
977
978     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
979     if (*addrp == NULL)
980       goto no_address;
981
982     /* FIXME: Also reserve the same port with unicast ANY address, since that's
983      * where we are going to bind our socket. Probably loop until we find a port
984      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
985      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
986      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
987   }
988   result = gst_rtsp_address_copy (*addrp);
989
990   g_mutex_unlock (&stream->priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&stream->priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1004     g_mutex_unlock (&stream->priv->lock);
1005     return NULL;
1006   }
1007 }
1008
1009 /**
1010  * gst_rtsp_stream_reserve_address:
1011  * @stream: a #GstRTSPStream
1012  * @address: an address
1013  * @port: a port
1014  * @n_ports: n_ports
1015  * @ttl: a TTL
1016  *
1017  * Reserve @address and @port as the address and port of @stream. The original
1018  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1019  * won't release the address from the pool.
1020  *
1021  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1022  * the address could be reserved. gst_rtsp_address_free() after usage.
1023  */
1024 GstRTSPAddress *
1025 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1026     const gchar * address, guint port, guint n_ports, guint ttl)
1027 {
1028   GstRTSPStreamPrivate *priv;
1029   GstRTSPAddress *result;
1030   GInetAddress *addr;
1031   GSocketFamily family;
1032   GstRTSPAddress **addrp;
1033
1034   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1035   g_return_val_if_fail (address != NULL, NULL);
1036   g_return_val_if_fail (port > 0, NULL);
1037   g_return_val_if_fail (n_ports > 0, NULL);
1038   g_return_val_if_fail (ttl > 0, NULL);
1039
1040   priv = stream->priv;
1041
1042   addr = g_inet_address_new_from_string (address);
1043   if (!addr) {
1044     GST_ERROR ("failed to get inet addr from %s", address);
1045     family = G_SOCKET_FAMILY_IPV4;
1046   } else {
1047     family = g_inet_address_get_family (addr);
1048     g_object_unref (addr);
1049   }
1050
1051   if (family == G_SOCKET_FAMILY_IPV6)
1052     addrp = &priv->mcast_addr_v6;
1053   else
1054     addrp = &priv->mcast_addr_v4;
1055
1056   g_mutex_lock (&priv->lock);
1057   if (*addrp == NULL) {
1058     GstRTSPAddressPoolResult res;
1059
1060     if (priv->pool == NULL)
1061       goto no_pool;
1062
1063     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1064         port, n_ports, ttl, addrp);
1065     if (res != GST_RTSP_ADDRESS_POOL_OK)
1066       goto no_address;
1067
1068     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1069      * where we are going to bind our socket. */
1070   } else {
1071     if (g_ascii_strcasecmp ((*addrp)->address, address) ||
1072         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1073         (*addrp)->ttl != ttl)
1074       goto different_address;
1075   }
1076   result = gst_rtsp_address_copy (*addrp);
1077   g_mutex_unlock (&priv->lock);
1078
1079   return result;
1080
1081   /* ERRORS */
1082 no_pool:
1083   {
1084     GST_ERROR_OBJECT (stream, "no address pool specified");
1085     g_mutex_unlock (&priv->lock);
1086     return NULL;
1087   }
1088 no_address:
1089   {
1090     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1091         address);
1092     g_mutex_unlock (&priv->lock);
1093     return NULL;
1094   }
1095 different_address:
1096   {
1097     GST_ERROR_OBJECT (stream,
1098         "address %s is not the same as %s that was already reserved",
1099         address, (*addrp)->address);
1100     g_mutex_unlock (&priv->lock);
1101     return NULL;
1102   }
1103 }
1104
1105 /* must be called with lock */
1106 static void
1107 set_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1108     GSocketFamily family)
1109 {
1110   const gchar *multisink_socket;
1111
1112   if (family == G_SOCKET_FAMILY_IPV6)
1113     multisink_socket = "socket-v6";
1114   else
1115     multisink_socket = "socket";
1116
1117   g_object_set (G_OBJECT (udpsink), multisink_socket, socket, NULL);
1118 }
1119
1120 /* must be called with lock */
1121 static void
1122 set_multicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1123     GSocketFamily family, const gchar * multicast_iface,
1124     const gchar * addr_str, gint port)
1125 {
1126   set_socket_for_udpsink (udpsink, socket, family);
1127
1128   if (multicast_iface) {
1129     g_object_set (G_OBJECT (udpsink), "multicast-iface",
1130         multicast_iface, NULL);
1131   }
1132
1133   g_signal_emit_by_name (udpsink, "add", addr_str, port, NULL);
1134 }
1135
1136
1137 /* must be called with lock */
1138 static void
1139 set_unicast_socket_for_udpsink (GstElement * udpsink, GSocket * socket,
1140     GSocketFamily family)
1141 {
1142   set_socket_for_udpsink (udpsink, socket, family);
1143 }
1144
1145 static guint16
1146 get_port_from_socket (GSocket * socket)
1147 {
1148   guint16 port;
1149   GSocketAddress *sockaddr;
1150   GError *err;
1151
1152   GST_DEBUG ("socket: %p", socket);
1153   sockaddr = g_socket_get_local_address (socket, &err);
1154   if (sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (sockaddr)) {
1155     g_clear_object (&sockaddr);
1156     GST_ERROR ("failed to get sockaddr: %s", err->message);
1157     g_error_free (err);
1158     return 0;
1159   }
1160
1161   port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (sockaddr));
1162   g_object_unref (sockaddr);
1163
1164   return port;
1165 }
1166
1167
1168 static gboolean
1169 create_and_configure_udpsink (GstRTSPStream * stream, GstElement ** udpsink,
1170     GSocket *socket_v4, GSocket *socket_v6, gboolean multicast, gboolean is_rtp)
1171 {
1172   GstRTSPStreamPrivate *priv = stream->priv;
1173
1174   *udpsink = gst_element_factory_make ("multiudpsink", NULL);
1175
1176   if (!*udpsink)
1177     goto no_udp_protocol;
1178
1179   /* configure sinks */
1180
1181   g_object_set (G_OBJECT (*udpsink), "close-socket", FALSE, NULL);
1182
1183   g_object_set (G_OBJECT (*udpsink), "send-duplicates", FALSE, NULL);
1184
1185   if (is_rtp)
1186     g_object_set (G_OBJECT (*udpsink), "buffer-size", priv->buffer_size, NULL);
1187   else
1188     g_object_set (G_OBJECT (*udpsink), "sync", FALSE, NULL);
1189
1190   /* Needs to be async for RECORD streams, otherwise we will never go to
1191    * PLAYING because the sinks will wait for data while the udpsrc can't
1192    * provide data with timestamps in PAUSED. */
1193   if (!is_rtp || priv->sinkpad)
1194     g_object_set (G_OBJECT (*udpsink), "async", FALSE, NULL);
1195
1196   if (multicast) {
1197     /* join multicast group when adding clients, so we'll start receiving from it.
1198      * We cannot rely on the udpsrc to join the group since its socket is always a
1199      * local unicast one. */
1200     g_object_set (G_OBJECT (*udpsink), "auto-multicast", TRUE, NULL);
1201
1202     g_object_set (G_OBJECT (*udpsink), "loop", FALSE, NULL);
1203   }
1204
1205   /* update the dscp qos field in the sinks */
1206   update_dscp_qos (stream, udpsink);
1207
1208   if (priv->server_addr_v4) {
1209     GST_DEBUG_OBJECT (stream,
1210         "udp IPv4, configure udpsinks");
1211     set_unicast_socket_for_udpsink (*udpsink, socket_v4,
1212         G_SOCKET_FAMILY_IPV4);
1213   }
1214
1215   if (priv->server_addr_v6) {
1216     GST_DEBUG_OBJECT (stream,
1217         "udp IPv6, configure udpsinks");
1218     set_unicast_socket_for_udpsink (*udpsink, socket_v6,
1219         G_SOCKET_FAMILY_IPV6);
1220   }
1221
1222   if (multicast) {
1223     gint port;
1224     if (priv->mcast_addr_v4) {
1225       GST_DEBUG_OBJECT (stream, "mcast IPv4, configure udpsinks");
1226       port = get_port_from_socket (socket_v4);
1227       if (!port)
1228         goto get_port_failed;
1229       set_multicast_socket_for_udpsink (*udpsink, socket_v4,
1230           G_SOCKET_FAMILY_IPV4, priv->multicast_iface, priv->mcast_addr_v4->address, port);
1231     }
1232
1233     if (priv->mcast_addr_v6) {
1234       GST_DEBUG_OBJECT (stream, "mcast IPv6, configure udpsinks");
1235       port = get_port_from_socket (socket_v6);
1236       if (!port)
1237         goto get_port_failed;
1238       set_multicast_socket_for_udpsink (*udpsink, socket_v6,
1239           G_SOCKET_FAMILY_IPV6, priv->multicast_iface, priv->mcast_addr_v6->address, port);
1240     }
1241
1242   }
1243
1244   return TRUE;
1245
1246   /* ERRORS */
1247 no_udp_protocol:
1248   {
1249     GST_ERROR_OBJECT (stream, "failed to create udpsink element");
1250     return FALSE;
1251   }
1252 get_port_failed:
1253   {
1254     GST_ERROR_OBJECT (stream, "failed to get udp port");
1255     return FALSE;
1256   }
1257 }
1258
1259 /* must be called with lock */
1260 static gboolean
1261 create_and_configure_udpsource (GstElement ** udpsrc,
1262     GSocket * socket)
1263 {
1264   GstStateChangeReturn ret;
1265
1266   g_assert (socket != NULL);
1267
1268   *udpsrc = gst_element_factory_make ("udpsrc", NULL);
1269   if (*udpsrc == NULL)
1270     goto error;
1271
1272   g_object_set (G_OBJECT (*udpsrc), "socket", socket, NULL);
1273
1274   /* The udpsrc cannot do the join because its socket is always a local unicast
1275    * one. The udpsink sharing the same socket will do it for us. */
1276   g_object_set (G_OBJECT (*udpsrc), "auto-multicast", FALSE, NULL);
1277
1278   g_object_set (G_OBJECT (*udpsrc), "loop", FALSE, NULL);
1279
1280   g_object_set (G_OBJECT (*udpsrc), "close-socket", FALSE, NULL);
1281
1282   ret = gst_element_set_state (*udpsrc, GST_STATE_READY);
1283   if (ret == GST_STATE_CHANGE_FAILURE)
1284     goto error;
1285
1286   return TRUE;
1287
1288   /* ERRORS */
1289 error:
1290   {
1291     if (*udpsrc) {
1292       gst_element_set_state (*udpsrc, GST_STATE_NULL);
1293       g_clear_object (udpsrc);
1294     }
1295     return FALSE;
1296   }
1297 }
1298
1299 static gboolean
1300 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1301     GSocket *socket_out[2], GstRTSPAddress ** server_addr_out,
1302     gboolean multicast, GstRTSPTransport * ct)
1303 {
1304   GstRTSPStreamPrivate *priv = stream->priv;
1305   GSocket *rtp_socket = NULL;
1306   GSocket *rtcp_socket;
1307   gint tmp_rtp, tmp_rtcp;
1308   guint count;
1309   GList *rejected_addresses = NULL;
1310   GstRTSPAddress *addr = NULL;
1311   GInetAddress *inetaddr = NULL;
1312   GSocketAddress *rtp_sockaddr = NULL;
1313   GSocketAddress *rtcp_sockaddr = NULL;
1314   GstRTSPAddressPool *pool;
1315
1316   pool = priv->pool;
1317   count = 0;
1318
1319   /* Start with random port */
1320   tmp_rtp = 0;
1321
1322   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1323       G_SOCKET_PROTOCOL_UDP, NULL);
1324   if (!rtcp_socket)
1325     goto no_udp_protocol;
1326   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1327
1328   /* try to allocate 2 UDP ports, the RTP port should be an even
1329    * number and the RTCP port should be the next (uneven) port */
1330 again:
1331
1332   if (rtp_socket == NULL) {
1333     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1334         G_SOCKET_PROTOCOL_UDP, NULL);
1335     if (!rtp_socket)
1336       goto no_udp_protocol;
1337     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1338   }
1339
1340   if ((pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) || multicast) {
1341     GstRTSPAddressFlags flags;
1342
1343     if (addr)
1344       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1345
1346     if (!pool)
1347       goto no_pool;
1348
1349     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1350     if (multicast)
1351       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1352     else
1353       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1354
1355     if (family == G_SOCKET_FAMILY_IPV6)
1356       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1357     else
1358       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1359
1360     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1361
1362     if (addr == NULL)
1363       goto no_address;
1364
1365     tmp_rtp = addr->port;
1366
1367     g_clear_object (&inetaddr);
1368     /* FIXME: Does it really work with the IP_MULTICAST_ALL socket option and
1369      * socket control message set in udpsrc? */
1370     if (multicast)
1371       inetaddr = g_inet_address_new_any (family);
1372     else
1373       inetaddr = g_inet_address_new_from_string (addr->address);
1374   } else {
1375     if (tmp_rtp != 0) {
1376       tmp_rtp += 2;
1377       if (++count > 20)
1378         goto no_ports;
1379     }
1380
1381     if (inetaddr == NULL)
1382       inetaddr = g_inet_address_new_any (family);
1383   }
1384
1385   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1386   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1387     GST_DEBUG_OBJECT (stream, "rtp bind() failed, will try again");
1388     g_object_unref (rtp_sockaddr);
1389     goto again;
1390   }
1391   g_object_unref (rtp_sockaddr);
1392
1393   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1394   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1395     g_clear_object (&rtp_sockaddr);
1396     goto socket_error;
1397   }
1398
1399   tmp_rtp =
1400       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1401   g_object_unref (rtp_sockaddr);
1402
1403   /* check if port is even */
1404   if ((tmp_rtp & 1) != 0) {
1405     /* port not even, close and allocate another */
1406     tmp_rtp++;
1407     g_clear_object (&rtp_socket);
1408     goto again;
1409   }
1410
1411   /* set port */
1412   tmp_rtcp = tmp_rtp + 1;
1413
1414   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1415   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1416     GST_DEBUG_OBJECT (stream, "rctp bind() failed, will try again");
1417     g_object_unref (rtcp_sockaddr);
1418     g_clear_object (&rtp_socket);
1419     goto again;
1420   }
1421   g_object_unref (rtcp_sockaddr);
1422
1423   if (!addr) {
1424     addr = g_slice_new0 (GstRTSPAddress);
1425     addr->address = g_inet_address_to_string (inetaddr);
1426     addr->port = tmp_rtp;
1427     addr->n_ports = 2;
1428   }
1429
1430   g_clear_object (&inetaddr);
1431
1432   socket_out[0] = rtp_socket;
1433   socket_out[1] = rtcp_socket;
1434   *server_addr_out = addr;
1435
1436   GST_DEBUG_OBJECT (stream, "allocated address: %s and ports: %d, %d", addr->address, tmp_rtp, tmp_rtcp);
1437
1438   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1439
1440   return TRUE;
1441
1442   /* ERRORS */
1443 no_udp_protocol:
1444   {
1445     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: protocol error");
1446     goto cleanup;
1447   }
1448 no_pool:
1449   {
1450     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no address pool specified");
1451     goto cleanup;
1452   }
1453 no_address:
1454   {
1455     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
1456     goto cleanup;
1457   }
1458 no_ports:
1459   {
1460     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: no ports");
1461     goto cleanup;
1462   }
1463 socket_error:
1464   {
1465     GST_ERROR_OBJECT (stream, "failed to allocate UDP ports: socket error");
1466     goto cleanup;
1467   }
1468 cleanup:
1469   {
1470     if (inetaddr)
1471       g_object_unref (inetaddr);
1472     g_list_free_full (rejected_addresses,
1473         (GDestroyNotify) gst_rtsp_address_free);
1474     if (addr)
1475       gst_rtsp_address_free (addr);
1476     if (rtp_socket)
1477       g_object_unref (rtp_socket);
1478     if (rtcp_socket)
1479       g_object_unref (rtcp_socket);
1480     return FALSE;
1481   }
1482 }
1483
1484 /**
1485  * gst_rtsp_stream_allocate_udp_sockets:
1486  * @stream: a #GstRTSPStream
1487  * @family: protocol family
1488  * @transport: transport method
1489  * @use_client_setttings: Whether to use client settings or not
1490  *
1491  * Allocates RTP and RTCP ports.
1492  *
1493  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1494  */
1495 gboolean
1496 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1497     GSocketFamily family, GstRTSPTransport * ct,
1498     gboolean use_transport_settings)
1499 {
1500   GstRTSPStreamPrivate *priv;
1501   gboolean ret = FALSE;
1502   GstRTSPLowerTrans transport;
1503   gboolean allocated = FALSE;
1504
1505   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1506   g_return_val_if_fail (ct != NULL, FALSE);
1507   priv = stream->priv;
1508
1509   transport = ct->lower_transport;
1510
1511   g_mutex_lock (&priv->lock);
1512
1513   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1514     if (family == G_SOCKET_FAMILY_IPV4 && priv->mcast_addr_v4)
1515       allocated = TRUE;
1516     else if (family == G_SOCKET_FAMILY_IPV6 && priv->mcast_addr_v6)
1517       allocated = TRUE;
1518   } else if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1519     if (family == G_SOCKET_FAMILY_IPV4 && priv->server_addr_v4)
1520       allocated = TRUE;
1521     else if (family == G_SOCKET_FAMILY_IPV6 && priv->server_addr_v6)
1522       allocated = TRUE;
1523   }
1524
1525   if (allocated) {
1526     g_mutex_unlock (&priv->lock);
1527     return TRUE;
1528   }
1529
1530   if (family == G_SOCKET_FAMILY_IPV4) {
1531     /* IPv4 */
1532     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1533       /* UDP unicast */
1534       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv4");
1535       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1536           priv->socket_v4, &priv->server_addr_v4, FALSE, ct);
1537     } else {
1538       /* multicast */
1539       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv4");
1540       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1541           priv->mcast_socket_v4, &priv->mcast_addr_v4, TRUE, ct);
1542     }
1543   } else {
1544     /* IPv6 */
1545     if (transport == GST_RTSP_LOWER_TRANS_UDP) {
1546       /* unicast */
1547       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_UDP, ipv6");
1548       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1549           priv->socket_v6, &priv->server_addr_v6, FALSE, ct);
1550
1551     } else {
1552       /* multicast */
1553       GST_DEBUG_OBJECT (stream, "GST_RTSP_LOWER_TRANS_MCAST_UDP, ipv6");
1554       ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1555           priv->mcast_socket_v6, &priv->mcast_addr_v6, TRUE, ct);
1556     }
1557   }
1558   g_mutex_unlock (&priv->lock);
1559
1560   return ret;
1561 }
1562
1563 /**
1564  * gst_rtsp_stream_set_client_side:
1565  * @stream: a #GstRTSPStream
1566  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1567  * an RTSP connection.
1568  *
1569  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1570  * streams to an RTSP server via RECORD. This has the practical effect
1571  * of changing which UDP port numbers are used when setting up the local
1572  * side of the stream sending to be either the 'server' or 'client' pair
1573  * of a configured UDP transport.
1574  */
1575 void
1576 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1577 {
1578   GstRTSPStreamPrivate *priv;
1579
1580   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1581   priv = stream->priv;
1582   g_mutex_lock (&priv->lock);
1583   priv->client_side = client_side;
1584   g_mutex_unlock (&priv->lock);
1585 }
1586
1587 /**
1588  * gst_rtsp_stream_is_client_side:
1589  * @stream: a #GstRTSPStream
1590  *
1591  * See gst_rtsp_stream_set_client_side()
1592  *
1593  * Returns: TRUE if this #GstRTSPStream is client-side.
1594  */
1595 gboolean
1596 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1597 {
1598   GstRTSPStreamPrivate *priv;
1599   gboolean ret;
1600
1601   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1602
1603   priv = stream->priv;
1604   g_mutex_lock (&priv->lock);
1605   ret = priv->client_side;
1606   g_mutex_unlock (&priv->lock);
1607
1608   return ret;
1609 }
1610
1611 /**
1612  * gst_rtsp_stream_get_server_port:
1613  * @stream: a #GstRTSPStream
1614  * @server_port: (out): result server port
1615  * @family: the port family to get
1616  *
1617  * Fill @server_port with the port pair used by the server. This function can
1618  * only be called when @stream has been joined.
1619  */
1620 void
1621 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1622     GstRTSPRange * server_port, GSocketFamily family)
1623 {
1624   GstRTSPStreamPrivate *priv;
1625
1626   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1627   priv = stream->priv;
1628   g_return_if_fail (priv->joined_bin != NULL);
1629
1630   if (server_port) {
1631     server_port->min = 0;
1632     server_port->max = 0;
1633   }
1634
1635   g_mutex_lock (&priv->lock);
1636   if (family == G_SOCKET_FAMILY_IPV4) {
1637     if (server_port && priv->server_addr_v4) {
1638       server_port->min = priv->server_addr_v4->port;
1639       server_port->max =
1640           priv->server_addr_v4->port + priv->server_addr_v4->n_ports - 1;
1641     }
1642   } else {
1643     if (server_port && priv->server_addr_v6) {
1644       server_port->min = priv->server_addr_v6->port;
1645       server_port->max =
1646           priv->server_addr_v6->port + priv->server_addr_v6->n_ports - 1;
1647     }
1648   }
1649   g_mutex_unlock (&priv->lock);
1650 }
1651
1652 /**
1653  * gst_rtsp_stream_get_rtpsession:
1654  * @stream: a #GstRTSPStream
1655  *
1656  * Get the RTP session of this stream.
1657  *
1658  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1659  */
1660 GObject *
1661 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1662 {
1663   GstRTSPStreamPrivate *priv;
1664   GObject *session;
1665
1666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1667
1668   priv = stream->priv;
1669
1670   g_mutex_lock (&priv->lock);
1671   if ((session = priv->session))
1672     g_object_ref (session);
1673   g_mutex_unlock (&priv->lock);
1674
1675   return session;
1676 }
1677
1678 /**
1679  * gst_rtsp_stream_get_srtp_encoder:
1680  * @stream: a #GstRTSPStream
1681  *
1682  * Get the SRTP encoder for this stream.
1683  *
1684  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1685  */
1686 GstElement *
1687 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1688 {
1689   GstRTSPStreamPrivate *priv;
1690   GstElement *encoder;
1691
1692   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1693
1694   priv = stream->priv;
1695
1696   g_mutex_lock (&priv->lock);
1697   if ((encoder = priv->srtpenc))
1698     g_object_ref (encoder);
1699   g_mutex_unlock (&priv->lock);
1700
1701   return encoder;
1702 }
1703
1704 /**
1705  * gst_rtsp_stream_get_ssrc:
1706  * @stream: a #GstRTSPStream
1707  * @ssrc: (out): result ssrc
1708  *
1709  * Get the SSRC used by the RTP session of this stream. This function can only
1710  * be called when @stream has been joined.
1711  */
1712 void
1713 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1714 {
1715   GstRTSPStreamPrivate *priv;
1716
1717   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1718   priv = stream->priv;
1719   g_return_if_fail (priv->joined_bin != NULL);
1720
1721   g_mutex_lock (&priv->lock);
1722   if (ssrc && priv->session)
1723     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1724   g_mutex_unlock (&priv->lock);
1725 }
1726
1727 /**
1728  * gst_rtsp_stream_set_retransmission_time:
1729  * @stream: a #GstRTSPStream
1730  * @time: a #GstClockTime
1731  *
1732  * Set the amount of time to store retransmission packets.
1733  */
1734 void
1735 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1736     GstClockTime time)
1737 {
1738   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1739
1740   g_mutex_lock (&stream->priv->lock);
1741   stream->priv->rtx_time = time;
1742   if (stream->priv->rtxsend)
1743     g_object_set (stream->priv->rtxsend, "max-size-time",
1744         GST_TIME_AS_MSECONDS (time), NULL);
1745   g_mutex_unlock (&stream->priv->lock);
1746 }
1747
1748 /**
1749  * gst_rtsp_stream_get_retransmission_time:
1750  * @stream: a #GstRTSPStream
1751  *
1752  * Get the amount of time to store retransmission data.
1753  *
1754  * Returns: the amount of time to store retransmission data.
1755  */
1756 GstClockTime
1757 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1758 {
1759   GstClockTime ret;
1760
1761   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1762
1763   g_mutex_lock (&stream->priv->lock);
1764   ret = stream->priv->rtx_time;
1765   g_mutex_unlock (&stream->priv->lock);
1766
1767   return ret;
1768 }
1769
1770 /**
1771  * gst_rtsp_stream_set_retransmission_pt:
1772  * @stream: a #GstRTSPStream
1773  * @rtx_pt: a #guint
1774  *
1775  * Set the payload type (pt) for retransmission of this stream.
1776  */
1777 void
1778 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1779 {
1780   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1781
1782   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1783
1784   g_mutex_lock (&stream->priv->lock);
1785   stream->priv->rtx_pt = rtx_pt;
1786   if (stream->priv->rtxsend) {
1787     guint pt = gst_rtsp_stream_get_pt (stream);
1788     gchar *pt_s = g_strdup_printf ("%d", pt);
1789     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1790         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1791     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1792     g_free (pt_s);
1793     gst_structure_free (rtx_pt_map);
1794   }
1795   g_mutex_unlock (&stream->priv->lock);
1796 }
1797
1798 /**
1799  * gst_rtsp_stream_get_retransmission_pt:
1800  * @stream: a #GstRTSPStream
1801  *
1802  * Get the payload-type used for retransmission of this stream
1803  *
1804  * Returns: The retransmission PT.
1805  */
1806 guint
1807 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1808 {
1809   guint rtx_pt;
1810
1811   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1812
1813   g_mutex_lock (&stream->priv->lock);
1814   rtx_pt = stream->priv->rtx_pt;
1815   g_mutex_unlock (&stream->priv->lock);
1816
1817   return rtx_pt;
1818 }
1819
1820 /**
1821  * gst_rtsp_stream_set_buffer_size:
1822  * @stream: a #GstRTSPStream
1823  * @size: the buffer size
1824  *
1825  * Set the size of the UDP transmission buffer (in bytes)
1826  * Needs to be set before the stream is joined to a bin.
1827  *
1828  * Since: 1.6
1829  */
1830 void
1831 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1832 {
1833   g_mutex_lock (&stream->priv->lock);
1834   stream->priv->buffer_size = size;
1835   g_mutex_unlock (&stream->priv->lock);
1836 }
1837
1838 /**
1839  * gst_rtsp_stream_get_buffer_size:
1840  * @stream: a #GstRTSPStream
1841  *
1842  * Get the size of the UDP transmission buffer (in bytes)
1843  *
1844  * Returns: the size of the UDP TX buffer
1845  *
1846  * Since: 1.6
1847  */
1848 guint
1849 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1850 {
1851   guint buffer_size;
1852
1853   g_mutex_lock (&stream->priv->lock);
1854   buffer_size = stream->priv->buffer_size;
1855   g_mutex_unlock (&stream->priv->lock);
1856
1857   return buffer_size;
1858 }
1859
1860 /* executed from streaming thread */
1861 static void
1862 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1863 {
1864   GstRTSPStreamPrivate *priv = stream->priv;
1865   GstCaps *newcaps, *oldcaps;
1866
1867   newcaps = gst_pad_get_current_caps (pad);
1868
1869   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1870       newcaps);
1871
1872   g_mutex_lock (&priv->lock);
1873   oldcaps = priv->caps;
1874   priv->caps = newcaps;
1875   g_mutex_unlock (&priv->lock);
1876
1877   if (oldcaps)
1878     gst_caps_unref (oldcaps);
1879 }
1880
1881 static void
1882 dump_structure (const GstStructure * s)
1883 {
1884   gchar *sstr;
1885
1886   sstr = gst_structure_to_string (s);
1887   GST_INFO ("structure: %s", sstr);
1888   g_free (sstr);
1889 }
1890
1891 static GstRTSPStreamTransport *
1892 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1893 {
1894   GstRTSPStreamPrivate *priv = stream->priv;
1895   GList *walk;
1896   GstRTSPStreamTransport *result = NULL;
1897   const gchar *tmp;
1898   gchar *dest;
1899   guint port;
1900
1901   if (rtcp_from == NULL)
1902     return NULL;
1903
1904   tmp = g_strrstr (rtcp_from, ":");
1905   if (tmp == NULL)
1906     return NULL;
1907
1908   port = atoi (tmp + 1);
1909   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1910
1911   g_mutex_lock (&priv->lock);
1912   GST_INFO ("finding %s:%d in %d transports", dest, port,
1913       g_list_length (priv->transports));
1914
1915   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1916     GstRTSPStreamTransport *trans = walk->data;
1917     const GstRTSPTransport *tr;
1918     gint min, max;
1919
1920     tr = gst_rtsp_stream_transport_get_transport (trans);
1921
1922     if (priv->client_side) {
1923       /* In client side mode the 'destination' is the RTSP server, so send
1924        * to those ports */
1925       min = tr->server_port.min;
1926       max = tr->server_port.max;
1927     } else {
1928       min = tr->client_port.min;
1929       max = tr->client_port.max;
1930     }
1931
1932     if ((g_ascii_strcasecmp (tr->destination, dest) == 0) &&
1933         (min == port || max == port)) {
1934       result = trans;
1935       break;
1936     }
1937   }
1938   if (result)
1939     g_object_ref (result);
1940   g_mutex_unlock (&priv->lock);
1941
1942   g_free (dest);
1943
1944   return result;
1945 }
1946
1947 static GstRTSPStreamTransport *
1948 check_transport (GObject * source, GstRTSPStream * stream)
1949 {
1950   GstStructure *stats;
1951   GstRTSPStreamTransport *trans;
1952
1953   /* see if we have a stream to match with the origin of the RTCP packet */
1954   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1955   if (trans == NULL) {
1956     g_object_get (source, "stats", &stats, NULL);
1957     if (stats) {
1958       const gchar *rtcp_from;
1959
1960       dump_structure (stats);
1961
1962       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1963       if ((trans = find_transport (stream, rtcp_from))) {
1964         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1965             source);
1966         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1967             g_object_unref);
1968       }
1969       gst_structure_free (stats);
1970     }
1971   }
1972   return trans;
1973 }
1974
1975
1976 static void
1977 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1978 {
1979   GstRTSPStreamTransport *trans;
1980
1981   GST_INFO ("%p: new source %p", stream, source);
1982
1983   trans = check_transport (source, stream);
1984
1985   if (trans)
1986     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1987 }
1988
1989 static void
1990 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1991 {
1992   GST_INFO ("%p: new SDES %p", stream, source);
1993 }
1994
1995 static void
1996 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1997 {
1998   GstRTSPStreamTransport *trans;
1999
2000   trans = check_transport (source, stream);
2001
2002   if (trans) {
2003     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
2004     gst_rtsp_stream_transport_keep_alive (trans);
2005   }
2006 #ifdef DUMP_STATS
2007   {
2008     GstStructure *stats;
2009     g_object_get (source, "stats", &stats, NULL);
2010     if (stats) {
2011       dump_structure (stats);
2012       gst_structure_free (stats);
2013     }
2014   }
2015 #endif
2016 }
2017
2018 static void
2019 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2020 {
2021   GST_INFO ("%p: source %p bye", stream, source);
2022 }
2023
2024 static void
2025 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2026 {
2027   GstRTSPStreamTransport *trans;
2028
2029   GST_INFO ("%p: source %p bye timeout", stream, source);
2030
2031   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2032     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2033     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2034   }
2035 }
2036
2037 static void
2038 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2039 {
2040   GstRTSPStreamTransport *trans;
2041
2042   GST_INFO ("%p: source %p timeout", stream, source);
2043
2044   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2045     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2046     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2047   }
2048 }
2049
2050 static void
2051 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2052 {
2053   GST_INFO ("%p: new sender source %p", stream, source);
2054 #ifndef DUMP_STATS
2055   {
2056     GstStructure *stats;
2057     g_object_get (source, "stats", &stats, NULL);
2058     if (stats) {
2059       dump_structure (stats);
2060       gst_structure_free (stats);
2061     }
2062   }
2063 #endif
2064 }
2065
2066 static void
2067 on_sender_ssrc_active (GObject * session, GObject * source,
2068     GstRTSPStream * stream)
2069 {
2070 #ifndef DUMP_STATS
2071   {
2072     GstStructure *stats;
2073     g_object_get (source, "stats", &stats, NULL);
2074     if (stats) {
2075       dump_structure (stats);
2076       gst_structure_free (stats);
2077     }
2078   }
2079 #endif
2080 }
2081
2082 static void
2083 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2084 {
2085   if (is_rtp) {
2086     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2087     g_list_free (priv->tr_cache_rtp);
2088     priv->tr_cache_rtp = NULL;
2089   } else {
2090     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2091     g_list_free (priv->tr_cache_rtcp);
2092     priv->tr_cache_rtcp = NULL;
2093   }
2094 }
2095
2096 static GstFlowReturn
2097 handle_new_sample (GstAppSink * sink, gpointer user_data)
2098 {
2099   GstRTSPStreamPrivate *priv;
2100   GList *walk;
2101   GstSample *sample;
2102   GstBuffer *buffer;
2103   GstRTSPStream *stream;
2104   gboolean is_rtp;
2105
2106   sample = gst_app_sink_pull_sample (sink);
2107   if (!sample)
2108     return GST_FLOW_OK;
2109
2110   stream = (GstRTSPStream *) user_data;
2111   priv = stream->priv;
2112   buffer = gst_sample_get_buffer (sample);
2113
2114   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2115
2116   g_mutex_lock (&priv->lock);
2117   if (is_rtp) {
2118     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2119       clear_tr_cache (priv, is_rtp);
2120       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2121         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2122         priv->tr_cache_rtp =
2123             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2124       }
2125       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2126     }
2127   } else {
2128     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2129       clear_tr_cache (priv, is_rtp);
2130       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2131         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2132         priv->tr_cache_rtcp =
2133             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2134       }
2135       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2136     }
2137   }
2138   g_mutex_unlock (&priv->lock);
2139
2140   if (is_rtp) {
2141     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2142       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2143       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2144     }
2145   } else {
2146     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2147       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2148       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2149     }
2150   }
2151   gst_sample_unref (sample);
2152
2153   return GST_FLOW_OK;
2154 }
2155
2156 static GstAppSinkCallbacks sink_cb = {
2157   NULL,                         /* not interested in EOS */
2158   NULL,                         /* not interested in preroll samples */
2159   handle_new_sample,
2160 };
2161
2162 static GstElement *
2163 get_rtp_encoder (GstRTSPStream * stream, guint session)
2164 {
2165   GstRTSPStreamPrivate *priv = stream->priv;
2166
2167   if (priv->srtpenc == NULL) {
2168     gchar *name;
2169
2170     name = g_strdup_printf ("srtpenc_%u", session);
2171     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2172     g_free (name);
2173
2174     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2175   }
2176   return gst_object_ref (priv->srtpenc);
2177 }
2178
2179 static GstElement *
2180 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2181 {
2182   GstRTSPStreamPrivate *priv = stream->priv;
2183   GstElement *oldenc, *enc;
2184   GstPad *pad;
2185   gchar *name;
2186
2187   if (priv->idx != session)
2188     return NULL;
2189
2190   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2191
2192   oldenc = priv->srtpenc;
2193   enc = get_rtp_encoder (stream, session);
2194   name = g_strdup_printf ("rtp_sink_%d", session);
2195   pad = gst_element_get_request_pad (enc, name);
2196   g_free (name);
2197   gst_object_unref (pad);
2198
2199   if (oldenc == NULL)
2200     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2201         enc);
2202
2203   return enc;
2204 }
2205
2206 static GstElement *
2207 request_rtcp_encoder (GstElement * rtpbin, guint session,
2208     GstRTSPStream * stream)
2209 {
2210   GstRTSPStreamPrivate *priv = stream->priv;
2211   GstElement *oldenc, *enc;
2212   GstPad *pad;
2213   gchar *name;
2214
2215   if (priv->idx != session)
2216     return NULL;
2217
2218   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2219
2220   oldenc = priv->srtpenc;
2221   enc = get_rtp_encoder (stream, session);
2222   name = g_strdup_printf ("rtcp_sink_%d", session);
2223   pad = gst_element_get_request_pad (enc, name);
2224   g_free (name);
2225   gst_object_unref (pad);
2226
2227   if (oldenc == NULL)
2228     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2229         enc);
2230
2231   return enc;
2232 }
2233
2234 static GstCaps *
2235 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2236 {
2237   GstRTSPStreamPrivate *priv = stream->priv;
2238   GstCaps *caps;
2239
2240   GST_DEBUG ("request key %08x", ssrc);
2241
2242   g_mutex_lock (&priv->lock);
2243   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2244     gst_caps_ref (caps);
2245   g_mutex_unlock (&priv->lock);
2246
2247   return caps;
2248 }
2249
2250 static GstElement *
2251 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2252     GstRTSPStream * stream)
2253 {
2254   GstRTSPStreamPrivate *priv = stream->priv;
2255
2256   if (priv->idx != session)
2257     return NULL;
2258
2259   if (priv->srtpdec == NULL) {
2260     gchar *name;
2261
2262     name = g_strdup_printf ("srtpdec_%u", session);
2263     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2264     g_free (name);
2265
2266     g_signal_connect (priv->srtpdec, "request-key",
2267         (GCallback) request_key, stream);
2268   }
2269   return gst_object_ref (priv->srtpdec);
2270 }
2271
2272 /**
2273  * gst_rtsp_stream_request_aux_sender:
2274  * @stream: a #GstRTSPStream
2275  * @sessid: the session id
2276  *
2277  * Creating a rtxsend bin
2278  *
2279  * Returns: (transfer full): a #GstElement.
2280  *
2281  * Since: 1.6
2282  */
2283 GstElement *
2284 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2285 {
2286   GstElement *bin;
2287   GstPad *pad;
2288   GstStructure *pt_map;
2289   gchar *name;
2290   guint pt, rtx_pt;
2291   gchar *pt_s;
2292
2293   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2294
2295   pt = gst_rtsp_stream_get_pt (stream);
2296   pt_s = g_strdup_printf ("%u", pt);
2297   rtx_pt = stream->priv->rtx_pt;
2298
2299   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2300
2301   bin = gst_bin_new (NULL);
2302   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2303   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2304       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2305   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2306       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2307   g_free (pt_s);
2308   gst_structure_free (pt_map);
2309   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2310
2311   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2312   name = g_strdup_printf ("src_%u", sessid);
2313   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2314   g_free (name);
2315   gst_object_unref (pad);
2316
2317   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2318   name = g_strdup_printf ("sink_%u", sessid);
2319   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2320   g_free (name);
2321   gst_object_unref (pad);
2322
2323   return bin;
2324 }
2325
2326 /**
2327  * gst_rtsp_stream_set_pt_map:
2328  * @stream: a #GstRTSPStream
2329  * @pt: the pt
2330  * @caps: a #GstCaps
2331  *
2332  * Configure a pt map between @pt and @caps.
2333  */
2334 void
2335 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2336 {
2337   GstRTSPStreamPrivate *priv = stream->priv;
2338
2339   g_mutex_lock (&priv->lock);
2340   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2341   g_mutex_unlock (&priv->lock);
2342 }
2343
2344 /**
2345  * gst_rtsp_stream_set_publish_clock_mode:
2346  * @stream: a #GstRTSPStream
2347  * @mode: the clock publish mode
2348  *
2349  * Sets if and how the stream clock should be published according to RFC7273.
2350  *
2351  * Since: 1.8
2352  */
2353 void
2354 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2355     GstRTSPPublishClockMode mode)
2356 {
2357   GstRTSPStreamPrivate *priv;
2358
2359   priv = stream->priv;
2360   g_mutex_lock (&priv->lock);
2361   priv->publish_clock_mode = mode;
2362   g_mutex_unlock (&priv->lock);
2363 }
2364
2365 /**
2366  * gst_rtsp_stream_get_publish_clock_mode:
2367  * @stream: a #GstRTSPStream
2368  *
2369  * Gets if and how the stream clock should be published according to RFC7273.
2370  *
2371  * Returns: The GstRTSPPublishClockMode
2372  *
2373  * Since: 1.8
2374  */
2375 GstRTSPPublishClockMode
2376 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2377 {
2378   GstRTSPStreamPrivate *priv;
2379   GstRTSPPublishClockMode ret;
2380
2381   priv = stream->priv;
2382   g_mutex_lock (&priv->lock);
2383   ret = priv->publish_clock_mode;
2384   g_mutex_unlock (&priv->lock);
2385
2386   return ret;
2387 }
2388
2389 static GstCaps *
2390 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2391     GstRTSPStream * stream)
2392 {
2393   GstRTSPStreamPrivate *priv = stream->priv;
2394   GstCaps *caps = NULL;
2395
2396   g_mutex_lock (&priv->lock);
2397
2398   if (priv->idx == session) {
2399     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2400     if (caps) {
2401       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2402       gst_caps_ref (caps);
2403     } else {
2404       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2405     }
2406   }
2407
2408   g_mutex_unlock (&priv->lock);
2409
2410   return caps;
2411 }
2412
2413 static void
2414 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2415 {
2416   GstRTSPStreamPrivate *priv = stream->priv;
2417   gchar *name;
2418   GstPadLinkReturn ret;
2419   guint sessid;
2420
2421   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2422       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2423
2424   name = gst_pad_get_name (pad);
2425   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2426     g_free (name);
2427     return;
2428   }
2429   g_free (name);
2430
2431   if (priv->idx != sessid)
2432     return;
2433
2434   if (gst_pad_is_linked (priv->sinkpad)) {
2435     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2436         GST_DEBUG_PAD_NAME (priv->sinkpad));
2437     return;
2438   }
2439
2440   /* link the RTP pad to the session manager, it should not really fail unless
2441    * this is not really an RTP pad */
2442   ret = gst_pad_link (pad, priv->sinkpad);
2443   if (ret != GST_PAD_LINK_OK)
2444     goto link_failed;
2445   priv->recv_rtp_src = gst_object_ref (pad);
2446
2447   return;
2448
2449 /* ERRORS */
2450 link_failed:
2451   {
2452     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2453         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2454   }
2455 }
2456
2457 static void
2458 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2459     GstRTSPStream * stream)
2460 {
2461   /* TODO: What to do here other than this? */
2462   GST_DEBUG ("Stream %p: Got EOS", stream);
2463   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2464 }
2465
2466 typedef struct _ProbeData ProbeData;
2467
2468 struct _ProbeData
2469 {
2470   GstRTSPStream *stream;
2471   /* existing sink, already linked to tee */
2472   GstElement *sink1;
2473   /* new sink, about to be linked */
2474   GstElement *sink2;
2475   /* new queue element, that will be linked to tee and sink1 */
2476   GstElement **queue1;
2477   /* new queue element, that will be linked to tee and sink2 */
2478   GstElement **queue2;
2479   GstPad *sink_pad;
2480   GstPad *tee_pad;
2481   guint index;
2482 };
2483
2484 static void
2485 free_cb_data (gpointer user_data)
2486 {
2487   ProbeData *data = user_data;
2488
2489   gst_object_unref (data->stream);
2490   gst_object_unref (data->sink1);
2491   gst_object_unref (data->sink2);
2492   gst_object_unref (data->sink_pad);
2493   gst_object_unref (data->tee_pad);
2494   g_free (data);
2495 }
2496
2497
2498 static void
2499 create_and_plug_queue_to_unlinked_stream (GstRTSPStream * stream, GstElement *tee,
2500     GstElement *sink, GstElement ** queue)
2501 {
2502   GstRTSPStreamPrivate *priv = stream->priv;
2503   GstPad *tee_pad;
2504   GstPad *queue_pad;
2505   GstPad *sink_pad;
2506
2507   /* create queue for the new stream */
2508   *queue = gst_element_factory_make ("queue", NULL);
2509   g_object_set (*queue, "max-size-buffers", 1, "max-size-bytes", 0,
2510       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2511   gst_bin_add (priv->joined_bin, *queue);
2512
2513   /* link tee to queue */
2514   tee_pad = gst_element_get_request_pad (tee, "src_%u");
2515   queue_pad = gst_element_get_static_pad (*queue, "sink");
2516   gst_pad_link (tee_pad, queue_pad);
2517   gst_object_unref (queue_pad);
2518   gst_object_unref (tee_pad);
2519
2520   /* link queue to sink */
2521   queue_pad = gst_element_get_static_pad (*queue, "src");
2522   sink_pad = gst_element_get_static_pad (sink, "sink");
2523   gst_pad_link (queue_pad, sink_pad);
2524   gst_object_unref (queue_pad);
2525   gst_object_unref (sink_pad);
2526
2527   gst_element_sync_state_with_parent (sink);
2528   gst_element_sync_state_with_parent (*queue);
2529 }
2530
2531 static GstPadProbeReturn
2532 create_and_plug_queue_to_linked_stream_probe_cb (GstPad * inpad,
2533     GstPadProbeInfo * info, gpointer user_data)
2534 {
2535   GstRTSPStreamPrivate *priv;
2536   ProbeData *data = user_data;
2537   GstRTSPStream *stream;
2538   GstElement **queue1;
2539   GstElement **queue2;
2540   GstPad *sink_pad;
2541   GstPad *tee_pad;
2542   GstPad *queue_pad;
2543   guint index;
2544
2545   stream = data->stream;
2546   priv = stream->priv;
2547   queue1 = data->queue1;
2548   queue2 = data->queue2;
2549   sink_pad = data->sink_pad;
2550   tee_pad = data->tee_pad;
2551   index = data->index;
2552
2553   /* unlink tee and the existing sink:
2554    *   .-----.    .---------.
2555    *   | tee |    |  sink1  |
2556    * sink   src->sink       |
2557    *   '-----'    '---------'
2558    */
2559   g_assert (gst_pad_unlink (tee_pad, sink_pad));
2560
2561   /* add queue to the already existing stream */
2562   *queue1 = gst_element_factory_make ("queue", NULL);
2563   g_object_set (*queue1, "max-size-buffers", 1, "max-size-bytes", 0,
2564       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2565   gst_bin_add (priv->joined_bin, *queue1);
2566
2567   /* link tee, queue and sink:
2568    *   .-----.    .---------.    .---------.
2569    *   | tee |    |  queue1 |    | sink1   |
2570    * sink   src->sink      src->sink       |
2571    *   '-----'    '---------'    '---------'
2572    */
2573   queue_pad = gst_element_get_static_pad (*queue1, "sink");
2574   gst_pad_link (tee_pad, queue_pad);
2575   gst_object_unref (queue_pad);
2576   queue_pad = gst_element_get_static_pad (*queue1, "src");
2577   gst_pad_link (queue_pad, sink_pad);
2578   gst_object_unref (queue_pad);
2579
2580   gst_element_sync_state_with_parent (*queue1);
2581
2582   /* create queue and link it to tee and the new sink */
2583   create_and_plug_queue_to_unlinked_stream (stream,
2584       priv->tee[index], data->sink2, queue2);
2585
2586   /* the final stream:
2587    *
2588    *    .-----.    .---------.    .---------.
2589    *    | tee |    |  queue1 |    | sink1   |
2590    *  sink   src->sink      src->sink       |
2591    *    |     |    '---------'    '---------'
2592    *    |     |    .---------.    .---------.
2593    *    |     |    |  queue2 |    | sink2   |
2594    *    |    src->sink      src->sink       |
2595    *    '-----'    '---------'    '---------'
2596    */
2597
2598   return GST_PAD_PROBE_REMOVE;
2599 }
2600
2601 static void
2602 create_and_plug_queue_to_linked_stream (GstRTSPStream * stream, GstElement * sink1,
2603     GstElement * sink2, guint index, GstElement ** queue1,
2604     GstElement ** queue2)
2605 {
2606   ProbeData *data;
2607
2608   data = g_new0 (ProbeData, 1);
2609   data->stream = gst_object_ref (stream);
2610   data->sink1 = gst_object_ref (sink1);
2611   data->sink2 = gst_object_ref (sink2);
2612   data->queue1 = queue1;
2613   data->queue2 = queue2;
2614   data->index = index;
2615
2616   data->sink_pad =  gst_element_get_static_pad (sink1, "sink");
2617   g_assert (data->sink_pad);
2618   data->tee_pad = gst_pad_get_peer (data->sink_pad);
2619   g_assert (data->tee_pad);
2620
2621   gst_pad_add_probe (data->tee_pad, GST_PAD_PROBE_TYPE_IDLE,
2622       create_and_plug_queue_to_linked_stream_probe_cb, data, free_cb_data);
2623 }
2624
2625 static void
2626 plug_udp_sink (GstRTSPStream * stream, GstElement * sink_to_plug,
2627     GstElement ** queue_to_plug, guint index, gboolean is_mcast)
2628 {
2629   GstRTSPStreamPrivate *priv = stream->priv;
2630   GstElement *existing_sink;
2631
2632   if (is_mcast)
2633     existing_sink = priv->udpsink[index];
2634   else
2635     existing_sink = priv->mcast_udpsink[index];
2636
2637   GST_DEBUG_OBJECT (stream, "plug %s sink", is_mcast ? "mcast" : "udp");
2638
2639   /* add sink to the bin */
2640   gst_bin_add (priv->joined_bin, sink_to_plug);
2641
2642   if (priv->appsink[index] && existing_sink) {
2643
2644     /* queues are already added for the existing stream, add one for
2645        the newly added udp stream */
2646     create_and_plug_queue_to_unlinked_stream (stream, priv->tee[index],
2647         sink_to_plug, queue_to_plug);
2648
2649   } else if (priv->appsink[index] || existing_sink) {
2650     GstElement **queue;
2651     GstElement *element;
2652
2653     /* add queue to the already existing stream plus the newly created udp
2654        stream */
2655     if (priv->appsink[index]) {
2656       element = priv->appsink[index];
2657       queue = &priv->appqueue[index];
2658     } else {
2659       element = existing_sink;
2660       if (is_mcast)
2661         queue = &priv->udpqueue[index];
2662       else
2663         queue = &priv->mcast_udpqueue[index];
2664     }
2665
2666     create_and_plug_queue_to_linked_stream (stream, element, sink_to_plug, index,
2667         queue, queue_to_plug);
2668
2669   } else {
2670     GstPad *tee_pad;
2671     GstPad *sink_pad;
2672
2673     GST_DEBUG_OBJECT (stream, "creating first stream");
2674
2675     /* no need to add queues */
2676     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2677     sink_pad = gst_element_get_static_pad (sink_to_plug, "sink");
2678     gst_pad_link (tee_pad, sink_pad);
2679     gst_object_unref (tee_pad);
2680     gst_object_unref (sink_pad);
2681   }
2682
2683   gst_element_sync_state_with_parent (sink_to_plug);
2684 }
2685
2686 static void
2687 plug_tcp_sink (GstRTSPStream * stream, guint index)
2688 {
2689   GstRTSPStreamPrivate *priv = stream->priv;
2690
2691   GST_DEBUG_OBJECT (stream, "plug tcp sink");
2692
2693   /* add sink to the bin */
2694   gst_bin_add (priv->joined_bin, priv->appsink[index]);
2695
2696   if (priv->mcast_udpsink[index] && priv->udpsink[index]) {
2697
2698     /* queues are already added for the existing stream, add one for
2699        the newly added tcp stream */
2700     create_and_plug_queue_to_unlinked_stream (stream,
2701       priv->tee[index], priv->appsink[index], &priv->appqueue[index]);
2702
2703   } else if (priv->mcast_udpsink[index] || priv->udpsink[index]) {
2704     GstElement **queue;
2705     GstElement *element;
2706
2707     /* add queue to the already existing stream plus the newly created tcp
2708        stream */
2709     if (priv->mcast_udpsink[index]) {
2710       element = priv->mcast_udpsink[index];
2711       queue = &priv->mcast_udpqueue[index];
2712     } else {
2713       element = priv->udpsink[index];
2714       queue = &priv->udpqueue[index];
2715     }
2716
2717     create_and_plug_queue_to_linked_stream (stream, element, priv->appsink[index], index,
2718         queue, &priv->appqueue[index]);
2719
2720   } else {
2721     GstPad *tee_pad;
2722     GstPad *sink_pad;
2723
2724     /* no need to add queues */
2725     tee_pad = gst_element_get_request_pad (priv->tee[index], "src_%u");
2726     sink_pad = gst_element_get_static_pad (priv->appsink[index], "sink");
2727     gst_pad_link (tee_pad, sink_pad);
2728     gst_object_unref (tee_pad);
2729     gst_object_unref (sink_pad);
2730   }
2731
2732   gst_element_sync_state_with_parent (priv->appsink[index]);
2733 }
2734
2735 static void
2736 plug_sink (GstRTSPStream * stream, const GstRTSPTransport * transport,
2737     guint index)
2738 {
2739   GstRTSPStreamPrivate *priv;
2740   gboolean is_tcp, is_udp, is_mcast;
2741   priv = stream->priv;
2742
2743   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2744   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2745   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2746
2747   if (is_udp)
2748     plug_udp_sink (stream, priv->udpsink[index],
2749         &priv->udpqueue[index], index, FALSE);
2750
2751   else if (is_mcast)
2752     plug_udp_sink (stream, priv->mcast_udpsink[index],
2753         &priv->mcast_udpqueue[index], index, TRUE);
2754
2755   else if (is_tcp)
2756     plug_tcp_sink (stream, index);
2757 }
2758
2759 /* must be called with lock */
2760 static gboolean
2761 create_sender_part (GstRTSPStream * stream, const GstRTSPTransport * transport)
2762 {
2763   GstRTSPStreamPrivate *priv;
2764   GstPad *pad;
2765   GstBin *bin;
2766   gboolean is_tcp, is_udp, is_mcast;
2767   gint i;
2768
2769   GST_DEBUG_OBJECT (stream, "create sender part");
2770   priv = stream->priv;
2771   bin = priv->joined_bin;
2772
2773   is_tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2774   is_udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2775   is_mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2776
2777   GST_DEBUG_OBJECT (stream, "tcp: %d, udp: %d, mcast: %d", is_tcp, is_udp,
2778       is_mcast);
2779
2780   if (is_udp && !priv->server_addr_v4 && !priv->server_addr_v6) {
2781     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP");
2782     return FALSE;
2783   }
2784
2785   if (is_mcast && !priv->mcast_addr_v4 && !priv->mcast_addr_v6) {
2786     GST_WARNING_OBJECT (stream, "no sockets assigned for UDP multicast");
2787     return FALSE;
2788   }
2789
2790   for (i = 0; i < 2; i++) {
2791     gboolean link_tee = FALSE;
2792     /* For the sender we create this bit of pipeline for both
2793      * RTP and RTCP.
2794      * Initially there will be only one active transport for
2795      * the stream, so the pipeline will look like this:
2796      *
2797      * .--------.      .-----.    .---------.
2798      * | rtpbin |      | tee |    |  sink   |
2799      * |       send->sink   src->sink       |
2800      * '--------'      '-----'    '---------'
2801      *
2802      * For each new transport, the already existing branch will
2803      * be reconfigured by adding a queue element:
2804      *
2805      * .--------.      .-----.    .---------.    .---------.
2806      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2807      * |       send->sink   src->sink      src->sink       |
2808      * '--------'      |     |    '---------'    '---------'
2809      *                 |     |    .---------.    .---------.
2810      *                 |     |    |  queue  |    | udpsink |
2811      *                 |    src->sink      src->sink       |
2812      *                 |     |    '---------'    '---------'
2813      *                 |     |    .---------.    .---------.
2814      *                 |     |    |  queue  |    | appsink |
2815      *                 |    src->sink      src->sink       |
2816      *                 '-----'    '---------'    '---------'
2817      */
2818
2819     /* Only link the RTP send src if we're going to send RTP, link
2820      * the RTCP send src always */
2821     if (!priv->srcpad && i == 0)
2822       continue;
2823
2824     if (!priv->tee[i]) {
2825       /* make tee for RTP/RTCP */
2826       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2827       gst_bin_add (bin, priv->tee[i]);
2828       link_tee = TRUE;
2829     }
2830
2831     if (is_udp && !priv->udpsink[i]) {
2832       /* we create only one pair of udpsinks for IPv4 and IPv6 */
2833       create_and_configure_udpsink (stream, &priv->udpsink[i], priv->socket_v4[i],
2834           priv->socket_v6[i], FALSE, (i == 0));
2835       plug_sink (stream, transport, i);
2836     } else if (is_mcast && !priv->mcast_udpsink[i]) {
2837       /* we create only one pair of mcast-udpsinks for IPv4 and IPv6 */
2838       create_and_configure_udpsink (stream, &priv->mcast_udpsink[i],
2839           priv->mcast_socket_v4[i], priv->mcast_socket_v6[i], TRUE, (i == 0));
2840       plug_sink (stream, transport, i);
2841     } else if (is_tcp && !priv->appsink[i]) {
2842       /* make appsink */
2843       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2844       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2845
2846       /* we need to set sync and preroll to FALSE for the sink to avoid
2847        * deadlock. This is only needed for sink sending RTCP data. */
2848       if (i == 1)
2849         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE,
2850          NULL);
2851
2852       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2853           &sink_cb, stream, NULL);
2854       plug_sink (stream, transport, i);
2855     }
2856
2857     if (link_tee) {
2858       /* and link to rtpbin send pad */
2859       gst_element_sync_state_with_parent (priv->tee[i]);
2860       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2861       gst_pad_link (priv->send_src[i], pad);
2862       gst_object_unref (pad);
2863     }
2864   }
2865
2866   return TRUE;
2867 }
2868
2869 /* must be called with lock */
2870 static void
2871 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2872     GstElement * funnel)
2873 {
2874   GstRTSPStreamPrivate *priv;
2875   GstPad *pad, *selpad;
2876
2877   priv = stream->priv;
2878
2879   if (priv->srcpad) {
2880     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2881      * values. This is only relevant for PLAY pipelines */
2882     gst_element_set_state (src, GST_STATE_PLAYING);
2883     gst_element_set_locked_state (src, TRUE);
2884   }
2885
2886   /* add src */
2887   gst_bin_add (bin, src);
2888
2889   /* and link to the funnel */
2890   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2891   pad = gst_element_get_static_pad (src, "src");
2892   gst_pad_link (pad, selpad);
2893   gst_object_unref (pad);
2894   gst_object_unref (selpad);
2895 }
2896
2897 /* must be called with lock */
2898 static gboolean
2899 create_receiver_part (GstRTSPStream * stream, const GstRTSPTransport *
2900     transport)
2901 {
2902   GstRTSPStreamPrivate *priv;
2903   GstPad *pad;
2904   GstBin *bin;
2905   gboolean tcp;
2906   gboolean udp;
2907   gboolean mcast;
2908   gint i;
2909
2910   GST_DEBUG_OBJECT (stream, "create receiver part");
2911   priv = stream->priv;
2912   bin = priv->joined_bin;
2913
2914   tcp = transport->lower_transport == GST_RTSP_LOWER_TRANS_TCP;
2915   udp = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP;
2916   mcast = transport->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST;
2917
2918   for (i = 0; i < 2; i++) {
2919     /* For the receiver we create this bit of pipeline for both
2920      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2921      * and it is all funneled into the rtpbin receive pad.
2922      *
2923      *
2924      * .--------.     .--------.    .--------.
2925      * | udpsrc |     | funnel |    | rtpbin |
2926      * | RTP    src->sink      src->sink     |
2927      * '--------'     |        |    |        |
2928      * .--------.     |        |    |        |
2929      * | appsrc |     |        |    |        |
2930      * | RTP    src->sink      |    |        |
2931      * '--------'     '--------'    |        |
2932      *                              |        |
2933      * .--------.     .--------.    |        |
2934      * | udpsrc |     | funnel |    |        |
2935      * | RTCP   src->sink      src->sink     |
2936      * '--------'     |        |    '--------'
2937      * .--------.     |        |
2938      * | appsrc |     |        |
2939      * | RTCP   src->sink      |
2940      * '--------'     '--------'
2941      */
2942
2943     if (!priv->sinkpad && i == 0) {
2944       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2945        * RTCP sink always */
2946       continue;
2947     }
2948
2949     /* make funnel for the RTP/RTCP receivers */
2950     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2951     gst_bin_add (bin, priv->funnel[i]);
2952
2953     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2954     gst_pad_link (pad, priv->recv_sink[i]);
2955     gst_object_unref (pad);
2956
2957     if (udp && !priv->udpsrc_v4[i] && priv->server_addr_v4) {
2958       GST_DEBUG_OBJECT (stream, "udp IPv4, create and configure udpsources");
2959       if (!create_and_configure_udpsource (&priv->udpsrc_v4[i],
2960             priv->socket_v4[i]))
2961         goto udpsrc_error;
2962
2963       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2964     }
2965
2966     if (udp && !priv->udpsrc_v6[i] && priv->server_addr_v6) {
2967       GST_DEBUG_OBJECT (stream, "udp IPv6, create and configure udpsources");
2968       if (!create_and_configure_udpsource (&priv->udpsrc_v6[i],
2969             priv->socket_v6[i]))
2970         goto udpsrc_error;
2971
2972       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2973     }
2974
2975     if (mcast && !priv->mcast_udpsrc_v4[i] && priv->mcast_addr_v4) {
2976       GST_DEBUG_OBJECT (stream, "mcast IPv4, create and configure udpsources");
2977       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v4[i],
2978             priv->mcast_socket_v4[i]))
2979         goto mcast_udpsrc_error;
2980       plug_src (stream, bin, priv->mcast_udpsrc_v4[i], priv->funnel[i]);
2981     }
2982
2983     if (mcast && !priv->mcast_udpsrc_v6[i] && priv->mcast_addr_v6) {
2984       GST_DEBUG_OBJECT (stream, "mcast IPv6, create and configure udpsources");
2985       if (!create_and_configure_udpsource (&priv->mcast_udpsrc_v6[i],
2986             priv->mcast_socket_v6[i]))
2987         goto mcast_udpsrc_error;
2988       plug_src (stream, bin, priv->mcast_udpsrc_v6[i], priv->funnel[i]);
2989     }
2990
2991     if (tcp && !priv->appsrc[i]) {
2992       /* make and add appsrc */
2993       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2994       priv->appsrc_base_time[i] = -1;
2995       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2996           TRUE, NULL);
2997       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2998     }
2999
3000     gst_element_sync_state_with_parent (priv->funnel[i]);
3001   }
3002
3003   return TRUE;
3004
3005 mcast_udpsrc_error:
3006 udpsrc_error:
3007   return FALSE;
3008 }
3009
3010 static gboolean
3011 check_mcast_part_for_transport (GstRTSPStream * stream,
3012     const GstRTSPTransport * tr)
3013 {
3014   GstRTSPStreamPrivate *priv = stream->priv;
3015   GInetAddress *inetaddr;
3016   GSocketFamily family;
3017   GstRTSPAddress *mcast_addr;
3018
3019   /* Check if it's a ipv4 or ipv6 transport */
3020   inetaddr = g_inet_address_new_from_string (tr->destination);
3021   family = g_inet_address_get_family (inetaddr);
3022   g_object_unref (inetaddr);
3023
3024   /* Select fields corresponding to the family */
3025   if (family == G_SOCKET_FAMILY_IPV4) {
3026     mcast_addr = priv->mcast_addr_v4;
3027   } else {
3028     mcast_addr = priv->mcast_addr_v6;
3029   }
3030
3031   /* We support only one mcast group per family, make sure this transport
3032    * matches it. */
3033   if (!mcast_addr)
3034     goto no_addr;
3035
3036   if (g_ascii_strcasecmp (tr->destination, mcast_addr->address) != 0 ||
3037       tr->port.min != mcast_addr->port ||
3038       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
3039       tr->ttl != mcast_addr->ttl)
3040     goto wrong_addr;
3041
3042   return TRUE;
3043
3044 no_addr:
3045   {
3046     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
3047         "has been reserved");
3048     return FALSE;
3049   }
3050 wrong_addr:
3051   {
3052     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
3053         "the reserved address");
3054     return FALSE;
3055   }
3056 }
3057
3058 /**
3059  * gst_rtsp_stream_join_bin:
3060  * @stream: a #GstRTSPStream
3061  * @bin: (transfer none): a #GstBin to join
3062  * @rtpbin: (transfer none): a rtpbin element in @bin
3063  * @state: the target state of the new elements
3064  *
3065  * Join the #GstBin @bin that contains the element @rtpbin.
3066  *
3067  * @stream will link to @rtpbin, which must be inside @bin. The elements
3068  * added to @bin will be set to the state given in @state.
3069  *
3070  * Returns: %TRUE on success.
3071  */
3072 gboolean
3073 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
3074     GstElement * rtpbin, GstState state)
3075 {
3076   GstRTSPStreamPrivate *priv;
3077   guint idx;
3078   gchar *name;
3079   GstPadLinkReturn ret;
3080
3081   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3082   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3083   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3084
3085   priv = stream->priv;
3086
3087   g_mutex_lock (&priv->lock);
3088   if (priv->joined_bin != NULL)
3089     goto was_joined;
3090
3091   /* create a session with the same index as the stream */
3092   idx = priv->idx;
3093
3094   GST_INFO ("stream %p joining bin as session %u", stream, idx);
3095
3096   if (priv->profiles & GST_RTSP_PROFILE_SAVP
3097       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
3098     /* For SRTP */
3099     g_signal_connect (rtpbin, "request-rtp-encoder",
3100         (GCallback) request_rtp_encoder, stream);
3101     g_signal_connect (rtpbin, "request-rtcp-encoder",
3102         (GCallback) request_rtcp_encoder, stream);
3103     g_signal_connect (rtpbin, "request-rtp-decoder",
3104         (GCallback) request_rtp_rtcp_decoder, stream);
3105     g_signal_connect (rtpbin, "request-rtcp-decoder",
3106         (GCallback) request_rtp_rtcp_decoder, stream);
3107   }
3108
3109   if (priv->sinkpad) {
3110     g_signal_connect (rtpbin, "request-pt-map",
3111         (GCallback) request_pt_map, stream);
3112   }
3113
3114   /* get pads from the RTP session element for sending and receiving
3115    * RTP/RTCP*/
3116   if (priv->srcpad) {
3117     /* get a pad for sending RTP */
3118     name = g_strdup_printf ("send_rtp_sink_%u", idx);
3119     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
3120     g_free (name);
3121
3122     /* link the RTP pad to the session manager, it should not really fail unless
3123      * this is not really an RTP pad */
3124     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
3125     if (ret != GST_PAD_LINK_OK)
3126       goto link_failed;
3127
3128     name = g_strdup_printf ("send_rtp_src_%u", idx);
3129     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
3130     g_free (name);
3131   } else {
3132     /* RECORD case: need to connect our sinkpad from here */
3133     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
3134     /* EOS */
3135     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
3136
3137     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
3138     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
3139     g_free (name);
3140   }
3141
3142   name = g_strdup_printf ("send_rtcp_src_%u", idx);
3143   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
3144   g_free (name);
3145   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
3146   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
3147   g_free (name);
3148
3149   /* get the session */
3150   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
3151
3152   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
3153       stream);
3154   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
3155       stream);
3156   g_signal_connect (priv->session, "on-ssrc-active",
3157       (GCallback) on_ssrc_active, stream);
3158   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
3159       stream);
3160   g_signal_connect (priv->session, "on-bye-timeout",
3161       (GCallback) on_bye_timeout, stream);
3162   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
3163       stream);
3164
3165   /* signal for sender ssrc */
3166   g_signal_connect (priv->session, "on-new-sender-ssrc",
3167       (GCallback) on_new_sender_ssrc, stream);
3168   g_signal_connect (priv->session, "on-sender-ssrc-active",
3169       (GCallback) on_sender_ssrc_active, stream);
3170
3171   if (priv->srcpad) {
3172     /* be notified of caps changes */
3173     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
3174         (GCallback) caps_notify, stream);
3175     priv->caps = gst_pad_get_current_caps (priv->send_src[0]);
3176   }
3177
3178   priv->joined_bin = bin;
3179   GST_DEBUG_OBJECT (stream, "successfully joined bin");
3180   g_mutex_unlock (&priv->lock);
3181
3182   return TRUE;
3183
3184   /* ERRORS */
3185 was_joined:
3186   {
3187     g_mutex_unlock (&priv->lock);
3188     return TRUE;
3189   }
3190 link_failed:
3191   {
3192     GST_WARNING ("failed to link stream %u", idx);
3193     gst_object_unref (priv->send_rtp_sink);
3194     priv->send_rtp_sink = NULL;
3195     g_mutex_unlock (&priv->lock);
3196     return FALSE;
3197   }
3198 }
3199
3200 static void
3201 clear_element (GstBin * bin, GstElement ** elementptr)
3202 {
3203   if (*elementptr) {
3204     gst_element_set_locked_state (*elementptr, FALSE);
3205     gst_element_set_state (*elementptr, GST_STATE_NULL);
3206     if (GST_ELEMENT_PARENT (*elementptr))
3207       gst_bin_remove (bin, *elementptr);
3208     else
3209       gst_object_unref (*elementptr);
3210     *elementptr = NULL;
3211   }
3212 }
3213
3214 /**
3215  * gst_rtsp_stream_leave_bin:
3216  * @stream: a #GstRTSPStream
3217  * @bin: (transfer none): a #GstBin
3218  * @rtpbin: (transfer none): a rtpbin #GstElement
3219  *
3220  * Remove the elements of @stream from @bin.
3221  *
3222  * Return: %TRUE on success.
3223  */
3224 gboolean
3225 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
3226     GstElement * rtpbin)
3227 {
3228   GstRTSPStreamPrivate *priv;
3229   gint i;
3230
3231   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3232   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
3233   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
3234
3235   priv = stream->priv;
3236
3237   g_mutex_lock (&priv->lock);
3238   if (priv->joined_bin == NULL)
3239     goto was_not_joined;
3240   if (priv->joined_bin != bin)
3241     goto wrong_bin;
3242
3243   priv->joined_bin = NULL;
3244
3245   /* all transports must be removed by now */
3246   if (priv->transports != NULL)
3247     goto transports_not_removed;
3248
3249   clear_tr_cache (priv, TRUE);
3250   clear_tr_cache (priv, FALSE);
3251
3252   GST_INFO ("stream %p leaving bin", stream);
3253
3254   if (priv->srcpad) {
3255     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
3256
3257     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
3258     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
3259     gst_object_unref (priv->send_rtp_sink);
3260     priv->send_rtp_sink = NULL;
3261   } else if (priv->recv_rtp_src) {
3262     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
3263     gst_object_unref (priv->recv_rtp_src);
3264     priv->recv_rtp_src = NULL;
3265   }
3266
3267   for (i = 0; i < 2; i++) {
3268     clear_element (bin, &priv->udpsrc_v4[i]);
3269     clear_element (bin, &priv->udpsrc_v6[i]);
3270     clear_element (bin, &priv->udpqueue[i]);
3271     clear_element (bin, &priv->udpsink[i]);
3272
3273     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
3274     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
3275     clear_element (bin, &priv->mcast_udpqueue[i]);
3276     clear_element (bin, &priv->mcast_udpsink[i]);
3277
3278     clear_element (bin, &priv->appsrc[i]);
3279     clear_element (bin, &priv->appqueue[i]);
3280     clear_element (bin, &priv->appsink[i]);
3281
3282     clear_element (bin, &priv->tee[i]);
3283     clear_element (bin, &priv->funnel[i]);
3284
3285     if (priv->sinkpad || i == 1) {
3286       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3287       gst_object_unref (priv->recv_sink[i]);
3288       priv->recv_sink[i] = NULL;
3289     }
3290   }
3291
3292   if (priv->srcpad) {
3293     gst_object_unref (priv->send_src[0]);
3294     priv->send_src[0] = NULL;
3295   }
3296
3297   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3298   gst_object_unref (priv->send_src[1]);
3299   priv->send_src[1] = NULL;
3300
3301   g_object_unref (priv->session);
3302   priv->session = NULL;
3303   if (priv->caps)
3304     gst_caps_unref (priv->caps);
3305   priv->caps = NULL;
3306
3307   if (priv->srtpenc)
3308     gst_object_unref (priv->srtpenc);
3309   if (priv->srtpdec)
3310     gst_object_unref (priv->srtpdec);
3311
3312   if (priv->mcast_addr_v4)
3313     gst_rtsp_address_free (priv->mcast_addr_v4);
3314   priv->mcast_addr_v4 = NULL;
3315   if (priv->mcast_addr_v6)
3316     gst_rtsp_address_free (priv->mcast_addr_v6);
3317   priv->mcast_addr_v6 = NULL;
3318   if (priv->server_addr_v4)
3319     gst_rtsp_address_free (priv->server_addr_v4);
3320   priv->server_addr_v4 = NULL;
3321   if (priv->server_addr_v6)
3322     gst_rtsp_address_free (priv->server_addr_v6);
3323   priv->server_addr_v6 = NULL;
3324
3325   g_mutex_unlock (&priv->lock);
3326
3327   return TRUE;
3328
3329 was_not_joined:
3330   {
3331     g_mutex_unlock (&priv->lock);
3332     return TRUE;
3333   }
3334 transports_not_removed:
3335   {
3336     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3337     g_mutex_unlock (&priv->lock);
3338     return FALSE;
3339   }
3340 wrong_bin:
3341   {
3342     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3343     g_mutex_unlock (&priv->lock);
3344     return FALSE;
3345   }
3346 }
3347
3348 /**
3349  * gst_rtsp_stream_get_joined_bin:
3350  * @stream: a #GstRTSPStream
3351  *
3352  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3353  *
3354  * Return: (transfer full): the joined bin or NULL.
3355  */
3356 GstBin *
3357 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3358 {
3359   GstRTSPStreamPrivate *priv;
3360   GstBin *bin = NULL;
3361
3362   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3363
3364   priv = stream->priv;
3365
3366   g_mutex_lock (&priv->lock);
3367   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3368   g_mutex_unlock (&priv->lock);
3369
3370   return bin;
3371 }
3372
3373 /**
3374  * gst_rtsp_stream_get_rtpinfo:
3375  * @stream: a #GstRTSPStream
3376  * @rtptime: (allow-none): result RTP timestamp
3377  * @seq: (allow-none): result RTP seqnum
3378  * @clock_rate: (allow-none): the clock rate
3379  * @running_time: result running-time
3380  *
3381  * Retrieve the current rtptime, seq and running-time. This is used to
3382  * construct a RTPInfo reply header.
3383  *
3384  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3385  */
3386 gboolean
3387 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3388     guint * rtptime, guint * seq, guint * clock_rate,
3389     GstClockTime * running_time)
3390 {
3391   GstRTSPStreamPrivate *priv;
3392   GstStructure *stats;
3393   GObjectClass *payobjclass;
3394
3395   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3396
3397   priv = stream->priv;
3398
3399   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3400
3401   g_mutex_lock (&priv->lock);
3402
3403   /* First try to extract the information from the last buffer on the sinks.
3404    * This will have a more accurate sequence number and timestamp, as between
3405    * the payloader and the sink there can be some queues
3406    */
3407   if (priv->udpsink[0] || priv->appsink[0]) {
3408     GstSample *last_sample;
3409
3410     if (priv->udpsink[0])
3411       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3412     else
3413       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3414
3415     if (last_sample) {
3416       GstCaps *caps;
3417       GstBuffer *buffer;
3418       GstSegment *segment;
3419       GstStructure *s;
3420       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3421
3422       caps = gst_sample_get_caps (last_sample);
3423       buffer = gst_sample_get_buffer (last_sample);
3424       segment = gst_sample_get_segment (last_sample);
3425       s = gst_caps_get_structure (caps, 0);
3426
3427       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3428         guint ssrc_buf = gst_rtp_buffer_get_ssrc (&rtp_buffer);
3429         guint ssrc_stream = 0;
3430         if (gst_structure_has_field_typed (s, "ssrc", G_TYPE_UINT) &&
3431             gst_structure_get_uint (s, "ssrc", &ssrc_stream) &&
3432             ssrc_buf != ssrc_stream) {
3433           /* Skip buffers from auxiliary streams. */
3434           GST_DEBUG_OBJECT (stream,
3435               "not a buffer from the payloader, SSRC: %08x", ssrc_buf);
3436
3437           gst_rtp_buffer_unmap (&rtp_buffer);
3438           gst_sample_unref (last_sample);
3439           goto stats;
3440         }
3441
3442         if (seq) {
3443           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3444         }
3445
3446         if (rtptime) {
3447           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3448         }
3449
3450         gst_rtp_buffer_unmap (&rtp_buffer);
3451
3452         if (running_time) {
3453           *running_time =
3454               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3455               GST_BUFFER_TIMESTAMP (buffer));
3456         }
3457
3458         if (clock_rate) {
3459           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3460
3461           if (*clock_rate == 0 && running_time)
3462             *running_time = GST_CLOCK_TIME_NONE;
3463         }
3464         gst_sample_unref (last_sample);
3465
3466         goto done;
3467       } else {
3468         gst_sample_unref (last_sample);
3469       }
3470     }
3471   }
3472
3473 stats:
3474   if (g_object_class_find_property (payobjclass, "stats")) {
3475     g_object_get (priv->payloader, "stats", &stats, NULL);
3476     if (stats == NULL)
3477       goto no_stats;
3478
3479     if (seq)
3480       gst_structure_get_uint (stats, "seqnum", seq);
3481
3482     if (rtptime)
3483       gst_structure_get_uint (stats, "timestamp", rtptime);
3484
3485     if (running_time)
3486       gst_structure_get_clock_time (stats, "running-time", running_time);
3487
3488     if (clock_rate) {
3489       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3490       if (*clock_rate == 0 && running_time)
3491         *running_time = GST_CLOCK_TIME_NONE;
3492     }
3493     gst_structure_free (stats);
3494   } else {
3495     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3496         !g_object_class_find_property (payobjclass, "timestamp"))
3497       goto no_stats;
3498
3499     if (seq)
3500       g_object_get (priv->payloader, "seqnum", seq, NULL);
3501
3502     if (rtptime)
3503       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3504
3505     if (running_time)
3506       *running_time = GST_CLOCK_TIME_NONE;
3507   }
3508
3509 done:
3510   g_mutex_unlock (&priv->lock);
3511
3512   return TRUE;
3513
3514   /* ERRORS */
3515 no_stats:
3516   {
3517     GST_WARNING ("Could not get payloader stats");
3518     g_mutex_unlock (&priv->lock);
3519     return FALSE;
3520   }
3521 }
3522
3523 /**
3524  * gst_rtsp_stream_get_caps:
3525  * @stream: a #GstRTSPStream
3526  *
3527  * Retrieve the current caps of @stream.
3528  *
3529  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3530  * after usage.
3531  */
3532 GstCaps *
3533 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3534 {
3535   GstRTSPStreamPrivate *priv;
3536   GstCaps *result;
3537
3538   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3539
3540   priv = stream->priv;
3541
3542   g_mutex_lock (&priv->lock);
3543   if ((result = priv->caps))
3544     gst_caps_ref (result);
3545   g_mutex_unlock (&priv->lock);
3546
3547   return result;
3548 }
3549
3550 /**
3551  * gst_rtsp_stream_recv_rtp:
3552  * @stream: a #GstRTSPStream
3553  * @buffer: (transfer full): a #GstBuffer
3554  *
3555  * Handle an RTP buffer for the stream. This method is usually called when a
3556  * message has been received from a client using the TCP transport.
3557  *
3558  * This function takes ownership of @buffer.
3559  *
3560  * Returns: a GstFlowReturn.
3561  */
3562 GstFlowReturn
3563 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3564 {
3565   GstRTSPStreamPrivate *priv;
3566   GstFlowReturn ret;
3567   GstElement *element;
3568
3569   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3570   priv = stream->priv;
3571   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3572   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3573
3574   g_mutex_lock (&priv->lock);
3575   if (priv->appsrc[0])
3576     element = gst_object_ref (priv->appsrc[0]);
3577   else
3578     element = NULL;
3579   g_mutex_unlock (&priv->lock);
3580
3581   if (element) {
3582     if (priv->appsrc_base_time[0] == -1) {
3583       /* Take current running_time. This timestamp will be put on
3584        * the first buffer of each stream because we are a live source and so we
3585        * timestamp with the running_time. When we are dealing with TCP, we also
3586        * only timestamp the first buffer (using the DISCONT flag) because a server
3587        * typically bursts data, for which we don't want to compensate by speeding
3588        * up the media. The other timestamps will be interpollated from this one
3589        * using the RTP timestamps. */
3590       GST_OBJECT_LOCK (element);
3591       if (GST_ELEMENT_CLOCK (element)) {
3592         GstClockTime now;
3593         GstClockTime base_time;
3594
3595         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3596         base_time = GST_ELEMENT_CAST (element)->base_time;
3597
3598         priv->appsrc_base_time[0] = now - base_time;
3599         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3600         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3601             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3602             GST_TIME_ARGS (base_time));
3603       }
3604       GST_OBJECT_UNLOCK (element);
3605     }
3606
3607     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3608     gst_object_unref (element);
3609   } else {
3610     ret = GST_FLOW_OK;
3611   }
3612   return ret;
3613 }
3614
3615 /**
3616  * gst_rtsp_stream_recv_rtcp:
3617  * @stream: a #GstRTSPStream
3618  * @buffer: (transfer full): a #GstBuffer
3619  *
3620  * Handle an RTCP buffer for the stream. This method is usually called when a
3621  * message has been received from a client using the TCP transport.
3622  *
3623  * This function takes ownership of @buffer.
3624  *
3625  * Returns: a GstFlowReturn.
3626  */
3627 GstFlowReturn
3628 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3629 {
3630   GstRTSPStreamPrivate *priv;
3631   GstFlowReturn ret;
3632   GstElement *element;
3633
3634   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3635   priv = stream->priv;
3636   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3637
3638   if (priv->joined_bin == NULL) {
3639     gst_buffer_unref (buffer);
3640     return GST_FLOW_NOT_LINKED;
3641   }
3642   g_mutex_lock (&priv->lock);
3643   if (priv->appsrc[1])
3644     element = gst_object_ref (priv->appsrc[1]);
3645   else
3646     element = NULL;
3647   g_mutex_unlock (&priv->lock);
3648
3649   if (element) {
3650     if (priv->appsrc_base_time[1] == -1) {
3651       /* Take current running_time. This timestamp will be put on
3652        * the first buffer of each stream because we are a live source and so we
3653        * timestamp with the running_time. When we are dealing with TCP, we also
3654        * only timestamp the first buffer (using the DISCONT flag) because a server
3655        * typically bursts data, for which we don't want to compensate by speeding
3656        * up the media. The other timestamps will be interpollated from this one
3657        * using the RTP timestamps. */
3658       GST_OBJECT_LOCK (element);
3659       if (GST_ELEMENT_CLOCK (element)) {
3660         GstClockTime now;
3661         GstClockTime base_time;
3662
3663         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3664         base_time = GST_ELEMENT_CAST (element)->base_time;
3665
3666         priv->appsrc_base_time[1] = now - base_time;
3667         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3668         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3669             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3670             GST_TIME_ARGS (base_time));
3671       }
3672       GST_OBJECT_UNLOCK (element);
3673     }
3674
3675     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3676     gst_object_unref (element);
3677   } else {
3678     ret = GST_FLOW_OK;
3679     gst_buffer_unref (buffer);
3680   }
3681   return ret;
3682 }
3683
3684 /* must be called with lock */
3685 static gboolean
3686 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3687     gboolean add)
3688 {
3689   GstRTSPStreamPrivate *priv = stream->priv;
3690   const GstRTSPTransport *tr;
3691
3692   tr = gst_rtsp_stream_transport_get_transport (trans);
3693
3694   switch (tr->lower_transport) {
3695     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3696     {
3697       if (add) {
3698         if (!check_mcast_part_for_transport (stream, tr))
3699           goto mcast_error;
3700         priv->transports = g_list_prepend (priv->transports, trans);
3701       } else {
3702         priv->transports = g_list_remove (priv->transports, trans);
3703       }
3704       break;
3705     }
3706     case GST_RTSP_LOWER_TRANS_UDP:
3707     {
3708       gchar *dest;
3709       gint min, max;
3710       guint ttl = 0;
3711
3712       dest = tr->destination;
3713       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3714         min = tr->port.min;
3715         max = tr->port.max;
3716         ttl = tr->ttl;
3717       } else if (priv->client_side) {
3718         /* In client side mode the 'destination' is the RTSP server, so send
3719          * to those ports */
3720         min = tr->server_port.min;
3721         max = tr->server_port.max;
3722       } else {
3723         min = tr->client_port.min;
3724         max = tr->client_port.max;
3725       }
3726
3727       if (add) {
3728         if (ttl > 0) {
3729           GST_INFO ("setting ttl-mc %d", ttl);
3730           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3731           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3732         }
3733         GST_INFO ("adding %s:%d-%d", dest, min, max);
3734         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3735         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3736         priv->transports = g_list_prepend (priv->transports, trans);
3737       } else {
3738         GST_INFO ("removing %s:%d-%d", dest, min, max);
3739         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3740         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3741         priv->transports = g_list_remove (priv->transports, trans);
3742       }
3743       priv->transports_cookie++;
3744       break;
3745     }
3746     case GST_RTSP_LOWER_TRANS_TCP:
3747       if (add) {
3748         GST_INFO ("adding TCP %s", tr->destination);
3749         priv->transports = g_list_prepend (priv->transports, trans);
3750       } else {
3751         GST_INFO ("removing TCP %s", tr->destination);
3752         priv->transports = g_list_remove (priv->transports, trans);
3753       }
3754       priv->transports_cookie++;
3755       break;
3756     default:
3757       goto unknown_transport;
3758   }
3759   return TRUE;
3760
3761   /* ERRORS */
3762 unknown_transport:
3763   {
3764     GST_INFO ("Unknown transport %d", tr->lower_transport);
3765     return FALSE;
3766   }
3767 mcast_error:
3768   {
3769     return FALSE;
3770   }
3771 }
3772
3773
3774 /**
3775  * gst_rtsp_stream_add_transport:
3776  * @stream: a #GstRTSPStream
3777  * @trans: (transfer none): a #GstRTSPStreamTransport
3778  *
3779  * Add the transport in @trans to @stream. The media of @stream will
3780  * then also be send to the values configured in @trans.
3781  *
3782  * @stream must be joined to a bin.
3783  *
3784  * @trans must contain a valid #GstRTSPTransport.
3785  *
3786  * Returns: %TRUE if @trans was added
3787  */
3788 gboolean
3789 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3790     GstRTSPStreamTransport * trans)
3791 {
3792   GstRTSPStreamPrivate *priv;
3793   gboolean res;
3794
3795   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3796   priv = stream->priv;
3797   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3798   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3799
3800   g_mutex_lock (&priv->lock);
3801   res = update_transport (stream, trans, TRUE);
3802   g_mutex_unlock (&priv->lock);
3803
3804   return res;
3805 }
3806
3807 /**
3808  * gst_rtsp_stream_remove_transport:
3809  * @stream: a #GstRTSPStream
3810  * @trans: (transfer none): a #GstRTSPStreamTransport
3811  *
3812  * Remove the transport in @trans from @stream. The media of @stream will
3813  * not be sent to the values configured in @trans.
3814  *
3815  * @stream must be joined to a bin.
3816  *
3817  * @trans must contain a valid #GstRTSPTransport.
3818  *
3819  * Returns: %TRUE if @trans was removed
3820  */
3821 gboolean
3822 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3823     GstRTSPStreamTransport * trans)
3824 {
3825   GstRTSPStreamPrivate *priv;
3826   gboolean res;
3827
3828   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3829   priv = stream->priv;
3830   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3831   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3832
3833   g_mutex_lock (&priv->lock);
3834   res = update_transport (stream, trans, FALSE);
3835   g_mutex_unlock (&priv->lock);
3836
3837   return res;
3838 }
3839
3840 /**
3841  * gst_rtsp_stream_update_crypto:
3842  * @stream: a #GstRTSPStream
3843  * @ssrc: the SSRC
3844  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3845  *
3846  * Update the new crypto information for @ssrc in @stream. If information
3847  * for @ssrc did not exist, it will be added. If information
3848  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3849  * be removed from @stream.
3850  *
3851  * Returns: %TRUE if @crypto could be updated
3852  */
3853 gboolean
3854 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3855     guint ssrc, GstCaps * crypto)
3856 {
3857   GstRTSPStreamPrivate *priv;
3858
3859   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3860   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3861
3862   priv = stream->priv;
3863
3864   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3865
3866   g_mutex_lock (&priv->lock);
3867   if (crypto)
3868     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3869         gst_caps_ref (crypto));
3870   else
3871     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3872   g_mutex_unlock (&priv->lock);
3873
3874   return TRUE;
3875 }
3876
3877 /**
3878  * gst_rtsp_stream_get_rtp_socket:
3879  * @stream: a #GstRTSPStream
3880  * @family: the socket family
3881  *
3882  * Get the RTP socket from @stream for a @family.
3883  *
3884  * @stream must be joined to a bin.
3885  *
3886  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3887  * socket could be allocated for @family. Unref after usage
3888  */
3889 GSocket *
3890 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3891 {
3892   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3893   GSocket *socket;
3894   const gchar *name;
3895
3896   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3897   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3898       family == G_SOCKET_FAMILY_IPV6, NULL);
3899   g_return_val_if_fail (priv->udpsink[0], NULL);
3900
3901   if (family == G_SOCKET_FAMILY_IPV6)
3902     name = "socket-v6";
3903   else
3904     name = "socket";
3905
3906   g_object_get (priv->udpsink[0], name, &socket, NULL);
3907
3908   return socket;
3909 }
3910
3911 /**
3912  * gst_rtsp_stream_get_rtcp_socket:
3913  * @stream: a #GstRTSPStream
3914  * @family: the socket family
3915  *
3916  * Get the RTCP socket from @stream for a @family.
3917  *
3918  * @stream must be joined to a bin.
3919  *
3920  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3921  * socket could be allocated for @family. Unref after usage
3922  */
3923 GSocket *
3924 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3925 {
3926   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3927   GSocket *socket;
3928   const gchar *name;
3929
3930   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3931   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3932       family == G_SOCKET_FAMILY_IPV6, NULL);
3933   g_return_val_if_fail (priv->udpsink[1], NULL);
3934
3935   if (family == G_SOCKET_FAMILY_IPV6)
3936     name = "socket-v6";
3937   else
3938     name = "socket";
3939
3940   g_object_get (priv->udpsink[1], name, &socket, NULL);
3941
3942   return socket;
3943 }
3944
3945 /**
3946  * gst_rtsp_stream_get_rtp_multicast_socket:
3947  * @stream: a #GstRTSPStream
3948  * @family: the socket family
3949  *
3950  * Get the multicast RTP socket from @stream for a @family.
3951  *
3952  * Returns: (transfer full) (nullable): the multicast RTP socket or %NULL if no
3953  * socket could be allocated for @family. Unref after usage
3954  */
3955 GSocket *
3956 gst_rtsp_stream_get_rtp_multicast_socket (GstRTSPStream * stream, GSocketFamily family)
3957 {
3958   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3959   GSocket *socket;
3960   const gchar *name;
3961
3962   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3963   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3964       family == G_SOCKET_FAMILY_IPV6, NULL);
3965   g_return_val_if_fail (priv->mcast_udpsink[0], NULL);
3966
3967   if (family == G_SOCKET_FAMILY_IPV6)
3968     name = "socket-v6";
3969   else
3970     name = "socket";
3971
3972   g_object_get (priv->mcast_udpsink[0], name, &socket, NULL);
3973
3974   return socket;
3975 }
3976
3977 /**
3978  * gst_rtsp_stream_get_rtcp_multicast_socket:
3979  * @stream: a #GstRTSPStream
3980  * @family: the socket family
3981  *
3982  * Get the multicast RTCP socket from @stream for a @family.
3983  *
3984  * Returns: (transfer full) (nullable): the multicast RTCP socket or %NULL if no
3985  * socket could be allocated for @family. Unref after usage
3986  */
3987 GSocket *
3988 gst_rtsp_stream_get_rtcp_multicast_socket (GstRTSPStream * stream, GSocketFamily family)
3989 {
3990   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3991   GSocket *socket;
3992   const gchar *name;
3993
3994   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3995   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3996       family == G_SOCKET_FAMILY_IPV6, NULL);
3997   g_return_val_if_fail (priv->mcast_udpsink[1], NULL);
3998
3999   if (family == G_SOCKET_FAMILY_IPV6)
4000     name = "socket-v6";
4001   else
4002     name = "socket";
4003
4004   g_object_get (priv->mcast_udpsink[1], name, &socket, NULL);
4005
4006   return socket;
4007 }
4008
4009 /**
4010  * gst_rtsp_stream_set_seqnum:
4011  * @stream: a #GstRTSPStream
4012  * @seqnum: a new sequence number
4013  *
4014  * Configure the sequence number in the payloader of @stream to @seqnum.
4015  */
4016 void
4017 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
4018 {
4019   GstRTSPStreamPrivate *priv;
4020
4021   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
4022
4023   priv = stream->priv;
4024
4025   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
4026 }
4027
4028 /**
4029  * gst_rtsp_stream_get_seqnum:
4030  * @stream: a #GstRTSPStream
4031  *
4032  * Get the configured sequence number in the payloader of @stream.
4033  *
4034  * Returns: the sequence number of the payloader.
4035  */
4036 guint16
4037 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
4038 {
4039   GstRTSPStreamPrivate *priv;
4040   guint seqnum;
4041
4042   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
4043
4044   priv = stream->priv;
4045
4046   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
4047
4048   return seqnum;
4049 }
4050
4051 /**
4052  * gst_rtsp_stream_transport_filter:
4053  * @stream: a #GstRTSPStream
4054  * @func: (scope call) (allow-none): a callback
4055  * @user_data: (closure): user data passed to @func
4056  *
4057  * Call @func for each transport managed by @stream. The result value of @func
4058  * determines what happens to the transport. @func will be called with @stream
4059  * locked so no further actions on @stream can be performed from @func.
4060  *
4061  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
4062  * @stream.
4063  *
4064  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
4065  *
4066  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
4067  * will also be added with an additional ref to the result #GList of this
4068  * function..
4069  *
4070  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
4071  *
4072  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
4073  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
4074  * element in the #GList should be unreffed before the list is freed.
4075  */
4076 GList *
4077 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
4078     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
4079 {
4080   GstRTSPStreamPrivate *priv;
4081   GList *result, *walk, *next;
4082   GHashTable *visited = NULL;
4083   guint cookie;
4084
4085   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
4086
4087   priv = stream->priv;
4088
4089   result = NULL;
4090   if (func)
4091     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
4092
4093   g_mutex_lock (&priv->lock);
4094 restart:
4095   cookie = priv->transports_cookie;
4096   for (walk = priv->transports; walk; walk = next) {
4097     GstRTSPStreamTransport *trans = walk->data;
4098     GstRTSPFilterResult res;
4099     gboolean changed;
4100
4101     next = g_list_next (walk);
4102
4103     if (func) {
4104       /* only visit each transport once */
4105       if (g_hash_table_contains (visited, trans))
4106         continue;
4107
4108       g_hash_table_add (visited, g_object_ref (trans));
4109       g_mutex_unlock (&priv->lock);
4110
4111       res = func (stream, trans, user_data);
4112
4113       g_mutex_lock (&priv->lock);
4114     } else
4115       res = GST_RTSP_FILTER_REF;
4116
4117     changed = (cookie != priv->transports_cookie);
4118
4119     switch (res) {
4120       case GST_RTSP_FILTER_REMOVE:
4121         update_transport (stream, trans, FALSE);
4122         break;
4123       case GST_RTSP_FILTER_REF:
4124         result = g_list_prepend (result, g_object_ref (trans));
4125         break;
4126       case GST_RTSP_FILTER_KEEP:
4127       default:
4128         break;
4129     }
4130     if (changed)
4131       goto restart;
4132   }
4133   g_mutex_unlock (&priv->lock);
4134
4135   if (func)
4136     g_hash_table_unref (visited);
4137
4138   return result;
4139 }
4140
4141 static GstPadProbeReturn
4142 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4143 {
4144   GstRTSPStreamPrivate *priv;
4145   GstRTSPStream *stream;
4146   GstBuffer *buffer = NULL;
4147
4148   stream = user_data;
4149   priv = stream->priv;
4150
4151   GST_DEBUG_OBJECT (pad, "now blocking");
4152
4153   g_mutex_lock (&priv->lock);
4154   priv->blocking = TRUE;
4155
4156   if ((info->type & GST_PAD_PROBE_TYPE_BUFFER)) {
4157     buffer = gst_pad_probe_info_get_buffer (info);
4158   } else if ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)) {
4159     GstBufferList *list = gst_pad_probe_info_get_buffer_list (info);
4160     buffer = gst_buffer_list_get (list, 0);
4161   } else {
4162     g_assert_not_reached ();
4163   }
4164
4165   g_assert (buffer);
4166   priv->position = GST_BUFFER_TIMESTAMP (buffer);
4167   GST_DEBUG_OBJECT (stream, "buffer position: %" GST_TIME_FORMAT,
4168       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
4169   g_mutex_unlock (&priv->lock);
4170
4171   gst_element_post_message (priv->payloader,
4172       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
4173           gst_structure_new_empty ("GstRTSPStreamBlocking")));
4174
4175   return GST_PAD_PROBE_OK;
4176 }
4177
4178 static void
4179 set_blocked (GstRTSPStream * stream, gboolean blocked)
4180 {
4181   GstRTSPStreamPrivate *priv;
4182   int i;
4183
4184   GST_DEBUG_OBJECT (stream, "blocked: %d", blocked);
4185
4186   priv = stream->priv;
4187
4188   if (blocked) {
4189     priv->blocking = FALSE;
4190     for (i = 0; i < 2; i++) {
4191       if (priv->blocked_id[i] != 0)
4192         continue;
4193       if (priv->send_src[i]) {
4194         priv->blocked_id[i] = gst_pad_add_probe (priv->send_src[i],
4195             GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
4196             GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
4197             g_object_ref (stream), g_object_unref);
4198       }
4199     }
4200   } else {
4201     for (i = 0; i < 2; i++) {
4202       if (priv->blocked_id[i] != 0) {
4203         gst_pad_remove_probe (priv->send_src[i], priv->blocked_id[i]);
4204         priv->blocked_id[i] = 0;
4205       }
4206     }
4207     priv->blocking = FALSE;
4208   }
4209 }
4210
4211 /**
4212  * gst_rtsp_stream_set_blocked:
4213  * @stream: a #GstRTSPStream
4214  * @blocked: boolean indicating we should block or unblock
4215  *
4216  * Blocks or unblocks the dataflow on @stream.
4217  *
4218  * Returns: %TRUE on success
4219  */
4220 gboolean
4221 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
4222 {
4223   GstRTSPStreamPrivate *priv;
4224
4225   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4226
4227   priv = stream->priv;
4228   g_mutex_lock (&priv->lock);
4229   set_blocked (stream, blocked);
4230   g_mutex_unlock (&priv->lock);
4231
4232   return TRUE;
4233 }
4234
4235 /**
4236  * gst_rtsp_stream_ublock_linked:
4237  * @stream: a #GstRTSPStream
4238  *
4239  * Unblocks the dataflow on @stream if it is linked.
4240  *
4241  * Returns: %TRUE on success
4242  */
4243 gboolean
4244 gst_rtsp_stream_unblock_linked (GstRTSPStream * stream)
4245 {
4246   GstRTSPStreamPrivate *priv;
4247
4248   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4249
4250   priv = stream->priv;
4251   g_mutex_lock (&priv->lock);
4252   if (priv->send_src[0] && gst_pad_is_linked (priv->send_src[0]))
4253     set_blocked (stream, FALSE);
4254   g_mutex_unlock (&priv->lock);
4255
4256   return TRUE;
4257 }
4258
4259 /**
4260  * gst_rtsp_stream_is_blocking:
4261  * @stream: a #GstRTSPStream
4262  *
4263  * Check if @stream is blocking on a #GstBuffer.
4264  *
4265  * Returns: %TRUE if @stream is blocking
4266  */
4267 gboolean
4268 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
4269 {
4270   GstRTSPStreamPrivate *priv;
4271   gboolean result;
4272
4273   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4274
4275   priv = stream->priv;
4276
4277   g_mutex_lock (&priv->lock);
4278   result = priv->blocking;
4279   g_mutex_unlock (&priv->lock);
4280
4281   return result;
4282 }
4283
4284 /**
4285  * gst_rtsp_stream_query_position:
4286  * @stream: a #GstRTSPStream
4287  * @position: current position of a #GstRTSPStream
4288  *
4289  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
4290  * the RTP parts of the pipeline and not the RTCP parts.
4291  *
4292  * Returns: %TRUE if the position could be queried
4293  */
4294 gboolean
4295 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
4296 {
4297   GstRTSPStreamPrivate *priv;
4298   GstElement *sink;
4299   GstPad *pad = NULL;
4300
4301   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4302
4303   /* query position: if no sinks have been added yet,
4304    * we obtain the position from the pad otherwise we query the sinks */
4305
4306   priv = stream->priv;
4307
4308   g_mutex_lock (&priv->lock);
4309   /* depending on the transport type, it should query corresponding sink */
4310   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4311     sink = priv->udpsink[0];
4312   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4313     sink = priv->mcast_udpsink[0];
4314   else
4315     sink = priv->appsink[0];
4316
4317   if (sink) {
4318     gst_object_ref (sink);
4319   } else if (priv->send_src[0]) {
4320     pad = gst_object_ref (priv->send_src[0]);
4321   } else {
4322     g_mutex_unlock (&priv->lock);
4323     GST_WARNING_OBJECT (stream, "Couldn't obtain postion: erroneous pipeline");
4324     return FALSE;
4325   }
4326   g_mutex_unlock (&priv->lock);
4327
4328   if (sink) {
4329     if (!gst_element_query_position (sink , GST_FORMAT_TIME, position)) {
4330       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: position query failed");
4331       gst_object_unref (sink);
4332       return FALSE;
4333     }
4334     gst_object_unref (sink);
4335   } else if (pad) {
4336     GstEvent *event;
4337     const GstSegment *segment;
4338
4339     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4340     if (!event) {
4341       GST_WARNING_OBJECT (stream, "Couldn't obtain postion: no segment event");
4342       gst_object_unref (pad);
4343       return FALSE;
4344     }
4345
4346     gst_event_parse_segment (event, &segment);
4347     if (segment->format != GST_FORMAT_TIME) {
4348       *position = -1;
4349     } else {
4350       g_mutex_lock (&priv->lock);
4351       *position = priv->position;
4352       g_mutex_unlock (&priv->lock);
4353       *position =
4354         gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *position);
4355     }
4356     gst_event_unref (event);
4357     gst_object_unref (pad);
4358   }
4359
4360   return TRUE;
4361 }
4362
4363 /**
4364  * gst_rtsp_stream_query_stop:
4365  * @stream: a #GstRTSPStream
4366  * @stop: current stop of a #GstRTSPStream
4367  *
4368  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
4369  * the RTP parts of the pipeline and not the RTCP parts.
4370  *
4371  * Returns: %TRUE if the stop could be queried
4372  */
4373 gboolean
4374 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
4375 {
4376   GstRTSPStreamPrivate *priv;
4377   GstElement *sink;
4378   GstPad *pad = NULL;
4379
4380   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4381
4382   /* query stop position: if no sinks have been added yet,
4383    * we obtain the stop position from the pad otherwise we query the sinks */
4384
4385   priv = stream->priv;
4386
4387   g_mutex_lock (&priv->lock);
4388   /* depending on the transport type, it should query corresponding sink */
4389   if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP)
4390     sink = priv->udpsink[0];
4391   else if (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)
4392     sink = priv->mcast_udpsink[0];
4393   else
4394     sink = priv->appsink[0];
4395
4396   if (sink) {
4397     gst_object_ref (sink);
4398   } else if (priv->send_src[0]) {
4399     pad = gst_object_ref (priv->send_src[0]);
4400   } else {
4401     g_mutex_unlock (&priv->lock);
4402     GST_WARNING_OBJECT (stream, "Couldn't obtain stop: erroneous pipeline");
4403     return FALSE;
4404   }
4405   g_mutex_unlock (&priv->lock);
4406
4407   if (sink) {
4408     GstQuery *query;
4409     GstFormat format;
4410
4411     query = gst_query_new_segment (GST_FORMAT_TIME);
4412     if (!gst_element_query (sink, query)) {
4413       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: element query failed");
4414       gst_query_unref (query);
4415       gst_object_unref (sink);
4416       return FALSE;
4417     }
4418     gst_query_parse_segment (query, NULL, &format, NULL, stop);
4419     if (format != GST_FORMAT_TIME)
4420       *stop = -1;
4421     gst_query_unref (query);
4422     gst_object_unref (sink);
4423   } else if (pad) {
4424     GstEvent *event;
4425     const GstSegment *segment;
4426
4427     event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
4428     if (!event) {
4429       GST_WARNING_OBJECT (stream, "Couldn't obtain stop: no segment event");
4430       gst_object_unref (pad);
4431       return FALSE;
4432     }
4433     gst_event_parse_segment (event, &segment);
4434     if (segment->format != GST_FORMAT_TIME) {
4435       *stop = -1;
4436     } else {
4437       *stop = segment->stop;
4438       if (*stop == -1)
4439         *stop = segment->duration;
4440       else
4441         *stop = gst_segment_to_stream_time (segment, GST_FORMAT_TIME, *stop);
4442     }
4443     gst_event_unref (event);
4444     gst_object_unref (pad);
4445   }
4446
4447   return TRUE;
4448 }
4449
4450 /**
4451  * gst_rtsp_stream_complete_stream:
4452  * @stream: a #GstRTSPStream
4453  * @transport: a #GstRTSPTransport
4454  *
4455  * Add a receiver and sender part to the pipeline based on the transport from
4456  * SETUP.
4457  *
4458  * Returns: %TRUE if the pipeline has been sucessfully updated.
4459  */
4460 gboolean
4461 gst_rtsp_stream_complete_stream (GstRTSPStream * stream,
4462     const GstRTSPTransport * transport)
4463 {
4464   GstRTSPStreamPrivate *priv;
4465
4466   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
4467
4468   priv = stream->priv;
4469   GST_DEBUG_OBJECT (stream, "complete stream");
4470
4471   g_mutex_lock (&priv->lock);
4472
4473   if (!(priv->protocols & transport->lower_transport))
4474     goto unallowed_transport;
4475
4476   if (!create_receiver_part (stream, transport))
4477     goto create_receiver_error;
4478
4479   /* in the RECORD case, we only add RTCP sender part */
4480   if (!create_sender_part (stream, transport))
4481     goto create_sender_error;
4482
4483   g_mutex_unlock (&priv->lock);
4484
4485   GST_DEBUG_OBJECT (stream, "pipeline sucsessfully updated");
4486   return TRUE;
4487
4488 create_receiver_error:
4489 create_sender_error:
4490 unallowed_transport:
4491   {
4492     g_mutex_unlock (&priv->lock);
4493     return FALSE;
4494   }
4495 }