7f12e138cb4e68a4acb19121fd3c21c630d4a69c
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* SRTP encoder/decoder */
83   GstElement *srtpenc;
84   GstElement *srtpdec;
85   GHashTable *keys;
86
87   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
88    * sockets */
89   GstElement *udpsrc_v4[2];
90
91   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
92    * sockets */
93   GstElement *udpsrc_v6[2];
94
95   GstElement *udpsink[2];
96
97   /* for TCP transport */
98   GstElement *appsrc[2];
99   GstElement *appqueue[2];
100   GstElement *appsink[2];
101
102   GstElement *tee[2];
103   GstElement *funnel[2];
104
105   /* server ports for sending/receiving over ipv4 */
106   GstRTSPRange server_port_v4;
107   GstRTSPAddress *server_addr_v4;
108   gboolean have_ipv4;
109
110   /* server ports for sending/receiving over ipv6 */
111   GstRTSPRange server_port_v6;
112   GstRTSPAddress *server_addr_v6;
113   gboolean have_ipv6;
114
115   /* multicast addresses */
116   GstRTSPAddressPool *pool;
117   GstRTSPAddress *addr_v4;
118   GstRTSPAddress *addr_v6;
119
120   /* the caps of the stream */
121   gulong caps_sig;
122   GstCaps *caps;
123
124   /* transports we stream to */
125   guint n_active;
126   GList *transports;
127   gboolean tr_changed;
128   GList *tr_cache;
129
130   gint dscp_qos;
131
132   /* stream blocking */
133   gulong blocked_id;
134   gboolean blocking;
135 };
136
137 #define DEFAULT_CONTROL         NULL
138 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
139 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
140                                         GST_RTSP_LOWER_TRANS_TCP
141
142 enum
143 {
144   PROP_0,
145   PROP_CONTROL,
146   PROP_PROFILES,
147   PROP_PROTOCOLS,
148   PROP_LAST
149 };
150
151 enum
152 {
153   SIGNAL_NEW_RTP_ENCODER,
154   SIGNAL_NEW_RTCP_ENCODER,
155   SIGNAL_LAST
156 };
157
158 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
159 #define GST_CAT_DEFAULT rtsp_stream_debug
160
161 static GQuark ssrc_stream_map_key;
162
163 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
164     GValue * value, GParamSpec * pspec);
165 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
166     const GValue * value, GParamSpec * pspec);
167
168 static void gst_rtsp_stream_finalize (GObject * obj);
169
170 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
171
172 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
173
174 static void
175 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
176 {
177   GObjectClass *gobject_class;
178
179   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
180
181   gobject_class = G_OBJECT_CLASS (klass);
182
183   gobject_class->get_property = gst_rtsp_stream_get_property;
184   gobject_class->set_property = gst_rtsp_stream_set_property;
185   gobject_class->finalize = gst_rtsp_stream_finalize;
186
187   g_object_class_install_property (gobject_class, PROP_CONTROL,
188       g_param_spec_string ("control", "Control",
189           "The control string for this stream", DEFAULT_CONTROL,
190           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   g_object_class_install_property (gobject_class, PROP_PROFILES,
193       g_param_spec_flags ("profiles", "Profiles",
194           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
195           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
198       g_param_spec_flags ("protocols", "Protocols",
199           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
200           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201
202   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
203       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
205       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
206
207   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
208       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
209       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
210       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
211
212   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
213
214   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
215 }
216
217 static void
218 gst_rtsp_stream_init (GstRTSPStream * stream)
219 {
220   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
221
222   GST_DEBUG ("new stream %p", stream);
223
224   stream->priv = priv;
225
226   priv->dscp_qos = -1;
227   priv->control = g_strdup (DEFAULT_CONTROL);
228   priv->profiles = DEFAULT_PROFILES;
229   priv->protocols = DEFAULT_PROTOCOLS;
230
231   g_mutex_init (&priv->lock);
232
233   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
234       NULL, (GDestroyNotify) gst_caps_unref);
235 }
236
237 static void
238 gst_rtsp_stream_finalize (GObject * obj)
239 {
240   GstRTSPStream *stream;
241   GstRTSPStreamPrivate *priv;
242
243   stream = GST_RTSP_STREAM (obj);
244   priv = stream->priv;
245
246   GST_DEBUG ("finalize stream %p", stream);
247
248   /* we really need to be unjoined now */
249   g_return_if_fail (!priv->is_joined);
250
251   if (priv->addr_v4)
252     gst_rtsp_address_free (priv->addr_v4);
253   if (priv->addr_v6)
254     gst_rtsp_address_free (priv->addr_v6);
255   if (priv->server_addr_v4)
256     gst_rtsp_address_free (priv->server_addr_v4);
257   if (priv->server_addr_v6)
258     gst_rtsp_address_free (priv->server_addr_v6);
259   if (priv->pool)
260     g_object_unref (priv->pool);
261   gst_object_unref (priv->payloader);
262   gst_object_unref (priv->srcpad);
263   g_free (priv->control);
264   g_mutex_clear (&priv->lock);
265
266   g_hash_table_unref (priv->keys);
267
268   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
269 }
270
271 static void
272 gst_rtsp_stream_get_property (GObject * object, guint propid,
273     GValue * value, GParamSpec * pspec)
274 {
275   GstRTSPStream *stream = GST_RTSP_STREAM (object);
276
277   switch (propid) {
278     case PROP_CONTROL:
279       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
280       break;
281     case PROP_PROFILES:
282       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
283       break;
284     case PROP_PROTOCOLS:
285       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
286       break;
287     default:
288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
289   }
290 }
291
292 static void
293 gst_rtsp_stream_set_property (GObject * object, guint propid,
294     const GValue * value, GParamSpec * pspec)
295 {
296   GstRTSPStream *stream = GST_RTSP_STREAM (object);
297
298   switch (propid) {
299     case PROP_CONTROL:
300       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
301       break;
302     case PROP_PROFILES:
303       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
304       break;
305     case PROP_PROTOCOLS:
306       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
310   }
311 }
312
313 /**
314  * gst_rtsp_stream_new:
315  * @idx: an index
316  * @srcpad: a #GstPad
317  * @payloader: a #GstElement
318  *
319  * Create a new media stream with index @idx that handles RTP data on
320  * @srcpad and has a payloader element @payloader.
321  *
322  * Returns: (transfer full): a new #GstRTSPStream
323  */
324 GstRTSPStream *
325 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
326 {
327   GstRTSPStreamPrivate *priv;
328   GstRTSPStream *stream;
329
330   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
331   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
332   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
333
334   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
335   priv = stream->priv;
336   priv->idx = idx;
337   priv->payloader = gst_object_ref (payloader);
338   priv->srcpad = gst_object_ref (srcpad);
339
340   return stream;
341 }
342
343 /**
344  * gst_rtsp_stream_get_index:
345  * @stream: a #GstRTSPStream
346  *
347  * Get the stream index.
348  *
349  * Return: the stream index.
350  */
351 guint
352 gst_rtsp_stream_get_index (GstRTSPStream * stream)
353 {
354   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
355
356   return stream->priv->idx;
357 }
358
359 /**
360  * gst_rtsp_stream_get_pt:
361  * @stream: a #GstRTSPStream
362  *
363  * Get the stream payload type.
364  *
365  * Return: the stream payload type.
366  */
367 guint
368 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
369 {
370   GstRTSPStreamPrivate *priv;
371   guint pt;
372
373   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
374
375   priv = stream->priv;
376
377   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
378
379   return pt;
380 }
381
382 /**
383  * gst_rtsp_stream_get_srcpad:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the srcpad associated with @stream.
387  *
388  * Returns: (transfer full): the srcpad. Unref after usage.
389  */
390 GstPad *
391 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
394
395   return gst_object_ref (stream->priv->srcpad);
396 }
397
398 /**
399  * gst_rtsp_stream_get_control:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the control string to identify this stream.
403  *
404  * Returns: (transfer full): the control string. g_free() after usage.
405  */
406 gchar *
407 gst_rtsp_stream_get_control (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   gchar *result;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
413
414   priv = stream->priv;
415
416   g_mutex_lock (&priv->lock);
417   if ((result = g_strdup (priv->control)) == NULL)
418     result = g_strdup_printf ("stream=%u", priv->idx);
419   g_mutex_unlock (&priv->lock);
420
421   return result;
422 }
423
424 /**
425  * gst_rtsp_stream_set_control:
426  * @stream: a #GstRTSPStream
427  * @control: a control string
428  *
429  * Set the control string in @stream.
430  */
431 void
432 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
433 {
434   GstRTSPStreamPrivate *priv;
435
436   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
437
438   priv = stream->priv;
439
440   g_mutex_lock (&priv->lock);
441   g_free (priv->control);
442   priv->control = g_strdup (control);
443   g_mutex_unlock (&priv->lock);
444 }
445
446 /**
447  * gst_rtsp_stream_has_control:
448  * @stream: a #GstRTSPStream
449  * @control: a control string
450  *
451  * Check if @stream has the control string @control.
452  *
453  * Returns: %TRUE is @stream has @control as the control string
454  */
455 gboolean
456 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
457 {
458   GstRTSPStreamPrivate *priv;
459   gboolean res;
460
461   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
462
463   priv = stream->priv;
464
465   g_mutex_lock (&priv->lock);
466   if (priv->control)
467     res = (g_strcmp0 (priv->control, control) == 0);
468   else {
469     guint streamid;
470
471     if (sscanf (control, "stream=%u", &streamid) > 0)
472       res = (streamid == priv->idx);
473     else
474       res = FALSE;
475   }
476   g_mutex_unlock (&priv->lock);
477
478   return res;
479 }
480
481 /**
482  * gst_rtsp_stream_set_mtu:
483  * @stream: a #GstRTSPStream
484  * @mtu: a new MTU
485  *
486  * Configure the mtu in the payloader of @stream to @mtu.
487  */
488 void
489 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
490 {
491   GstRTSPStreamPrivate *priv;
492
493   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
494
495   priv = stream->priv;
496
497   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
498
499   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
500 }
501
502 /**
503  * gst_rtsp_stream_get_mtu:
504  * @stream: a #GstRTSPStream
505  *
506  * Get the configured MTU in the payloader of @stream.
507  *
508  * Returns: the MTU of the payloader.
509  */
510 guint
511 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
512 {
513   GstRTSPStreamPrivate *priv;
514   guint mtu;
515
516   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
517
518   priv = stream->priv;
519
520   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
521
522   return mtu;
523 }
524
525 /* Update the dscp qos property on the udp sinks */
526 static void
527 update_dscp_qos (GstRTSPStream * stream)
528 {
529   GstRTSPStreamPrivate *priv;
530
531   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
532
533   priv = stream->priv;
534
535   if (priv->udpsink[0]) {
536     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
537         NULL);
538   }
539
540   if (priv->udpsink[1]) {
541     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
542         NULL);
543   }
544 }
545
546 /**
547  * gst_rtsp_stream_set_dscp_qos:
548  * @stream: a #GstRTSPStream
549  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
550  *
551  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
552  */
553 void
554 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
555 {
556   GstRTSPStreamPrivate *priv;
557
558   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
559
560   priv = stream->priv;
561
562   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
563
564   if (dscp_qos < -1 || dscp_qos > 63) {
565     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
566     return;
567   }
568
569   priv->dscp_qos = dscp_qos;
570
571   update_dscp_qos (stream);
572 }
573
574 /**
575  * gst_rtsp_stream_get_dscp_qos:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured DSCP QoS in of the outgoing sockets.
579  *
580  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
581  */
582 gint
583 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586
587   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
588
589   priv = stream->priv;
590
591   return priv->dscp_qos;
592 }
593
594 /**
595  * gst_rtsp_stream_is_transport_supported:
596  * @stream: a #GstRTSPStream
597  * @transport: (transfer none): a #GstRTSPTransport
598  *
599  * Check if @transport can be handled by stream
600  *
601  * Returns: %TRUE if @transport can be handled by @stream.
602  */
603 gboolean
604 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
605     GstRTSPTransport * transport)
606 {
607   GstRTSPStreamPrivate *priv;
608
609   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
610
611   priv = stream->priv;
612
613   g_mutex_lock (&priv->lock);
614   if (transport->trans != GST_RTSP_TRANS_RTP)
615     goto unsupported_transmode;
616
617   if (!(transport->profile & priv->profiles))
618     goto unsupported_profile;
619
620   if (!(transport->lower_transport & priv->protocols))
621     goto unsupported_ltrans;
622
623   g_mutex_unlock (&priv->lock);
624
625   return TRUE;
626
627   /* ERRORS */
628 unsupported_transmode:
629   {
630     GST_DEBUG ("unsupported transport mode %d", transport->trans);
631     g_mutex_unlock (&priv->lock);
632     return FALSE;
633   }
634 unsupported_profile:
635   {
636     GST_DEBUG ("unsupported profile %d", transport->profile);
637     g_mutex_unlock (&priv->lock);
638     return FALSE;
639   }
640 unsupported_ltrans:
641   {
642     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
643     g_mutex_unlock (&priv->lock);
644     return FALSE;
645   }
646 }
647
648 /**
649  * gst_rtsp_stream_set_profiles:
650  * @stream: a #GstRTSPStream
651  * @profiles: the new profiles
652  *
653  * Configure the allowed profiles for @stream.
654  */
655 void
656 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
657 {
658   GstRTSPStreamPrivate *priv;
659
660   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
661
662   priv = stream->priv;
663
664   g_mutex_lock (&priv->lock);
665   priv->profiles = profiles;
666   g_mutex_unlock (&priv->lock);
667 }
668
669 /**
670  * gst_rtsp_stream_get_profiles:
671  * @stream: a #GstRTSPStream
672  *
673  * Get the allowed profiles of @stream.
674  *
675  * Returns: a #GstRTSPProfile
676  */
677 GstRTSPProfile
678 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
679 {
680   GstRTSPStreamPrivate *priv;
681   GstRTSPProfile res;
682
683   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
684
685   priv = stream->priv;
686
687   g_mutex_lock (&priv->lock);
688   res = priv->profiles;
689   g_mutex_unlock (&priv->lock);
690
691   return res;
692 }
693
694 /**
695  * gst_rtsp_stream_set_protocols:
696  * @stream: a #GstRTSPStream
697  * @protocols: the new flags
698  *
699  * Configure the allowed lower transport for @stream.
700  */
701 void
702 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
703     GstRTSPLowerTrans protocols)
704 {
705   GstRTSPStreamPrivate *priv;
706
707   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
708
709   priv = stream->priv;
710
711   g_mutex_lock (&priv->lock);
712   priv->protocols = protocols;
713   g_mutex_unlock (&priv->lock);
714 }
715
716 /**
717  * gst_rtsp_stream_get_protocols:
718  * @stream: a #GstRTSPStream
719  *
720  * Get the allowed protocols of @stream.
721  *
722  * Returns: a #GstRTSPLowerTrans
723  */
724 GstRTSPLowerTrans
725 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
726 {
727   GstRTSPStreamPrivate *priv;
728   GstRTSPLowerTrans res;
729
730   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
731       GST_RTSP_LOWER_TRANS_UNKNOWN);
732
733   priv = stream->priv;
734
735   g_mutex_lock (&priv->lock);
736   res = priv->protocols;
737   g_mutex_unlock (&priv->lock);
738
739   return res;
740 }
741
742 /**
743  * gst_rtsp_stream_set_address_pool:
744  * @stream: a #GstRTSPStream
745  * @pool: (transfer none): a #GstRTSPAddressPool
746  *
747  * configure @pool to be used as the address pool of @stream.
748  */
749 void
750 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
751     GstRTSPAddressPool * pool)
752 {
753   GstRTSPStreamPrivate *priv;
754   GstRTSPAddressPool *old;
755
756   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
757
758   priv = stream->priv;
759
760   GST_LOG_OBJECT (stream, "set address pool %p", pool);
761
762   g_mutex_lock (&priv->lock);
763   if ((old = priv->pool) != pool)
764     priv->pool = pool ? g_object_ref (pool) : NULL;
765   else
766     old = NULL;
767   g_mutex_unlock (&priv->lock);
768
769   if (old)
770     g_object_unref (old);
771 }
772
773 /**
774  * gst_rtsp_stream_get_address_pool:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the #GstRTSPAddressPool used as the address pool of @stream.
778  *
779  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
780  * usage.
781  */
782 GstRTSPAddressPool *
783 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
784 {
785   GstRTSPStreamPrivate *priv;
786   GstRTSPAddressPool *result;
787
788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   if ((result = priv->pool))
794     g_object_ref (result);
795   g_mutex_unlock (&priv->lock);
796
797   return result;
798 }
799
800 /**
801  * gst_rtsp_stream_get_multicast_address:
802  * @stream: a #GstRTSPStream
803  * @family: the #GSocketFamily
804  *
805  * Get the multicast address of @stream for @family.
806  *
807  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
808  * or %NULL when no address could be allocated. gst_rtsp_address_free()
809  * after usage.
810  */
811 GstRTSPAddress *
812 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
813     GSocketFamily family)
814 {
815   GstRTSPStreamPrivate *priv;
816   GstRTSPAddress *result;
817   GstRTSPAddress **addrp;
818   GstRTSPAddressFlags flags;
819
820   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
821
822   priv = stream->priv;
823
824   if (family == G_SOCKET_FAMILY_IPV6) {
825     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
826     addrp = &priv->addr_v6;
827   } else {
828     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
829     addrp = &priv->addr_v4;
830   }
831
832   g_mutex_lock (&priv->lock);
833   if (*addrp == NULL) {
834     if (priv->pool == NULL)
835       goto no_pool;
836
837     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
838
839     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
840     if (*addrp == NULL)
841       goto no_address;
842   }
843   result = gst_rtsp_address_copy (*addrp);
844   g_mutex_unlock (&priv->lock);
845
846   return result;
847
848   /* ERRORS */
849 no_pool:
850   {
851     GST_ERROR_OBJECT (stream, "no address pool specified");
852     g_mutex_unlock (&priv->lock);
853     return NULL;
854   }
855 no_address:
856   {
857     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
858     g_mutex_unlock (&priv->lock);
859     return NULL;
860   }
861 }
862
863 /**
864  * gst_rtsp_stream_reserve_address:
865  * @stream: a #GstRTSPStream
866  * @address: an address
867  * @port: a port
868  * @n_ports: n_ports
869  * @ttl: a TTL
870  *
871  * Reserve @address and @port as the address and port of @stream.
872  *
873  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
874  * the address could be reserved. gst_rtsp_address_free() after usage.
875  */
876 GstRTSPAddress *
877 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
878     const gchar * address, guint port, guint n_ports, guint ttl)
879 {
880   GstRTSPStreamPrivate *priv;
881   GstRTSPAddress *result;
882   GInetAddress *addr;
883   GSocketFamily family;
884   GstRTSPAddress **addrp;
885
886   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
887   g_return_val_if_fail (address != NULL, NULL);
888   g_return_val_if_fail (port > 0, NULL);
889   g_return_val_if_fail (n_ports > 0, NULL);
890   g_return_val_if_fail (ttl > 0, NULL);
891
892   priv = stream->priv;
893
894   addr = g_inet_address_new_from_string (address);
895   if (!addr) {
896     GST_ERROR ("failed to get inet addr from %s", address);
897     family = G_SOCKET_FAMILY_IPV4;
898   } else {
899     family = g_inet_address_get_family (addr);
900     g_object_unref (addr);
901   }
902
903   if (family == G_SOCKET_FAMILY_IPV6)
904     addrp = &priv->addr_v6;
905   else
906     addrp = &priv->addr_v4;
907
908   g_mutex_lock (&priv->lock);
909   if (*addrp == NULL) {
910     GstRTSPAddressPoolResult res;
911
912     if (priv->pool == NULL)
913       goto no_pool;
914
915     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
916         port, n_ports, ttl, addrp);
917     if (res != GST_RTSP_ADDRESS_POOL_OK)
918       goto no_address;
919   } else {
920     if (strcmp ((*addrp)->address, address) ||
921         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
922         (*addrp)->ttl != ttl)
923       goto different_address;
924   }
925   result = gst_rtsp_address_copy (*addrp);
926   g_mutex_unlock (&priv->lock);
927
928   return result;
929
930   /* ERRORS */
931 no_pool:
932   {
933     GST_ERROR_OBJECT (stream, "no address pool specified");
934     g_mutex_unlock (&priv->lock);
935     return NULL;
936   }
937 no_address:
938   {
939     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
940         address);
941     g_mutex_unlock (&priv->lock);
942     return NULL;
943   }
944 different_address:
945   {
946     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
947         " reserved", address);
948     g_mutex_unlock (&priv->lock);
949     return NULL;
950   }
951 }
952
953 static gboolean
954 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
955     GSocketFamily family, GstElement * udpsrc_out[2],
956     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
957     GstRTSPAddress ** server_addr_out)
958 {
959   GstStateChangeReturn ret;
960   GstElement *udpsrc0, *udpsrc1;
961   GstElement *udpsink0, *udpsink1;
962   GSocket *rtp_socket = NULL;
963   GSocket *rtcp_socket;
964   gint tmp_rtp, tmp_rtcp;
965   guint count;
966   gint rtpport, rtcpport;
967   GList *rejected_addresses = NULL;
968   GstRTSPAddress *addr = NULL;
969   GInetAddress *inetaddr = NULL;
970   GSocketAddress *rtp_sockaddr = NULL;
971   GSocketAddress *rtcp_sockaddr = NULL;
972   const gchar *multisink_socket;
973
974   if (family == G_SOCKET_FAMILY_IPV6)
975     multisink_socket = "socket-v6";
976   else
977     multisink_socket = "socket";
978
979   udpsrc0 = NULL;
980   udpsrc1 = NULL;
981   udpsink0 = NULL;
982   udpsink1 = NULL;
983   count = 0;
984
985   /* Start with random port */
986   tmp_rtp = 0;
987
988   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
989       G_SOCKET_PROTOCOL_UDP, NULL);
990   if (!rtcp_socket)
991     goto no_udp_protocol;
992
993   if (*server_addr_out)
994     gst_rtsp_address_free (*server_addr_out);
995
996   /* try to allocate 2 UDP ports, the RTP port should be an even
997    * number and the RTCP port should be the next (uneven) port */
998 again:
999
1000   if (rtp_socket == NULL) {
1001     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1002         G_SOCKET_PROTOCOL_UDP, NULL);
1003     if (!rtp_socket)
1004       goto no_udp_protocol;
1005   }
1006
1007   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1008     GstRTSPAddressFlags flags;
1009
1010     if (addr)
1011       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1012
1013     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1014     if (family == G_SOCKET_FAMILY_IPV6)
1015       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1016     else
1017       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1018
1019     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1020
1021     if (addr == NULL)
1022       goto no_ports;
1023
1024     tmp_rtp = addr->port;
1025
1026     g_clear_object (&inetaddr);
1027     inetaddr = g_inet_address_new_from_string (addr->address);
1028   } else {
1029     if (tmp_rtp != 0) {
1030       tmp_rtp += 2;
1031       if (++count > 20)
1032         goto no_ports;
1033     }
1034
1035     if (inetaddr == NULL)
1036       inetaddr = g_inet_address_new_any (family);
1037   }
1038
1039   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1040   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1041     g_object_unref (rtp_sockaddr);
1042     goto again;
1043   }
1044   g_object_unref (rtp_sockaddr);
1045
1046   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1047   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1048     g_clear_object (&rtp_sockaddr);
1049     goto socket_error;
1050   }
1051
1052   tmp_rtp =
1053       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1054   g_object_unref (rtp_sockaddr);
1055
1056   /* check if port is even */
1057   if ((tmp_rtp & 1) != 0) {
1058     /* port not even, close and allocate another */
1059     tmp_rtp++;
1060     g_clear_object (&rtp_socket);
1061     goto again;
1062   }
1063
1064   /* set port */
1065   tmp_rtcp = tmp_rtp + 1;
1066
1067   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1068   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1069     g_object_unref (rtcp_sockaddr);
1070     g_clear_object (&rtp_socket);
1071     goto again;
1072   }
1073   g_object_unref (rtcp_sockaddr);
1074
1075   g_clear_object (&inetaddr);
1076
1077   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1078   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1079
1080   if (udpsrc0 == NULL || udpsrc1 == NULL)
1081     goto no_udp_protocol;
1082
1083   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1084   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1085
1086   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1087   if (ret == GST_STATE_CHANGE_FAILURE)
1088     goto element_error;
1089   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1090   if (ret == GST_STATE_CHANGE_FAILURE)
1091     goto element_error;
1092
1093   /* all fine, do port check */
1094   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1095   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1096
1097   /* this should not happen... */
1098   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1099     goto port_error;
1100
1101   if (udpsink_out[0])
1102     udpsink0 = udpsink_out[0];
1103   else
1104     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1105
1106   if (!udpsink0)
1107     goto no_udp_protocol;
1108
1109   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1110   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1111
1112   if (udpsink_out[1])
1113     udpsink1 = udpsink_out[1];
1114   else
1115     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1116
1117   if (!udpsink1)
1118     goto no_udp_protocol;
1119
1120   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1121   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1122   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1123
1124   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1125   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1126   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1127   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1128   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1129   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1130   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1132
1133   /* we keep these elements, we will further configure them when the
1134    * client told us to really use the UDP ports. */
1135   udpsrc_out[0] = udpsrc0;
1136   udpsrc_out[1] = udpsrc1;
1137   udpsink_out[0] = udpsink0;
1138   udpsink_out[1] = udpsink1;
1139   server_port_out->min = rtpport;
1140   server_port_out->max = rtcpport;
1141
1142   *server_addr_out = addr;
1143   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1144
1145   g_object_unref (rtp_socket);
1146   g_object_unref (rtcp_socket);
1147
1148   return TRUE;
1149
1150   /* ERRORS */
1151 no_udp_protocol:
1152   {
1153     goto cleanup;
1154   }
1155 no_ports:
1156   {
1157     goto cleanup;
1158   }
1159 port_error:
1160   {
1161     goto cleanup;
1162   }
1163 socket_error:
1164   {
1165     goto cleanup;
1166   }
1167 element_error:
1168   {
1169     goto cleanup;
1170   }
1171 cleanup:
1172   {
1173     if (udpsrc0) {
1174       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1175       gst_object_unref (udpsrc0);
1176     }
1177     if (udpsrc1) {
1178       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1179       gst_object_unref (udpsrc1);
1180     }
1181     if (udpsink0) {
1182       gst_element_set_state (udpsink0, GST_STATE_NULL);
1183       gst_object_unref (udpsink0);
1184     }
1185     if (inetaddr)
1186       g_object_unref (inetaddr);
1187     g_list_free_full (rejected_addresses,
1188         (GDestroyNotify) gst_rtsp_address_free);
1189     if (addr)
1190       gst_rtsp_address_free (addr);
1191     if (rtp_socket)
1192       g_object_unref (rtp_socket);
1193     if (rtcp_socket)
1194       g_object_unref (rtcp_socket);
1195     return FALSE;
1196   }
1197 }
1198
1199 /* must be called with lock */
1200 static gboolean
1201 alloc_ports (GstRTSPStream * stream)
1202 {
1203   GstRTSPStreamPrivate *priv = stream->priv;
1204
1205   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1206       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1207       &priv->server_port_v4, &priv->server_addr_v4);
1208
1209   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1210       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1211       &priv->server_port_v6, &priv->server_addr_v6);
1212
1213   return priv->have_ipv4 || priv->have_ipv6;
1214 }
1215
1216 /**
1217  * gst_rtsp_stream_get_server_port:
1218  * @stream: a #GstRTSPStream
1219  * @server_port: (out): result server port
1220  * @family: the port family to get
1221  *
1222  * Fill @server_port with the port pair used by the server. This function can
1223  * only be called when @stream has been joined.
1224  */
1225 void
1226 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1227     GstRTSPRange * server_port, GSocketFamily family)
1228 {
1229   GstRTSPStreamPrivate *priv;
1230
1231   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1232   priv = stream->priv;
1233   g_return_if_fail (priv->is_joined);
1234
1235   g_mutex_lock (&priv->lock);
1236   if (family == G_SOCKET_FAMILY_IPV4) {
1237     if (server_port)
1238       *server_port = priv->server_port_v4;
1239   } else {
1240     if (server_port)
1241       *server_port = priv->server_port_v6;
1242   }
1243   g_mutex_unlock (&priv->lock);
1244 }
1245
1246 /**
1247  * gst_rtsp_stream_get_rtpsession:
1248  * @stream: a #GstRTSPStream
1249  *
1250  * Get the RTP session of this stream.
1251  *
1252  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1253  */
1254 GObject *
1255 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1256 {
1257   GstRTSPStreamPrivate *priv;
1258   GObject *session;
1259
1260   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1261
1262   priv = stream->priv;
1263
1264   g_mutex_lock (&priv->lock);
1265   if ((session = priv->session))
1266     g_object_ref (session);
1267   g_mutex_unlock (&priv->lock);
1268
1269   return session;
1270 }
1271
1272 /**
1273  * gst_rtsp_stream_get_ssrc:
1274  * @stream: a #GstRTSPStream
1275  * @ssrc: (out): result ssrc
1276  *
1277  * Get the SSRC used by the RTP session of this stream. This function can only
1278  * be called when @stream has been joined.
1279  */
1280 void
1281 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1282 {
1283   GstRTSPStreamPrivate *priv;
1284
1285   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1286   priv = stream->priv;
1287   g_return_if_fail (priv->is_joined);
1288
1289   g_mutex_lock (&priv->lock);
1290   if (ssrc && priv->session)
1291     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1292   g_mutex_unlock (&priv->lock);
1293 }
1294
1295 /* executed from streaming thread */
1296 static void
1297 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1298 {
1299   GstRTSPStreamPrivate *priv = stream->priv;
1300   GstCaps *newcaps, *oldcaps;
1301
1302   newcaps = gst_pad_get_current_caps (pad);
1303
1304   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1305       newcaps);
1306
1307   g_mutex_lock (&priv->lock);
1308   oldcaps = priv->caps;
1309   priv->caps = newcaps;
1310   g_mutex_unlock (&priv->lock);
1311
1312   if (oldcaps)
1313     gst_caps_unref (oldcaps);
1314 }
1315
1316 static void
1317 dump_structure (const GstStructure * s)
1318 {
1319   gchar *sstr;
1320
1321   sstr = gst_structure_to_string (s);
1322   GST_INFO ("structure: %s", sstr);
1323   g_free (sstr);
1324 }
1325
1326 static GstRTSPStreamTransport *
1327 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1328 {
1329   GstRTSPStreamPrivate *priv = stream->priv;
1330   GList *walk;
1331   GstRTSPStreamTransport *result = NULL;
1332   const gchar *tmp;
1333   gchar *dest;
1334   guint port;
1335
1336   if (rtcp_from == NULL)
1337     return NULL;
1338
1339   tmp = g_strrstr (rtcp_from, ":");
1340   if (tmp == NULL)
1341     return NULL;
1342
1343   port = atoi (tmp + 1);
1344   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1345
1346   g_mutex_lock (&priv->lock);
1347   GST_INFO ("finding %s:%d in %d transports", dest, port,
1348       g_list_length (priv->transports));
1349
1350   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1351     GstRTSPStreamTransport *trans = walk->data;
1352     const GstRTSPTransport *tr;
1353     gint min, max;
1354
1355     tr = gst_rtsp_stream_transport_get_transport (trans);
1356
1357     min = tr->client_port.min;
1358     max = tr->client_port.max;
1359
1360     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1361       result = trans;
1362       break;
1363     }
1364   }
1365   if (result)
1366     g_object_ref (result);
1367   g_mutex_unlock (&priv->lock);
1368
1369   g_free (dest);
1370
1371   return result;
1372 }
1373
1374 static GstRTSPStreamTransport *
1375 check_transport (GObject * source, GstRTSPStream * stream)
1376 {
1377   GstStructure *stats;
1378   GstRTSPStreamTransport *trans;
1379
1380   /* see if we have a stream to match with the origin of the RTCP packet */
1381   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1382   if (trans == NULL) {
1383     g_object_get (source, "stats", &stats, NULL);
1384     if (stats) {
1385       const gchar *rtcp_from;
1386
1387       dump_structure (stats);
1388
1389       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1390       if ((trans = find_transport (stream, rtcp_from))) {
1391         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1392             source);
1393         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1394             g_object_unref);
1395       }
1396       gst_structure_free (stats);
1397     }
1398   }
1399   return trans;
1400 }
1401
1402
1403 static void
1404 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1405 {
1406   GstRTSPStreamTransport *trans;
1407
1408   GST_INFO ("%p: new source %p", stream, source);
1409
1410   trans = check_transport (source, stream);
1411
1412   if (trans)
1413     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1414 }
1415
1416 static void
1417 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1418 {
1419   GST_INFO ("%p: new SDES %p", stream, source);
1420 }
1421
1422 static void
1423 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1424 {
1425   GstRTSPStreamTransport *trans;
1426
1427   trans = check_transport (source, stream);
1428
1429   if (trans) {
1430     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1431     gst_rtsp_stream_transport_keep_alive (trans);
1432   }
1433 #ifdef DUMP_STATS
1434   {
1435     GstStructure *stats;
1436     g_object_get (source, "stats", &stats, NULL);
1437     if (stats) {
1438       dump_structure (stats);
1439       gst_structure_free (stats);
1440     }
1441   }
1442 #endif
1443 }
1444
1445 static void
1446 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1447 {
1448   GST_INFO ("%p: source %p bye", stream, source);
1449 }
1450
1451 static void
1452 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1453 {
1454   GstRTSPStreamTransport *trans;
1455
1456   GST_INFO ("%p: source %p bye timeout", stream, source);
1457
1458   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1459     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1460     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1461   }
1462 }
1463
1464 static void
1465 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1466 {
1467   GstRTSPStreamTransport *trans;
1468
1469   GST_INFO ("%p: source %p timeout", stream, source);
1470
1471   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1472     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1473     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1474   }
1475 }
1476
1477 static void
1478 clear_tr_cache (GstRTSPStreamPrivate * priv)
1479 {
1480   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1481   g_list_free (priv->tr_cache);
1482   priv->tr_cache = NULL;
1483 }
1484
1485 static GstFlowReturn
1486 handle_new_sample (GstAppSink * sink, gpointer user_data)
1487 {
1488   GstRTSPStreamPrivate *priv;
1489   GList *walk;
1490   GstSample *sample;
1491   GstBuffer *buffer;
1492   GstRTSPStream *stream;
1493   gboolean is_rtp;
1494
1495   sample = gst_app_sink_pull_sample (sink);
1496   if (!sample)
1497     return GST_FLOW_OK;
1498
1499   stream = (GstRTSPStream *) user_data;
1500   priv = stream->priv;
1501   buffer = gst_sample_get_buffer (sample);
1502
1503   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1504
1505   g_mutex_lock (&priv->lock);
1506   if (priv->tr_changed) {
1507     clear_tr_cache (priv);
1508     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1509       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1510       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1511     }
1512     priv->tr_changed = FALSE;
1513   }
1514   g_mutex_unlock (&priv->lock);
1515
1516   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1517     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1518
1519     if (is_rtp) {
1520       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1521     } else {
1522       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1523     }
1524   }
1525   gst_sample_unref (sample);
1526
1527   return GST_FLOW_OK;
1528 }
1529
1530 static GstAppSinkCallbacks sink_cb = {
1531   NULL,                         /* not interested in EOS */
1532   NULL,                         /* not interested in preroll samples */
1533   handle_new_sample,
1534 };
1535
1536 static GstElement *
1537 get_rtp_encoder (GstRTSPStream * stream, guint session)
1538 {
1539   GstRTSPStreamPrivate *priv = stream->priv;
1540
1541   if (priv->srtpenc == NULL) {
1542     gchar *name;
1543
1544     name = g_strdup_printf ("srtpenc_%u", session);
1545     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1546     g_free (name);
1547
1548     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1549   }
1550   return gst_object_ref (priv->srtpenc);
1551 }
1552
1553 static GstElement *
1554 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1555 {
1556   GstRTSPStreamPrivate *priv = stream->priv;
1557   GstElement *oldenc, *enc;
1558   GstPad *pad;
1559   gchar *name;
1560
1561   if (priv->idx != session)
1562     return NULL;
1563
1564   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1565
1566   oldenc = priv->srtpenc;
1567   enc = get_rtp_encoder (stream, session);
1568   name = g_strdup_printf ("rtp_sink_%d", session);
1569   pad = gst_element_get_request_pad (enc, name);
1570   g_free (name);
1571   gst_object_unref (pad);
1572
1573   if (oldenc == NULL)
1574     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1575         enc);
1576
1577   return enc;
1578 }
1579
1580 static GstElement *
1581 request_rtcp_encoder (GstElement * rtpbin, guint session,
1582     GstRTSPStream * stream)
1583 {
1584   GstRTSPStreamPrivate *priv = stream->priv;
1585   GstElement *oldenc, *enc;
1586   GstPad *pad;
1587   gchar *name;
1588
1589   if (priv->idx != session)
1590     return NULL;
1591
1592   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1593
1594   oldenc = priv->srtpenc;
1595   enc = get_rtp_encoder (stream, session);
1596   name = g_strdup_printf ("rtcp_sink_%d", session);
1597   pad = gst_element_get_request_pad (enc, name);
1598   g_free (name);
1599   gst_object_unref (pad);
1600
1601   if (oldenc == NULL)
1602     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1603         enc);
1604
1605   return enc;
1606 }
1607
1608 static GstCaps *
1609 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1610 {
1611   GstRTSPStreamPrivate *priv = stream->priv;
1612   GstCaps *caps;
1613
1614   GST_DEBUG ("request key %08x", ssrc);
1615
1616   g_mutex_lock (&priv->lock);
1617   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1618     gst_caps_ref (caps);
1619   g_mutex_unlock (&priv->lock);
1620
1621   return caps;
1622 }
1623
1624 static GstElement *
1625 request_rtcp_decoder (GstElement * rtpbin, guint session,
1626     GstRTSPStream * stream)
1627 {
1628   GstRTSPStreamPrivate *priv = stream->priv;
1629
1630   if (priv->idx != session)
1631     return NULL;
1632
1633   if (priv->srtpdec == NULL) {
1634     gchar *name;
1635
1636     name = g_strdup_printf ("srtpdec_%u", session);
1637     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1638     g_free (name);
1639
1640     g_signal_connect (priv->srtpdec, "request-key",
1641         (GCallback) request_key, stream);
1642   }
1643   return gst_object_ref (priv->srtpdec);
1644 }
1645
1646 /**
1647  * gst_rtsp_stream_join_bin:
1648  * @stream: a #GstRTSPStream
1649  * @bin: (transfer none): a #GstBin to join
1650  * @rtpbin: (transfer none): a rtpbin element in @bin
1651  * @state: the target state of the new elements
1652  *
1653  * Join the #GstBin @bin that contains the element @rtpbin.
1654  *
1655  * @stream will link to @rtpbin, which must be inside @bin. The elements
1656  * added to @bin will be set to the state given in @state.
1657  *
1658  * Returns: %TRUE on success.
1659  */
1660 gboolean
1661 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1662     GstElement * rtpbin, GstState state)
1663 {
1664   GstRTSPStreamPrivate *priv;
1665   gint i;
1666   guint idx;
1667   gchar *name;
1668   GstPad *pad, *sinkpad, *selpad;
1669   GstPadLinkReturn ret;
1670
1671   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1672   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1673   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1674
1675   priv = stream->priv;
1676
1677   g_mutex_lock (&priv->lock);
1678   if (priv->is_joined)
1679     goto was_joined;
1680
1681   /* create a session with the same index as the stream */
1682   idx = priv->idx;
1683
1684   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1685
1686   if (!alloc_ports (stream))
1687     goto no_ports;
1688
1689   /* update the dscp qos field in the sinks */
1690   update_dscp_qos (stream);
1691
1692   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1693       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1694     /* For SRTP */
1695     g_signal_connect (rtpbin, "request-rtp-encoder",
1696         (GCallback) request_rtp_encoder, stream);
1697     g_signal_connect (rtpbin, "request-rtcp-encoder",
1698         (GCallback) request_rtcp_encoder, stream);
1699     g_signal_connect (rtpbin, "request-rtcp-decoder",
1700         (GCallback) request_rtcp_decoder, stream);
1701   }
1702
1703   /* get a pad for sending RTP */
1704   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1705   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1706   g_free (name);
1707   /* link the RTP pad to the session manager, it should not really fail unless
1708    * this is not really an RTP pad */
1709   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1710   if (ret != GST_PAD_LINK_OK)
1711     goto link_failed;
1712
1713   /* get pads from the RTP session element for sending and receiving
1714    * RTP/RTCP*/
1715   name = g_strdup_printf ("send_rtp_src_%u", idx);
1716   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1717   g_free (name);
1718   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1719   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1720   g_free (name);
1721   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1722   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1723   g_free (name);
1724   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1725   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1726   g_free (name);
1727
1728   /* get the session */
1729   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1730
1731   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1732       stream);
1733   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1734       stream);
1735   g_signal_connect (priv->session, "on-ssrc-active",
1736       (GCallback) on_ssrc_active, stream);
1737   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1738       stream);
1739   g_signal_connect (priv->session, "on-bye-timeout",
1740       (GCallback) on_bye_timeout, stream);
1741   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1742       stream);
1743
1744   for (i = 0; i < 2; i++) {
1745     GstPad *teepad, *queuepad;
1746     /* For the sender we create this bit of pipeline for both
1747      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1748      * we need to add a queue before appsink to make the pipeline
1749      * not block. For the TCP case, we want to pump data to the
1750      * client as fast as possible anyway.
1751      *
1752      * .--------.      .-----.    .---------.
1753      * | rtpbin |      | tee |    | udpsink |
1754      * |       send->sink   src->sink       |
1755      * '--------'      |     |    '---------'
1756      *                 |     |    .---------.    .---------.
1757      *                 |     |    |  queue  |    | appsink |
1758      *                 |    src->sink      src->sink       |
1759      *                 '-----'    '---------'    '---------'
1760      *
1761      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1762      * udpsink directly to the session.
1763      */
1764     /* add udpsink */
1765     gst_bin_add (bin, priv->udpsink[i]);
1766     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1767
1768     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1769       /* make tee for RTP/RTCP */
1770       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1771       gst_bin_add (bin, priv->tee[i]);
1772
1773       /* and link to rtpbin send pad */
1774       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1775       gst_pad_link (priv->send_src[i], pad);
1776       gst_object_unref (pad);
1777
1778       /* link tee to udpsink */
1779       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1780       gst_pad_link (teepad, sinkpad);
1781       gst_object_unref (teepad);
1782
1783       /* make queue */
1784       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1785       gst_bin_add (bin, priv->appqueue[i]);
1786       /* and link to tee */
1787       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1788       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1789       gst_pad_link (teepad, pad);
1790       gst_object_unref (pad);
1791       gst_object_unref (teepad);
1792
1793       /* make appsink */
1794       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1795       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1796       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1797       gst_bin_add (bin, priv->appsink[i]);
1798       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1799           &sink_cb, stream, NULL);
1800       /* and link to queue */
1801       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1802       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1803       gst_pad_link (queuepad, pad);
1804       gst_object_unref (pad);
1805       gst_object_unref (queuepad);
1806     } else {
1807       /* else only udpsink needed, link it to the session */
1808       gst_pad_link (priv->send_src[i], sinkpad);
1809     }
1810     gst_object_unref (sinkpad);
1811
1812     /* For the receiver we create this bit of pipeline for both
1813      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1814      * and it is all funneled into the rtpbin receive pad.
1815      *
1816      * .--------.     .--------.    .--------.
1817      * | udpsrc |     | funnel |    | rtpbin |
1818      * |       src->sink      src->sink      |
1819      * '--------'     |        |    '--------'
1820      * .--------.     |        |
1821      * | appsrc |     |        |
1822      * |       src->sink       |
1823      * '--------'     '--------'
1824      */
1825     /* make funnel for the RTP/RTCP receivers */
1826     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1827     gst_bin_add (bin, priv->funnel[i]);
1828
1829     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1830     gst_pad_link (pad, priv->recv_sink[i]);
1831     gst_object_unref (pad);
1832
1833     if (priv->udpsrc_v4[i]) {
1834       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1835        * values */
1836       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1837       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1838       /* add udpsrc */
1839       gst_bin_add (bin, priv->udpsrc_v4[i]);
1840
1841       /* and link to the funnel v4 */
1842       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1843       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1844       gst_pad_link (pad, selpad);
1845       gst_object_unref (pad);
1846       gst_object_unref (selpad);
1847     }
1848
1849     if (priv->udpsrc_v6[i]) {
1850       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1851       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1852       gst_bin_add (bin, priv->udpsrc_v6[i]);
1853
1854       /* and link to the funnel v6 */
1855       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1856       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1857       gst_pad_link (pad, selpad);
1858       gst_object_unref (pad);
1859       gst_object_unref (selpad);
1860     }
1861
1862     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1863       /* make and add appsrc */
1864       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1865       gst_bin_add (bin, priv->appsrc[i]);
1866       /* and link to the funnel */
1867       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1868       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1869       gst_pad_link (pad, selpad);
1870       gst_object_unref (pad);
1871       gst_object_unref (selpad);
1872     }
1873
1874     /* check if we need to set to a special state */
1875     if (state != GST_STATE_NULL) {
1876       if (priv->udpsink[i])
1877         gst_element_set_state (priv->udpsink[i], state);
1878       if (priv->appsink[i])
1879         gst_element_set_state (priv->appsink[i], state);
1880       if (priv->appqueue[i])
1881         gst_element_set_state (priv->appqueue[i], state);
1882       if (priv->tee[i])
1883         gst_element_set_state (priv->tee[i], state);
1884       if (priv->funnel[i])
1885         gst_element_set_state (priv->funnel[i], state);
1886       if (priv->appsrc[i])
1887         gst_element_set_state (priv->appsrc[i], state);
1888     }
1889   }
1890
1891   /* be notified of caps changes */
1892   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1893       (GCallback) caps_notify, stream);
1894
1895   priv->is_joined = TRUE;
1896   g_mutex_unlock (&priv->lock);
1897
1898   return TRUE;
1899
1900   /* ERRORS */
1901 was_joined:
1902   {
1903     g_mutex_unlock (&priv->lock);
1904     return TRUE;
1905   }
1906 no_ports:
1907   {
1908     g_mutex_unlock (&priv->lock);
1909     GST_WARNING ("failed to allocate ports %u", idx);
1910     return FALSE;
1911   }
1912 link_failed:
1913   {
1914     GST_WARNING ("failed to link stream %u", idx);
1915     gst_object_unref (priv->send_rtp_sink);
1916     priv->send_rtp_sink = NULL;
1917     g_mutex_unlock (&priv->lock);
1918     return FALSE;
1919   }
1920 }
1921
1922 /**
1923  * gst_rtsp_stream_leave_bin:
1924  * @stream: a #GstRTSPStream
1925  * @bin: (transfer none): a #GstBin
1926  * @rtpbin: (transfer none): a rtpbin #GstElement
1927  *
1928  * Remove the elements of @stream from @bin.
1929  *
1930  * Return: %TRUE on success.
1931  */
1932 gboolean
1933 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1934     GstElement * rtpbin)
1935 {
1936   GstRTSPStreamPrivate *priv;
1937   gint i;
1938
1939   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1940   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1941   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1942
1943   priv = stream->priv;
1944
1945   g_mutex_lock (&priv->lock);
1946   if (!priv->is_joined)
1947     goto was_not_joined;
1948
1949   /* all transports must be removed by now */
1950   g_return_val_if_fail (priv->transports == NULL, FALSE);
1951
1952   clear_tr_cache (priv);
1953
1954   GST_INFO ("stream %p leaving bin", stream);
1955
1956   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1957   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1958   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1959   gst_object_unref (priv->send_rtp_sink);
1960   priv->send_rtp_sink = NULL;
1961
1962   for (i = 0; i < 2; i++) {
1963     if (priv->udpsink[i])
1964       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1965     if (priv->appsink[i])
1966       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1967     if (priv->appqueue[i])
1968       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1969     if (priv->tee[i])
1970       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1971     if (priv->funnel[i])
1972       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1973     if (priv->appsrc[i])
1974       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1975     if (priv->udpsrc_v4[i]) {
1976       /* and set udpsrc to NULL now before removing */
1977       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1978       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1979       /* removing them should also nicely release the request
1980        * pads when they finalize */
1981       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1982     }
1983     if (priv->udpsrc_v6[i]) {
1984       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1985       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1986       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1987     }
1988     if (priv->udpsink[i])
1989       gst_bin_remove (bin, priv->udpsink[i]);
1990     if (priv->appsrc[i])
1991       gst_bin_remove (bin, priv->appsrc[i]);
1992     if (priv->appsink[i])
1993       gst_bin_remove (bin, priv->appsink[i]);
1994     if (priv->appqueue[i])
1995       gst_bin_remove (bin, priv->appqueue[i]);
1996     if (priv->tee[i])
1997       gst_bin_remove (bin, priv->tee[i]);
1998     if (priv->funnel[i])
1999       gst_bin_remove (bin, priv->funnel[i]);
2000
2001     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2002     gst_object_unref (priv->recv_sink[i]);
2003     priv->recv_sink[i] = NULL;
2004
2005     priv->udpsrc_v4[i] = NULL;
2006     priv->udpsrc_v6[i] = NULL;
2007     priv->udpsink[i] = NULL;
2008     priv->appsrc[i] = NULL;
2009     priv->appsink[i] = NULL;
2010     priv->appqueue[i] = NULL;
2011     priv->tee[i] = NULL;
2012     priv->funnel[i] = NULL;
2013   }
2014   gst_object_unref (priv->send_src[0]);
2015   priv->send_src[0] = NULL;
2016
2017   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2018   gst_object_unref (priv->send_src[1]);
2019   priv->send_src[1] = NULL;
2020
2021   g_object_unref (priv->session);
2022   priv->session = NULL;
2023   if (priv->caps)
2024     gst_caps_unref (priv->caps);
2025   priv->caps = NULL;
2026
2027   if (priv->srtpenc)
2028     gst_object_unref (priv->srtpenc);
2029
2030   priv->is_joined = FALSE;
2031   g_mutex_unlock (&priv->lock);
2032
2033   return TRUE;
2034
2035 was_not_joined:
2036   {
2037     g_mutex_unlock (&priv->lock);
2038     return TRUE;
2039   }
2040 }
2041
2042 /**
2043  * gst_rtsp_stream_get_rtpinfo:
2044  * @stream: a #GstRTSPStream
2045  * @rtptime: (allow-none): result RTP timestamp
2046  * @seq: (allow-none): result RTP seqnum
2047  * @clock_rate: (allow-none): the clock rate
2048  * @running_time: (allow-none): result running-time
2049  *
2050  * Retrieve the current rtptime, seq and running-time. This is used to
2051  * construct a RTPInfo reply header.
2052  *
2053  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2054  */
2055 gboolean
2056 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2057     guint * rtptime, guint * seq, guint * clock_rate,
2058     GstClockTime * running_time)
2059 {
2060   GstRTSPStreamPrivate *priv;
2061   GstStructure *stats;
2062   GObjectClass *payobjclass;
2063
2064   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2065
2066   priv = stream->priv;
2067
2068   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2069
2070   g_mutex_lock (&priv->lock);
2071
2072   if (g_object_class_find_property (payobjclass, "stats")) {
2073     g_object_get (priv->payloader, "stats", &stats, NULL);
2074     if (stats == NULL)
2075       goto no_stats;
2076
2077     if (seq)
2078       gst_structure_get_uint (stats, "seqnum", seq);
2079
2080     if (rtptime)
2081       gst_structure_get_uint (stats, "timestamp", rtptime);
2082
2083     if (running_time)
2084       gst_structure_get_clock_time (stats, "running-time", running_time);
2085
2086     if (clock_rate) {
2087       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2088       if (*clock_rate == 0 && running_time)
2089         *running_time = GST_CLOCK_TIME_NONE;
2090     }
2091     gst_structure_free (stats);
2092   } else {
2093     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2094         !g_object_class_find_property (payobjclass, "timestamp"))
2095       goto no_stats;
2096
2097     if (seq)
2098       g_object_get (priv->payloader, "seqnum", seq, NULL);
2099
2100     if (rtptime)
2101       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2102
2103     if (running_time)
2104       *running_time = GST_CLOCK_TIME_NONE;
2105   }
2106   g_mutex_unlock (&priv->lock);
2107
2108   return TRUE;
2109
2110   /* ERRORS */
2111 no_stats:
2112   {
2113     GST_WARNING ("Could not get payloader stats");
2114     g_mutex_unlock (&priv->lock);
2115     return FALSE;
2116   }
2117 }
2118
2119 /**
2120  * gst_rtsp_stream_get_caps:
2121  * @stream: a #GstRTSPStream
2122  *
2123  * Retrieve the current caps of @stream.
2124  *
2125  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2126  * after usage.
2127  */
2128 GstCaps *
2129 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2130 {
2131   GstRTSPStreamPrivate *priv;
2132   GstCaps *result;
2133
2134   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2135
2136   priv = stream->priv;
2137
2138   g_mutex_lock (&priv->lock);
2139   if ((result = priv->caps))
2140     gst_caps_ref (result);
2141   g_mutex_unlock (&priv->lock);
2142
2143   return result;
2144 }
2145
2146 /**
2147  * gst_rtsp_stream_recv_rtp:
2148  * @stream: a #GstRTSPStream
2149  * @buffer: (transfer full): a #GstBuffer
2150  *
2151  * Handle an RTP buffer for the stream. This method is usually called when a
2152  * message has been received from a client using the TCP transport.
2153  *
2154  * This function takes ownership of @buffer.
2155  *
2156  * Returns: a GstFlowReturn.
2157  */
2158 GstFlowReturn
2159 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2160 {
2161   GstRTSPStreamPrivate *priv;
2162   GstFlowReturn ret;
2163   GstElement *element;
2164
2165   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2166   priv = stream->priv;
2167   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2168   g_return_val_if_fail (priv->is_joined, FALSE);
2169
2170   g_mutex_lock (&priv->lock);
2171   if (priv->appsrc[0])
2172     element = gst_object_ref (priv->appsrc[0]);
2173   else
2174     element = NULL;
2175   g_mutex_unlock (&priv->lock);
2176
2177   if (element) {
2178     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2179     gst_object_unref (element);
2180   } else {
2181     ret = GST_FLOW_OK;
2182   }
2183   return ret;
2184 }
2185
2186 /**
2187  * gst_rtsp_stream_recv_rtcp:
2188  * @stream: a #GstRTSPStream
2189  * @buffer: (transfer full): a #GstBuffer
2190  *
2191  * Handle an RTCP buffer for the stream. This method is usually called when a
2192  * message has been received from a client using the TCP transport.
2193  *
2194  * This function takes ownership of @buffer.
2195  *
2196  * Returns: a GstFlowReturn.
2197  */
2198 GstFlowReturn
2199 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2200 {
2201   GstRTSPStreamPrivate *priv;
2202   GstFlowReturn ret;
2203   GstElement *element;
2204
2205   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2206   priv = stream->priv;
2207   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2208   g_return_val_if_fail (priv->is_joined, FALSE);
2209
2210   g_mutex_lock (&priv->lock);
2211   if (priv->appsrc[1])
2212     element = gst_object_ref (priv->appsrc[1]);
2213   else
2214     element = NULL;
2215   g_mutex_unlock (&priv->lock);
2216
2217   if (element) {
2218     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2219     gst_object_unref (element);
2220   } else {
2221     ret = GST_FLOW_OK;
2222     gst_buffer_unref (buffer);
2223   }
2224   return ret;
2225 }
2226
2227 /* must be called with lock */
2228 static gboolean
2229 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2230     gboolean add)
2231 {
2232   GstRTSPStreamPrivate *priv = stream->priv;
2233   const GstRTSPTransport *tr;
2234
2235   tr = gst_rtsp_stream_transport_get_transport (trans);
2236
2237   switch (tr->lower_transport) {
2238     case GST_RTSP_LOWER_TRANS_UDP:
2239     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2240     {
2241       gchar *dest;
2242       gint min, max;
2243       guint ttl = 0;
2244
2245       dest = tr->destination;
2246       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2247         min = tr->port.min;
2248         max = tr->port.max;
2249         ttl = tr->ttl;
2250       } else {
2251         min = tr->client_port.min;
2252         max = tr->client_port.max;
2253       }
2254
2255       if (add) {
2256         if (ttl > 0) {
2257           GST_INFO ("setting ttl-mc %d", ttl);
2258           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2259           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2260         }
2261         GST_INFO ("adding %s:%d-%d", dest, min, max);
2262         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2263         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2264         priv->transports = g_list_prepend (priv->transports, trans);
2265       } else {
2266         GST_INFO ("removing %s:%d-%d", dest, min, max);
2267         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2268         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2269         priv->transports = g_list_remove (priv->transports, trans);
2270       }
2271       priv->tr_changed = TRUE;
2272       break;
2273     }
2274     case GST_RTSP_LOWER_TRANS_TCP:
2275       if (add) {
2276         GST_INFO ("adding TCP %s", tr->destination);
2277         priv->transports = g_list_prepend (priv->transports, trans);
2278       } else {
2279         GST_INFO ("removing TCP %s", tr->destination);
2280         priv->transports = g_list_remove (priv->transports, trans);
2281       }
2282       priv->tr_changed = TRUE;
2283       break;
2284     default:
2285       goto unknown_transport;
2286   }
2287   return TRUE;
2288
2289   /* ERRORS */
2290 unknown_transport:
2291   {
2292     GST_INFO ("Unknown transport %d", tr->lower_transport);
2293     return FALSE;
2294   }
2295 }
2296
2297
2298 /**
2299  * gst_rtsp_stream_add_transport:
2300  * @stream: a #GstRTSPStream
2301  * @trans: (transfer none): a #GstRTSPStreamTransport
2302  *
2303  * Add the transport in @trans to @stream. The media of @stream will
2304  * then also be send to the values configured in @trans.
2305  *
2306  * @stream must be joined to a bin.
2307  *
2308  * @trans must contain a valid #GstRTSPTransport.
2309  *
2310  * Returns: %TRUE if @trans was added
2311  */
2312 gboolean
2313 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2314     GstRTSPStreamTransport * trans)
2315 {
2316   GstRTSPStreamPrivate *priv;
2317   gboolean res;
2318
2319   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2320   priv = stream->priv;
2321   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2322   g_return_val_if_fail (priv->is_joined, FALSE);
2323
2324   g_mutex_lock (&priv->lock);
2325   res = update_transport (stream, trans, TRUE);
2326   g_mutex_unlock (&priv->lock);
2327
2328   return res;
2329 }
2330
2331 /**
2332  * gst_rtsp_stream_remove_transport:
2333  * @stream: a #GstRTSPStream
2334  * @trans: (transfer none): a #GstRTSPStreamTransport
2335  *
2336  * Remove the transport in @trans from @stream. The media of @stream will
2337  * not be sent to the values configured in @trans.
2338  *
2339  * @stream must be joined to a bin.
2340  *
2341  * @trans must contain a valid #GstRTSPTransport.
2342  *
2343  * Returns: %TRUE if @trans was removed
2344  */
2345 gboolean
2346 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2347     GstRTSPStreamTransport * trans)
2348 {
2349   GstRTSPStreamPrivate *priv;
2350   gboolean res;
2351
2352   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2353   priv = stream->priv;
2354   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2355   g_return_val_if_fail (priv->is_joined, FALSE);
2356
2357   g_mutex_lock (&priv->lock);
2358   res = update_transport (stream, trans, FALSE);
2359   g_mutex_unlock (&priv->lock);
2360
2361   return res;
2362 }
2363
2364 /**
2365  * gst_rtsp_stream_update_crypto:
2366  * @stream: a #GstRTSPStream
2367  * @ssrc: the SSRC
2368  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2369  *
2370  * Update the new crypto information for @ssrc in @stream. If information
2371  * for @ssrc did not exist, it will be added. If information
2372  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2373  * be removed from @stream.
2374  *
2375  * Returns: %TRUE if @crypto could be updated
2376  */
2377 gboolean
2378 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2379     guint ssrc, GstCaps * crypto)
2380 {
2381   GstRTSPStreamPrivate *priv;
2382
2383   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2384   g_return_val_if_fail (GST_IS_CAPS (crypto), FALSE);
2385
2386   priv = stream->priv;
2387
2388   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2389
2390   g_mutex_lock (&priv->lock);
2391   if (crypto)
2392     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2393         gst_caps_ref (crypto));
2394   else
2395     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2396   g_mutex_unlock (&priv->lock);
2397
2398   return TRUE;
2399 }
2400
2401 /**
2402  * gst_rtsp_stream_get_rtp_socket:
2403  * @stream: a #GstRTSPStream
2404  * @family: the socket family
2405  *
2406  * Get the RTP socket from @stream for a @family.
2407  *
2408  * @stream must be joined to a bin.
2409  *
2410  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2411  * socket could be allocated for @family. Unref after usage
2412  */
2413 GSocket *
2414 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2415 {
2416   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2417   GSocket *socket;
2418   const gchar *name;
2419
2420   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2421   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2422       family == G_SOCKET_FAMILY_IPV6, NULL);
2423   g_return_val_if_fail (priv->udpsink[0], NULL);
2424
2425   if (family == G_SOCKET_FAMILY_IPV6)
2426     name = "socket-v6";
2427   else
2428     name = "socket";
2429
2430   g_object_get (priv->udpsink[0], name, &socket, NULL);
2431
2432   return socket;
2433 }
2434
2435 /**
2436  * gst_rtsp_stream_get_rtcp_socket:
2437  * @stream: a #GstRTSPStream
2438  * @family: the socket family
2439  *
2440  * Get the RTCP socket from @stream for a @family.
2441  *
2442  * @stream must be joined to a bin.
2443  *
2444  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2445  * socket could be allocated for @family. Unref after usage
2446  */
2447 GSocket *
2448 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2449 {
2450   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2451   GSocket *socket;
2452   const gchar *name;
2453
2454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2455   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2456       family == G_SOCKET_FAMILY_IPV6, NULL);
2457   g_return_val_if_fail (priv->udpsink[1], NULL);
2458
2459   if (family == G_SOCKET_FAMILY_IPV6)
2460     name = "socket-v6";
2461   else
2462     name = "socket";
2463
2464   g_object_get (priv->udpsink[1], name, &socket, NULL);
2465
2466   return socket;
2467 }
2468
2469 /**
2470  * gst_rtsp_stream_transport_filter:
2471  * @stream: a #GstRTSPStream
2472  * @func: (scope call) (allow-none): a callback
2473  * @user_data: (closure): user data passed to @func
2474  *
2475  * Call @func for each transport managed by @stream. The result value of @func
2476  * determines what happens to the transport. @func will be called with @stream
2477  * locked so no further actions on @stream can be performed from @func.
2478  *
2479  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2480  * @stream.
2481  *
2482  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2483  *
2484  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2485  * will also be added with an additional ref to the result #GList of this
2486  * function..
2487  *
2488  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2489  *
2490  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2491  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2492  * element in the #GList should be unreffed before the list is freed.
2493  */
2494 GList *
2495 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2496     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2497 {
2498   GstRTSPStreamPrivate *priv;
2499   GList *result, *walk, *next;
2500
2501   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2502
2503   priv = stream->priv;
2504
2505   result = NULL;
2506
2507   g_mutex_lock (&priv->lock);
2508   for (walk = priv->transports; walk; walk = next) {
2509     GstRTSPStreamTransport *trans = walk->data;
2510     GstRTSPFilterResult res;
2511
2512     next = g_list_next (walk);
2513
2514     if (func)
2515       res = func (stream, trans, user_data);
2516     else
2517       res = GST_RTSP_FILTER_REF;
2518
2519     switch (res) {
2520       case GST_RTSP_FILTER_REMOVE:
2521         update_transport (stream, trans, FALSE);
2522         break;
2523       case GST_RTSP_FILTER_REF:
2524         result = g_list_prepend (result, g_object_ref (trans));
2525         break;
2526       case GST_RTSP_FILTER_KEEP:
2527       default:
2528         break;
2529     }
2530   }
2531   g_mutex_unlock (&priv->lock);
2532
2533   return result;
2534 }
2535
2536 static GstPadProbeReturn
2537 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2538 {
2539   GstRTSPStreamPrivate *priv;
2540   GstRTSPStream *stream;
2541
2542   stream = user_data;
2543   priv = stream->priv;
2544
2545   GST_DEBUG_OBJECT (pad, "now blocking");
2546
2547   g_mutex_lock (&priv->lock);
2548   priv->blocking = TRUE;
2549   g_mutex_unlock (&priv->lock);
2550
2551   gst_element_post_message (priv->payloader,
2552       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2553           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2554
2555   return GST_PAD_PROBE_OK;
2556 }
2557
2558 /**
2559  * gst_rtsp_stream_set_blocked:
2560  * @stream: a #GstRTSPStream
2561  * @blocked: boolean indicating we should block or unblock
2562  *
2563  * Blocks or unblocks the dataflow on @stream.
2564  *
2565  * Returns: %TRUE on success
2566  */
2567 gboolean
2568 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2569 {
2570   GstRTSPStreamPrivate *priv;
2571
2572   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2573
2574   priv = stream->priv;
2575
2576   g_mutex_lock (&priv->lock);
2577   if (blocked) {
2578     priv->blocking = FALSE;
2579     if (priv->blocked_id == 0) {
2580       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2581           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2582           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2583           g_object_ref (stream), g_object_unref);
2584     }
2585   } else {
2586     if (priv->blocked_id != 0) {
2587       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2588       priv->blocked_id = 0;
2589       priv->blocking = FALSE;
2590     }
2591   }
2592   g_mutex_unlock (&priv->lock);
2593
2594   return TRUE;
2595 }
2596
2597 /**
2598  * gst_rtsp_stream_is_blocking:
2599  * @stream: a #GstRTSPStream
2600  *
2601  * Check if @stream is blocking on a #GstBuffer.
2602  *
2603  * Returns: %TRUE if @stream is blocking
2604  */
2605 gboolean
2606 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2607 {
2608   GstRTSPStreamPrivate *priv;
2609   gboolean result;
2610
2611   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2612
2613   priv = stream->priv;
2614
2615   g_mutex_lock (&priv->lock);
2616   result = priv->blocking;
2617   g_mutex_unlock (&priv->lock);
2618
2619   return result;
2620 }