filter: Release lock in filter functions
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* SRTP encoder/decoder */
83   GstElement *srtpenc;
84   GstElement *srtpdec;
85   GHashTable *keys;
86
87   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
88    * sockets */
89   GstElement *udpsrc_v4[2];
90
91   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
92    * sockets */
93   GstElement *udpsrc_v6[2];
94
95   GstElement *udpsink[2];
96
97   /* for TCP transport */
98   GstElement *appsrc[2];
99   GstElement *appqueue[2];
100   GstElement *appsink[2];
101
102   GstElement *tee[2];
103   GstElement *funnel[2];
104
105   /* server ports for sending/receiving over ipv4 */
106   GstRTSPRange server_port_v4;
107   GstRTSPAddress *server_addr_v4;
108   gboolean have_ipv4;
109
110   /* server ports for sending/receiving over ipv6 */
111   GstRTSPRange server_port_v6;
112   GstRTSPAddress *server_addr_v6;
113   gboolean have_ipv6;
114
115   /* multicast addresses */
116   GstRTSPAddressPool *pool;
117   GstRTSPAddress *addr_v4;
118   GstRTSPAddress *addr_v6;
119
120   /* the caps of the stream */
121   gulong caps_sig;
122   GstCaps *caps;
123
124   /* transports we stream to */
125   guint n_active;
126   GList *transports;
127   guint transports_cookie;
128   GList *tr_cache;
129   guint tr_cache_cookie;
130
131   gint dscp_qos;
132
133   /* stream blocking */
134   gulong blocked_id;
135   gboolean blocking;
136 };
137
138 #define DEFAULT_CONTROL         NULL
139 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
140 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
141                                         GST_RTSP_LOWER_TRANS_TCP
142
143 enum
144 {
145   PROP_0,
146   PROP_CONTROL,
147   PROP_PROFILES,
148   PROP_PROTOCOLS,
149   PROP_LAST
150 };
151
152 enum
153 {
154   SIGNAL_NEW_RTP_ENCODER,
155   SIGNAL_NEW_RTCP_ENCODER,
156   SIGNAL_LAST
157 };
158
159 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
160 #define GST_CAT_DEFAULT rtsp_stream_debug
161
162 static GQuark ssrc_stream_map_key;
163
164 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
165     GValue * value, GParamSpec * pspec);
166 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
167     const GValue * value, GParamSpec * pspec);
168
169 static void gst_rtsp_stream_finalize (GObject * obj);
170
171 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
172
173 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
174
175 static void
176 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
177 {
178   GObjectClass *gobject_class;
179
180   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
181
182   gobject_class = G_OBJECT_CLASS (klass);
183
184   gobject_class->get_property = gst_rtsp_stream_get_property;
185   gobject_class->set_property = gst_rtsp_stream_set_property;
186   gobject_class->finalize = gst_rtsp_stream_finalize;
187
188   g_object_class_install_property (gobject_class, PROP_CONTROL,
189       g_param_spec_string ("control", "Control",
190           "The control string for this stream", DEFAULT_CONTROL,
191           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192
193   g_object_class_install_property (gobject_class, PROP_PROFILES,
194       g_param_spec_flags ("profiles", "Profiles",
195           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
196           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
197
198   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
199       g_param_spec_flags ("protocols", "Protocols",
200           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
201           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
202
203   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
204       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
205       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
206       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
207
208   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
209       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
210       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
211       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
212
213   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
214
215   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
216 }
217
218 static void
219 gst_rtsp_stream_init (GstRTSPStream * stream)
220 {
221   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
222
223   GST_DEBUG ("new stream %p", stream);
224
225   stream->priv = priv;
226
227   priv->dscp_qos = -1;
228   priv->control = g_strdup (DEFAULT_CONTROL);
229   priv->profiles = DEFAULT_PROFILES;
230   priv->protocols = DEFAULT_PROTOCOLS;
231
232   g_mutex_init (&priv->lock);
233
234   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
235       NULL, (GDestroyNotify) gst_caps_unref);
236 }
237
238 static void
239 gst_rtsp_stream_finalize (GObject * obj)
240 {
241   GstRTSPStream *stream;
242   GstRTSPStreamPrivate *priv;
243
244   stream = GST_RTSP_STREAM (obj);
245   priv = stream->priv;
246
247   GST_DEBUG ("finalize stream %p", stream);
248
249   /* we really need to be unjoined now */
250   g_return_if_fail (!priv->is_joined);
251
252   if (priv->addr_v4)
253     gst_rtsp_address_free (priv->addr_v4);
254   if (priv->addr_v6)
255     gst_rtsp_address_free (priv->addr_v6);
256   if (priv->server_addr_v4)
257     gst_rtsp_address_free (priv->server_addr_v4);
258   if (priv->server_addr_v6)
259     gst_rtsp_address_free (priv->server_addr_v6);
260   if (priv->pool)
261     g_object_unref (priv->pool);
262   gst_object_unref (priv->payloader);
263   gst_object_unref (priv->srcpad);
264   g_free (priv->control);
265   g_mutex_clear (&priv->lock);
266
267   g_hash_table_unref (priv->keys);
268
269   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
270 }
271
272 static void
273 gst_rtsp_stream_get_property (GObject * object, guint propid,
274     GValue * value, GParamSpec * pspec)
275 {
276   GstRTSPStream *stream = GST_RTSP_STREAM (object);
277
278   switch (propid) {
279     case PROP_CONTROL:
280       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
281       break;
282     case PROP_PROFILES:
283       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
284       break;
285     case PROP_PROTOCOLS:
286       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
287       break;
288     default:
289       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
290   }
291 }
292
293 static void
294 gst_rtsp_stream_set_property (GObject * object, guint propid,
295     const GValue * value, GParamSpec * pspec)
296 {
297   GstRTSPStream *stream = GST_RTSP_STREAM (object);
298
299   switch (propid) {
300     case PROP_CONTROL:
301       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
302       break;
303     case PROP_PROFILES:
304       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
305       break;
306     case PROP_PROTOCOLS:
307       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
308       break;
309     default:
310       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
311   }
312 }
313
314 /**
315  * gst_rtsp_stream_new:
316  * @idx: an index
317  * @srcpad: a #GstPad
318  * @payloader: a #GstElement
319  *
320  * Create a new media stream with index @idx that handles RTP data on
321  * @srcpad and has a payloader element @payloader.
322  *
323  * Returns: (transfer full): a new #GstRTSPStream
324  */
325 GstRTSPStream *
326 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
327 {
328   GstRTSPStreamPrivate *priv;
329   GstRTSPStream *stream;
330
331   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
332   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
333   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
334
335   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
336   priv = stream->priv;
337   priv->idx = idx;
338   priv->payloader = gst_object_ref (payloader);
339   priv->srcpad = gst_object_ref (srcpad);
340
341   return stream;
342 }
343
344 /**
345  * gst_rtsp_stream_get_index:
346  * @stream: a #GstRTSPStream
347  *
348  * Get the stream index.
349  *
350  * Return: the stream index.
351  */
352 guint
353 gst_rtsp_stream_get_index (GstRTSPStream * stream)
354 {
355   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
356
357   return stream->priv->idx;
358 }
359
360 /**
361  * gst_rtsp_stream_get_pt:
362  * @stream: a #GstRTSPStream
363  *
364  * Get the stream payload type.
365  *
366  * Return: the stream payload type.
367  */
368 guint
369 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
370 {
371   GstRTSPStreamPrivate *priv;
372   guint pt;
373
374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
375
376   priv = stream->priv;
377
378   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
379
380   return pt;
381 }
382
383 /**
384  * gst_rtsp_stream_get_srcpad:
385  * @stream: a #GstRTSPStream
386  *
387  * Get the srcpad associated with @stream.
388  *
389  * Returns: (transfer full): the srcpad. Unref after usage.
390  */
391 GstPad *
392 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
393 {
394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
395
396   return gst_object_ref (stream->priv->srcpad);
397 }
398
399 /**
400  * gst_rtsp_stream_get_control:
401  * @stream: a #GstRTSPStream
402  *
403  * Get the control string to identify this stream.
404  *
405  * Returns: (transfer full): the control string. g_free() after usage.
406  */
407 gchar *
408 gst_rtsp_stream_get_control (GstRTSPStream * stream)
409 {
410   GstRTSPStreamPrivate *priv;
411   gchar *result;
412
413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
414
415   priv = stream->priv;
416
417   g_mutex_lock (&priv->lock);
418   if ((result = g_strdup (priv->control)) == NULL)
419     result = g_strdup_printf ("stream=%u", priv->idx);
420   g_mutex_unlock (&priv->lock);
421
422   return result;
423 }
424
425 /**
426  * gst_rtsp_stream_set_control:
427  * @stream: a #GstRTSPStream
428  * @control: a control string
429  *
430  * Set the control string in @stream.
431  */
432 void
433 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
434 {
435   GstRTSPStreamPrivate *priv;
436
437   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
438
439   priv = stream->priv;
440
441   g_mutex_lock (&priv->lock);
442   g_free (priv->control);
443   priv->control = g_strdup (control);
444   g_mutex_unlock (&priv->lock);
445 }
446
447 /**
448  * gst_rtsp_stream_has_control:
449  * @stream: a #GstRTSPStream
450  * @control: a control string
451  *
452  * Check if @stream has the control string @control.
453  *
454  * Returns: %TRUE is @stream has @control as the control string
455  */
456 gboolean
457 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
458 {
459   GstRTSPStreamPrivate *priv;
460   gboolean res;
461
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
463
464   priv = stream->priv;
465
466   g_mutex_lock (&priv->lock);
467   if (priv->control)
468     res = (g_strcmp0 (priv->control, control) == 0);
469   else {
470     guint streamid;
471
472     if (sscanf (control, "stream=%u", &streamid) > 0)
473       res = (streamid == priv->idx);
474     else
475       res = FALSE;
476   }
477   g_mutex_unlock (&priv->lock);
478
479   return res;
480 }
481
482 /**
483  * gst_rtsp_stream_set_mtu:
484  * @stream: a #GstRTSPStream
485  * @mtu: a new MTU
486  *
487  * Configure the mtu in the payloader of @stream to @mtu.
488  */
489 void
490 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
491 {
492   GstRTSPStreamPrivate *priv;
493
494   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
495
496   priv = stream->priv;
497
498   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
499
500   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
501 }
502
503 /**
504  * gst_rtsp_stream_get_mtu:
505  * @stream: a #GstRTSPStream
506  *
507  * Get the configured MTU in the payloader of @stream.
508  *
509  * Returns: the MTU of the payloader.
510  */
511 guint
512 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
513 {
514   GstRTSPStreamPrivate *priv;
515   guint mtu;
516
517   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
518
519   priv = stream->priv;
520
521   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
522
523   return mtu;
524 }
525
526 /* Update the dscp qos property on the udp sinks */
527 static void
528 update_dscp_qos (GstRTSPStream * stream)
529 {
530   GstRTSPStreamPrivate *priv;
531
532   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
533
534   priv = stream->priv;
535
536   if (priv->udpsink[0]) {
537     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
538         NULL);
539   }
540
541   if (priv->udpsink[1]) {
542     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
543         NULL);
544   }
545 }
546
547 /**
548  * gst_rtsp_stream_set_dscp_qos:
549  * @stream: a #GstRTSPStream
550  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
551  *
552  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
553  */
554 void
555 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
556 {
557   GstRTSPStreamPrivate *priv;
558
559   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
560
561   priv = stream->priv;
562
563   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
564
565   if (dscp_qos < -1 || dscp_qos > 63) {
566     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
567     return;
568   }
569
570   priv->dscp_qos = dscp_qos;
571
572   update_dscp_qos (stream);
573 }
574
575 /**
576  * gst_rtsp_stream_get_dscp_qos:
577  * @stream: a #GstRTSPStream
578  *
579  * Get the configured DSCP QoS in of the outgoing sockets.
580  *
581  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
582  */
583 gint
584 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
585 {
586   GstRTSPStreamPrivate *priv;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
589
590   priv = stream->priv;
591
592   return priv->dscp_qos;
593 }
594
595 /**
596  * gst_rtsp_stream_is_transport_supported:
597  * @stream: a #GstRTSPStream
598  * @transport: (transfer none): a #GstRTSPTransport
599  *
600  * Check if @transport can be handled by stream
601  *
602  * Returns: %TRUE if @transport can be handled by @stream.
603  */
604 gboolean
605 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
606     GstRTSPTransport * transport)
607 {
608   GstRTSPStreamPrivate *priv;
609
610   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
611
612   priv = stream->priv;
613
614   g_mutex_lock (&priv->lock);
615   if (transport->trans != GST_RTSP_TRANS_RTP)
616     goto unsupported_transmode;
617
618   if (!(transport->profile & priv->profiles))
619     goto unsupported_profile;
620
621   if (!(transport->lower_transport & priv->protocols))
622     goto unsupported_ltrans;
623
624   g_mutex_unlock (&priv->lock);
625
626   return TRUE;
627
628   /* ERRORS */
629 unsupported_transmode:
630   {
631     GST_DEBUG ("unsupported transport mode %d", transport->trans);
632     g_mutex_unlock (&priv->lock);
633     return FALSE;
634   }
635 unsupported_profile:
636   {
637     GST_DEBUG ("unsupported profile %d", transport->profile);
638     g_mutex_unlock (&priv->lock);
639     return FALSE;
640   }
641 unsupported_ltrans:
642   {
643     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
644     g_mutex_unlock (&priv->lock);
645     return FALSE;
646   }
647 }
648
649 /**
650  * gst_rtsp_stream_set_profiles:
651  * @stream: a #GstRTSPStream
652  * @profiles: the new profiles
653  *
654  * Configure the allowed profiles for @stream.
655  */
656 void
657 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
658 {
659   GstRTSPStreamPrivate *priv;
660
661   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
662
663   priv = stream->priv;
664
665   g_mutex_lock (&priv->lock);
666   priv->profiles = profiles;
667   g_mutex_unlock (&priv->lock);
668 }
669
670 /**
671  * gst_rtsp_stream_get_profiles:
672  * @stream: a #GstRTSPStream
673  *
674  * Get the allowed profiles of @stream.
675  *
676  * Returns: a #GstRTSPProfile
677  */
678 GstRTSPProfile
679 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
680 {
681   GstRTSPStreamPrivate *priv;
682   GstRTSPProfile res;
683
684   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
685
686   priv = stream->priv;
687
688   g_mutex_lock (&priv->lock);
689   res = priv->profiles;
690   g_mutex_unlock (&priv->lock);
691
692   return res;
693 }
694
695 /**
696  * gst_rtsp_stream_set_protocols:
697  * @stream: a #GstRTSPStream
698  * @protocols: the new flags
699  *
700  * Configure the allowed lower transport for @stream.
701  */
702 void
703 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
704     GstRTSPLowerTrans protocols)
705 {
706   GstRTSPStreamPrivate *priv;
707
708   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
709
710   priv = stream->priv;
711
712   g_mutex_lock (&priv->lock);
713   priv->protocols = protocols;
714   g_mutex_unlock (&priv->lock);
715 }
716
717 /**
718  * gst_rtsp_stream_get_protocols:
719  * @stream: a #GstRTSPStream
720  *
721  * Get the allowed protocols of @stream.
722  *
723  * Returns: a #GstRTSPLowerTrans
724  */
725 GstRTSPLowerTrans
726 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
727 {
728   GstRTSPStreamPrivate *priv;
729   GstRTSPLowerTrans res;
730
731   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
732       GST_RTSP_LOWER_TRANS_UNKNOWN);
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   res = priv->protocols;
738   g_mutex_unlock (&priv->lock);
739
740   return res;
741 }
742
743 /**
744  * gst_rtsp_stream_set_address_pool:
745  * @stream: a #GstRTSPStream
746  * @pool: (transfer none): a #GstRTSPAddressPool
747  *
748  * configure @pool to be used as the address pool of @stream.
749  */
750 void
751 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
752     GstRTSPAddressPool * pool)
753 {
754   GstRTSPStreamPrivate *priv;
755   GstRTSPAddressPool *old;
756
757   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
758
759   priv = stream->priv;
760
761   GST_LOG_OBJECT (stream, "set address pool %p", pool);
762
763   g_mutex_lock (&priv->lock);
764   if ((old = priv->pool) != pool)
765     priv->pool = pool ? g_object_ref (pool) : NULL;
766   else
767     old = NULL;
768   g_mutex_unlock (&priv->lock);
769
770   if (old)
771     g_object_unref (old);
772 }
773
774 /**
775  * gst_rtsp_stream_get_address_pool:
776  * @stream: a #GstRTSPStream
777  *
778  * Get the #GstRTSPAddressPool used as the address pool of @stream.
779  *
780  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
781  * usage.
782  */
783 GstRTSPAddressPool *
784 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
785 {
786   GstRTSPStreamPrivate *priv;
787   GstRTSPAddressPool *result;
788
789   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
790
791   priv = stream->priv;
792
793   g_mutex_lock (&priv->lock);
794   if ((result = priv->pool))
795     g_object_ref (result);
796   g_mutex_unlock (&priv->lock);
797
798   return result;
799 }
800
801 /**
802  * gst_rtsp_stream_get_multicast_address:
803  * @stream: a #GstRTSPStream
804  * @family: the #GSocketFamily
805  *
806  * Get the multicast address of @stream for @family.
807  *
808  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
809  * or %NULL when no address could be allocated. gst_rtsp_address_free()
810  * after usage.
811  */
812 GstRTSPAddress *
813 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
814     GSocketFamily family)
815 {
816   GstRTSPStreamPrivate *priv;
817   GstRTSPAddress *result;
818   GstRTSPAddress **addrp;
819   GstRTSPAddressFlags flags;
820
821   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
822
823   priv = stream->priv;
824
825   if (family == G_SOCKET_FAMILY_IPV6) {
826     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
827     addrp = &priv->addr_v6;
828   } else {
829     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
830     addrp = &priv->addr_v4;
831   }
832
833   g_mutex_lock (&priv->lock);
834   if (*addrp == NULL) {
835     if (priv->pool == NULL)
836       goto no_pool;
837
838     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
839
840     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
841     if (*addrp == NULL)
842       goto no_address;
843   }
844   result = gst_rtsp_address_copy (*addrp);
845   g_mutex_unlock (&priv->lock);
846
847   return result;
848
849   /* ERRORS */
850 no_pool:
851   {
852     GST_ERROR_OBJECT (stream, "no address pool specified");
853     g_mutex_unlock (&priv->lock);
854     return NULL;
855   }
856 no_address:
857   {
858     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
859     g_mutex_unlock (&priv->lock);
860     return NULL;
861   }
862 }
863
864 /**
865  * gst_rtsp_stream_reserve_address:
866  * @stream: a #GstRTSPStream
867  * @address: an address
868  * @port: a port
869  * @n_ports: n_ports
870  * @ttl: a TTL
871  *
872  * Reserve @address and @port as the address and port of @stream.
873  *
874  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
875  * the address could be reserved. gst_rtsp_address_free() after usage.
876  */
877 GstRTSPAddress *
878 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
879     const gchar * address, guint port, guint n_ports, guint ttl)
880 {
881   GstRTSPStreamPrivate *priv;
882   GstRTSPAddress *result;
883   GInetAddress *addr;
884   GSocketFamily family;
885   GstRTSPAddress **addrp;
886
887   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
888   g_return_val_if_fail (address != NULL, NULL);
889   g_return_val_if_fail (port > 0, NULL);
890   g_return_val_if_fail (n_ports > 0, NULL);
891   g_return_val_if_fail (ttl > 0, NULL);
892
893   priv = stream->priv;
894
895   addr = g_inet_address_new_from_string (address);
896   if (!addr) {
897     GST_ERROR ("failed to get inet addr from %s", address);
898     family = G_SOCKET_FAMILY_IPV4;
899   } else {
900     family = g_inet_address_get_family (addr);
901     g_object_unref (addr);
902   }
903
904   if (family == G_SOCKET_FAMILY_IPV6)
905     addrp = &priv->addr_v6;
906   else
907     addrp = &priv->addr_v4;
908
909   g_mutex_lock (&priv->lock);
910   if (*addrp == NULL) {
911     GstRTSPAddressPoolResult res;
912
913     if (priv->pool == NULL)
914       goto no_pool;
915
916     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
917         port, n_ports, ttl, addrp);
918     if (res != GST_RTSP_ADDRESS_POOL_OK)
919       goto no_address;
920   } else {
921     if (strcmp ((*addrp)->address, address) ||
922         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
923         (*addrp)->ttl != ttl)
924       goto different_address;
925   }
926   result = gst_rtsp_address_copy (*addrp);
927   g_mutex_unlock (&priv->lock);
928
929   return result;
930
931   /* ERRORS */
932 no_pool:
933   {
934     GST_ERROR_OBJECT (stream, "no address pool specified");
935     g_mutex_unlock (&priv->lock);
936     return NULL;
937   }
938 no_address:
939   {
940     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
941         address);
942     g_mutex_unlock (&priv->lock);
943     return NULL;
944   }
945 different_address:
946   {
947     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
948         " reserved", address);
949     g_mutex_unlock (&priv->lock);
950     return NULL;
951   }
952 }
953
954 static gboolean
955 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
956     GSocketFamily family, GstElement * udpsrc_out[2],
957     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
958     GstRTSPAddress ** server_addr_out)
959 {
960   GstStateChangeReturn ret;
961   GstElement *udpsrc0, *udpsrc1;
962   GstElement *udpsink0, *udpsink1;
963   GSocket *rtp_socket = NULL;
964   GSocket *rtcp_socket;
965   gint tmp_rtp, tmp_rtcp;
966   guint count;
967   gint rtpport, rtcpport;
968   GList *rejected_addresses = NULL;
969   GstRTSPAddress *addr = NULL;
970   GInetAddress *inetaddr = NULL;
971   GSocketAddress *rtp_sockaddr = NULL;
972   GSocketAddress *rtcp_sockaddr = NULL;
973   const gchar *multisink_socket;
974
975   if (family == G_SOCKET_FAMILY_IPV6)
976     multisink_socket = "socket-v6";
977   else
978     multisink_socket = "socket";
979
980   udpsrc0 = NULL;
981   udpsrc1 = NULL;
982   udpsink0 = NULL;
983   udpsink1 = NULL;
984   count = 0;
985
986   /* Start with random port */
987   tmp_rtp = 0;
988
989   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
990       G_SOCKET_PROTOCOL_UDP, NULL);
991   if (!rtcp_socket)
992     goto no_udp_protocol;
993
994   if (*server_addr_out)
995     gst_rtsp_address_free (*server_addr_out);
996
997   /* try to allocate 2 UDP ports, the RTP port should be an even
998    * number and the RTCP port should be the next (uneven) port */
999 again:
1000
1001   if (rtp_socket == NULL) {
1002     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1003         G_SOCKET_PROTOCOL_UDP, NULL);
1004     if (!rtp_socket)
1005       goto no_udp_protocol;
1006   }
1007
1008   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1009     GstRTSPAddressFlags flags;
1010
1011     if (addr)
1012       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1013
1014     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1015     if (family == G_SOCKET_FAMILY_IPV6)
1016       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1017     else
1018       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1019
1020     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1021
1022     if (addr == NULL)
1023       goto no_ports;
1024
1025     tmp_rtp = addr->port;
1026
1027     g_clear_object (&inetaddr);
1028     inetaddr = g_inet_address_new_from_string (addr->address);
1029   } else {
1030     if (tmp_rtp != 0) {
1031       tmp_rtp += 2;
1032       if (++count > 20)
1033         goto no_ports;
1034     }
1035
1036     if (inetaddr == NULL)
1037       inetaddr = g_inet_address_new_any (family);
1038   }
1039
1040   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1041   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1042     g_object_unref (rtp_sockaddr);
1043     goto again;
1044   }
1045   g_object_unref (rtp_sockaddr);
1046
1047   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1048   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1049     g_clear_object (&rtp_sockaddr);
1050     goto socket_error;
1051   }
1052
1053   tmp_rtp =
1054       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1055   g_object_unref (rtp_sockaddr);
1056
1057   /* check if port is even */
1058   if ((tmp_rtp & 1) != 0) {
1059     /* port not even, close and allocate another */
1060     tmp_rtp++;
1061     g_clear_object (&rtp_socket);
1062     goto again;
1063   }
1064
1065   /* set port */
1066   tmp_rtcp = tmp_rtp + 1;
1067
1068   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1069   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1070     g_object_unref (rtcp_sockaddr);
1071     g_clear_object (&rtp_socket);
1072     goto again;
1073   }
1074   g_object_unref (rtcp_sockaddr);
1075
1076   g_clear_object (&inetaddr);
1077
1078   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1079   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1080
1081   if (udpsrc0 == NULL || udpsrc1 == NULL)
1082     goto no_udp_protocol;
1083
1084   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1085   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1086
1087   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1088   if (ret == GST_STATE_CHANGE_FAILURE)
1089     goto element_error;
1090   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1091   if (ret == GST_STATE_CHANGE_FAILURE)
1092     goto element_error;
1093
1094   /* all fine, do port check */
1095   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1096   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1097
1098   /* this should not happen... */
1099   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1100     goto port_error;
1101
1102   if (udpsink_out[0])
1103     udpsink0 = udpsink_out[0];
1104   else
1105     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1106
1107   if (!udpsink0)
1108     goto no_udp_protocol;
1109
1110   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1111   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1112
1113   if (udpsink_out[1])
1114     udpsink1 = udpsink_out[1];
1115   else
1116     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink1)
1119     goto no_udp_protocol;
1120
1121   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1122   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1123   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1124
1125   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1126   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1127   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1128   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1129   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1130   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1132   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1133
1134   /* we keep these elements, we will further configure them when the
1135    * client told us to really use the UDP ports. */
1136   udpsrc_out[0] = udpsrc0;
1137   udpsrc_out[1] = udpsrc1;
1138   udpsink_out[0] = udpsink0;
1139   udpsink_out[1] = udpsink1;
1140   server_port_out->min = rtpport;
1141   server_port_out->max = rtcpport;
1142
1143   *server_addr_out = addr;
1144   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1145
1146   g_object_unref (rtp_socket);
1147   g_object_unref (rtcp_socket);
1148
1149   return TRUE;
1150
1151   /* ERRORS */
1152 no_udp_protocol:
1153   {
1154     goto cleanup;
1155   }
1156 no_ports:
1157   {
1158     goto cleanup;
1159   }
1160 port_error:
1161   {
1162     goto cleanup;
1163   }
1164 socket_error:
1165   {
1166     goto cleanup;
1167   }
1168 element_error:
1169   {
1170     goto cleanup;
1171   }
1172 cleanup:
1173   {
1174     if (udpsrc0) {
1175       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1176       gst_object_unref (udpsrc0);
1177     }
1178     if (udpsrc1) {
1179       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1180       gst_object_unref (udpsrc1);
1181     }
1182     if (udpsink0) {
1183       gst_element_set_state (udpsink0, GST_STATE_NULL);
1184       gst_object_unref (udpsink0);
1185     }
1186     if (inetaddr)
1187       g_object_unref (inetaddr);
1188     g_list_free_full (rejected_addresses,
1189         (GDestroyNotify) gst_rtsp_address_free);
1190     if (addr)
1191       gst_rtsp_address_free (addr);
1192     if (rtp_socket)
1193       g_object_unref (rtp_socket);
1194     if (rtcp_socket)
1195       g_object_unref (rtcp_socket);
1196     return FALSE;
1197   }
1198 }
1199
1200 /* must be called with lock */
1201 static gboolean
1202 alloc_ports (GstRTSPStream * stream)
1203 {
1204   GstRTSPStreamPrivate *priv = stream->priv;
1205
1206   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1207       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1208       &priv->server_port_v4, &priv->server_addr_v4);
1209
1210   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1211       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1212       &priv->server_port_v6, &priv->server_addr_v6);
1213
1214   return priv->have_ipv4 || priv->have_ipv6;
1215 }
1216
1217 /**
1218  * gst_rtsp_stream_get_server_port:
1219  * @stream: a #GstRTSPStream
1220  * @server_port: (out): result server port
1221  * @family: the port family to get
1222  *
1223  * Fill @server_port with the port pair used by the server. This function can
1224  * only be called when @stream has been joined.
1225  */
1226 void
1227 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1228     GstRTSPRange * server_port, GSocketFamily family)
1229 {
1230   GstRTSPStreamPrivate *priv;
1231
1232   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1233   priv = stream->priv;
1234   g_return_if_fail (priv->is_joined);
1235
1236   g_mutex_lock (&priv->lock);
1237   if (family == G_SOCKET_FAMILY_IPV4) {
1238     if (server_port)
1239       *server_port = priv->server_port_v4;
1240   } else {
1241     if (server_port)
1242       *server_port = priv->server_port_v6;
1243   }
1244   g_mutex_unlock (&priv->lock);
1245 }
1246
1247 /**
1248  * gst_rtsp_stream_get_rtpsession:
1249  * @stream: a #GstRTSPStream
1250  *
1251  * Get the RTP session of this stream.
1252  *
1253  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1254  */
1255 GObject *
1256 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1257 {
1258   GstRTSPStreamPrivate *priv;
1259   GObject *session;
1260
1261   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1262
1263   priv = stream->priv;
1264
1265   g_mutex_lock (&priv->lock);
1266   if ((session = priv->session))
1267     g_object_ref (session);
1268   g_mutex_unlock (&priv->lock);
1269
1270   return session;
1271 }
1272
1273 /**
1274  * gst_rtsp_stream_get_ssrc:
1275  * @stream: a #GstRTSPStream
1276  * @ssrc: (out): result ssrc
1277  *
1278  * Get the SSRC used by the RTP session of this stream. This function can only
1279  * be called when @stream has been joined.
1280  */
1281 void
1282 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1283 {
1284   GstRTSPStreamPrivate *priv;
1285
1286   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1287   priv = stream->priv;
1288   g_return_if_fail (priv->is_joined);
1289
1290   g_mutex_lock (&priv->lock);
1291   if (ssrc && priv->session)
1292     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1293   g_mutex_unlock (&priv->lock);
1294 }
1295
1296 /* executed from streaming thread */
1297 static void
1298 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1299 {
1300   GstRTSPStreamPrivate *priv = stream->priv;
1301   GstCaps *newcaps, *oldcaps;
1302
1303   newcaps = gst_pad_get_current_caps (pad);
1304
1305   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1306       newcaps);
1307
1308   g_mutex_lock (&priv->lock);
1309   oldcaps = priv->caps;
1310   priv->caps = newcaps;
1311   g_mutex_unlock (&priv->lock);
1312
1313   if (oldcaps)
1314     gst_caps_unref (oldcaps);
1315 }
1316
1317 static void
1318 dump_structure (const GstStructure * s)
1319 {
1320   gchar *sstr;
1321
1322   sstr = gst_structure_to_string (s);
1323   GST_INFO ("structure: %s", sstr);
1324   g_free (sstr);
1325 }
1326
1327 static GstRTSPStreamTransport *
1328 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1329 {
1330   GstRTSPStreamPrivate *priv = stream->priv;
1331   GList *walk;
1332   GstRTSPStreamTransport *result = NULL;
1333   const gchar *tmp;
1334   gchar *dest;
1335   guint port;
1336
1337   if (rtcp_from == NULL)
1338     return NULL;
1339
1340   tmp = g_strrstr (rtcp_from, ":");
1341   if (tmp == NULL)
1342     return NULL;
1343
1344   port = atoi (tmp + 1);
1345   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1346
1347   g_mutex_lock (&priv->lock);
1348   GST_INFO ("finding %s:%d in %d transports", dest, port,
1349       g_list_length (priv->transports));
1350
1351   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1352     GstRTSPStreamTransport *trans = walk->data;
1353     const GstRTSPTransport *tr;
1354     gint min, max;
1355
1356     tr = gst_rtsp_stream_transport_get_transport (trans);
1357
1358     min = tr->client_port.min;
1359     max = tr->client_port.max;
1360
1361     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1362       result = trans;
1363       break;
1364     }
1365   }
1366   if (result)
1367     g_object_ref (result);
1368   g_mutex_unlock (&priv->lock);
1369
1370   g_free (dest);
1371
1372   return result;
1373 }
1374
1375 static GstRTSPStreamTransport *
1376 check_transport (GObject * source, GstRTSPStream * stream)
1377 {
1378   GstStructure *stats;
1379   GstRTSPStreamTransport *trans;
1380
1381   /* see if we have a stream to match with the origin of the RTCP packet */
1382   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1383   if (trans == NULL) {
1384     g_object_get (source, "stats", &stats, NULL);
1385     if (stats) {
1386       const gchar *rtcp_from;
1387
1388       dump_structure (stats);
1389
1390       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1391       if ((trans = find_transport (stream, rtcp_from))) {
1392         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1393             source);
1394         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1395             g_object_unref);
1396       }
1397       gst_structure_free (stats);
1398     }
1399   }
1400   return trans;
1401 }
1402
1403
1404 static void
1405 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1406 {
1407   GstRTSPStreamTransport *trans;
1408
1409   GST_INFO ("%p: new source %p", stream, source);
1410
1411   trans = check_transport (source, stream);
1412
1413   if (trans)
1414     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1415 }
1416
1417 static void
1418 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1419 {
1420   GST_INFO ("%p: new SDES %p", stream, source);
1421 }
1422
1423 static void
1424 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1425 {
1426   GstRTSPStreamTransport *trans;
1427
1428   trans = check_transport (source, stream);
1429
1430   if (trans) {
1431     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1432     gst_rtsp_stream_transport_keep_alive (trans);
1433   }
1434 #ifdef DUMP_STATS
1435   {
1436     GstStructure *stats;
1437     g_object_get (source, "stats", &stats, NULL);
1438     if (stats) {
1439       dump_structure (stats);
1440       gst_structure_free (stats);
1441     }
1442   }
1443 #endif
1444 }
1445
1446 static void
1447 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1448 {
1449   GST_INFO ("%p: source %p bye", stream, source);
1450 }
1451
1452 static void
1453 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1454 {
1455   GstRTSPStreamTransport *trans;
1456
1457   GST_INFO ("%p: source %p bye timeout", stream, source);
1458
1459   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1460     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1461     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1462   }
1463 }
1464
1465 static void
1466 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1467 {
1468   GstRTSPStreamTransport *trans;
1469
1470   GST_INFO ("%p: source %p timeout", stream, source);
1471
1472   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1473     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1474     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1475   }
1476 }
1477
1478 static void
1479 clear_tr_cache (GstRTSPStreamPrivate * priv)
1480 {
1481   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1482   g_list_free (priv->tr_cache);
1483   priv->tr_cache = NULL;
1484 }
1485
1486 static GstFlowReturn
1487 handle_new_sample (GstAppSink * sink, gpointer user_data)
1488 {
1489   GstRTSPStreamPrivate *priv;
1490   GList *walk;
1491   GstSample *sample;
1492   GstBuffer *buffer;
1493   GstRTSPStream *stream;
1494   gboolean is_rtp;
1495
1496   sample = gst_app_sink_pull_sample (sink);
1497   if (!sample)
1498     return GST_FLOW_OK;
1499
1500   stream = (GstRTSPStream *) user_data;
1501   priv = stream->priv;
1502   buffer = gst_sample_get_buffer (sample);
1503
1504   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1505
1506   g_mutex_lock (&priv->lock);
1507   if (priv->tr_cache_cookie != priv->transports_cookie) {
1508     clear_tr_cache (priv);
1509     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1510       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1511       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1512     }
1513     priv->tr_cache_cookie = priv->transports_cookie;
1514   }
1515   g_mutex_unlock (&priv->lock);
1516
1517   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1518     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1519
1520     if (is_rtp) {
1521       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1522     } else {
1523       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1524     }
1525   }
1526   gst_sample_unref (sample);
1527
1528   return GST_FLOW_OK;
1529 }
1530
1531 static GstAppSinkCallbacks sink_cb = {
1532   NULL,                         /* not interested in EOS */
1533   NULL,                         /* not interested in preroll samples */
1534   handle_new_sample,
1535 };
1536
1537 static GstElement *
1538 get_rtp_encoder (GstRTSPStream * stream, guint session)
1539 {
1540   GstRTSPStreamPrivate *priv = stream->priv;
1541
1542   if (priv->srtpenc == NULL) {
1543     gchar *name;
1544
1545     name = g_strdup_printf ("srtpenc_%u", session);
1546     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1547     g_free (name);
1548
1549     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1550   }
1551   return gst_object_ref (priv->srtpenc);
1552 }
1553
1554 static GstElement *
1555 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1556 {
1557   GstRTSPStreamPrivate *priv = stream->priv;
1558   GstElement *oldenc, *enc;
1559   GstPad *pad;
1560   gchar *name;
1561
1562   if (priv->idx != session)
1563     return NULL;
1564
1565   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1566
1567   oldenc = priv->srtpenc;
1568   enc = get_rtp_encoder (stream, session);
1569   name = g_strdup_printf ("rtp_sink_%d", session);
1570   pad = gst_element_get_request_pad (enc, name);
1571   g_free (name);
1572   gst_object_unref (pad);
1573
1574   if (oldenc == NULL)
1575     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1576         enc);
1577
1578   return enc;
1579 }
1580
1581 static GstElement *
1582 request_rtcp_encoder (GstElement * rtpbin, guint session,
1583     GstRTSPStream * stream)
1584 {
1585   GstRTSPStreamPrivate *priv = stream->priv;
1586   GstElement *oldenc, *enc;
1587   GstPad *pad;
1588   gchar *name;
1589
1590   if (priv->idx != session)
1591     return NULL;
1592
1593   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1594
1595   oldenc = priv->srtpenc;
1596   enc = get_rtp_encoder (stream, session);
1597   name = g_strdup_printf ("rtcp_sink_%d", session);
1598   pad = gst_element_get_request_pad (enc, name);
1599   g_free (name);
1600   gst_object_unref (pad);
1601
1602   if (oldenc == NULL)
1603     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1604         enc);
1605
1606   return enc;
1607 }
1608
1609 static GstCaps *
1610 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1611 {
1612   GstRTSPStreamPrivate *priv = stream->priv;
1613   GstCaps *caps;
1614
1615   GST_DEBUG ("request key %08x", ssrc);
1616
1617   g_mutex_lock (&priv->lock);
1618   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1619     gst_caps_ref (caps);
1620   g_mutex_unlock (&priv->lock);
1621
1622   return caps;
1623 }
1624
1625 static GstElement *
1626 request_rtcp_decoder (GstElement * rtpbin, guint session,
1627     GstRTSPStream * stream)
1628 {
1629   GstRTSPStreamPrivate *priv = stream->priv;
1630
1631   if (priv->idx != session)
1632     return NULL;
1633
1634   if (priv->srtpdec == NULL) {
1635     gchar *name;
1636
1637     name = g_strdup_printf ("srtpdec_%u", session);
1638     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1639     g_free (name);
1640
1641     g_signal_connect (priv->srtpdec, "request-key",
1642         (GCallback) request_key, stream);
1643   }
1644   return gst_object_ref (priv->srtpdec);
1645 }
1646
1647 /**
1648  * gst_rtsp_stream_join_bin:
1649  * @stream: a #GstRTSPStream
1650  * @bin: (transfer none): a #GstBin to join
1651  * @rtpbin: (transfer none): a rtpbin element in @bin
1652  * @state: the target state of the new elements
1653  *
1654  * Join the #GstBin @bin that contains the element @rtpbin.
1655  *
1656  * @stream will link to @rtpbin, which must be inside @bin. The elements
1657  * added to @bin will be set to the state given in @state.
1658  *
1659  * Returns: %TRUE on success.
1660  */
1661 gboolean
1662 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1663     GstElement * rtpbin, GstState state)
1664 {
1665   GstRTSPStreamPrivate *priv;
1666   gint i;
1667   guint idx;
1668   gchar *name;
1669   GstPad *pad, *sinkpad, *selpad;
1670   GstPadLinkReturn ret;
1671
1672   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1673   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1674   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1675
1676   priv = stream->priv;
1677
1678   g_mutex_lock (&priv->lock);
1679   if (priv->is_joined)
1680     goto was_joined;
1681
1682   /* create a session with the same index as the stream */
1683   idx = priv->idx;
1684
1685   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1686
1687   if (!alloc_ports (stream))
1688     goto no_ports;
1689
1690   /* update the dscp qos field in the sinks */
1691   update_dscp_qos (stream);
1692
1693   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1694       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1695     /* For SRTP */
1696     g_signal_connect (rtpbin, "request-rtp-encoder",
1697         (GCallback) request_rtp_encoder, stream);
1698     g_signal_connect (rtpbin, "request-rtcp-encoder",
1699         (GCallback) request_rtcp_encoder, stream);
1700     g_signal_connect (rtpbin, "request-rtcp-decoder",
1701         (GCallback) request_rtcp_decoder, stream);
1702   }
1703
1704   /* get a pad for sending RTP */
1705   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1706   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1707   g_free (name);
1708   /* link the RTP pad to the session manager, it should not really fail unless
1709    * this is not really an RTP pad */
1710   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1711   if (ret != GST_PAD_LINK_OK)
1712     goto link_failed;
1713
1714   /* get pads from the RTP session element for sending and receiving
1715    * RTP/RTCP*/
1716   name = g_strdup_printf ("send_rtp_src_%u", idx);
1717   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1718   g_free (name);
1719   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1720   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1721   g_free (name);
1722   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1723   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1724   g_free (name);
1725   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1726   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1727   g_free (name);
1728
1729   /* get the session */
1730   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1731
1732   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1733       stream);
1734   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1735       stream);
1736   g_signal_connect (priv->session, "on-ssrc-active",
1737       (GCallback) on_ssrc_active, stream);
1738   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1739       stream);
1740   g_signal_connect (priv->session, "on-bye-timeout",
1741       (GCallback) on_bye_timeout, stream);
1742   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1743       stream);
1744
1745   for (i = 0; i < 2; i++) {
1746     GstPad *teepad, *queuepad;
1747     /* For the sender we create this bit of pipeline for both
1748      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1749      * we need to add a queue before appsink to make the pipeline
1750      * not block. For the TCP case, we want to pump data to the
1751      * client as fast as possible anyway.
1752      *
1753      * .--------.      .-----.    .---------.
1754      * | rtpbin |      | tee |    | udpsink |
1755      * |       send->sink   src->sink       |
1756      * '--------'      |     |    '---------'
1757      *                 |     |    .---------.    .---------.
1758      *                 |     |    |  queue  |    | appsink |
1759      *                 |    src->sink      src->sink       |
1760      *                 '-----'    '---------'    '---------'
1761      *
1762      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1763      * udpsink directly to the session.
1764      */
1765     /* add udpsink */
1766     gst_bin_add (bin, priv->udpsink[i]);
1767     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1768
1769     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1770       /* make tee for RTP/RTCP */
1771       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1772       gst_bin_add (bin, priv->tee[i]);
1773
1774       /* and link to rtpbin send pad */
1775       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1776       gst_pad_link (priv->send_src[i], pad);
1777       gst_object_unref (pad);
1778
1779       /* link tee to udpsink */
1780       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1781       gst_pad_link (teepad, sinkpad);
1782       gst_object_unref (teepad);
1783
1784       /* make queue */
1785       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1786       gst_bin_add (bin, priv->appqueue[i]);
1787       /* and link to tee */
1788       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1789       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1790       gst_pad_link (teepad, pad);
1791       gst_object_unref (pad);
1792       gst_object_unref (teepad);
1793
1794       /* make appsink */
1795       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1796       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1797       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1798       gst_bin_add (bin, priv->appsink[i]);
1799       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1800           &sink_cb, stream, NULL);
1801       /* and link to queue */
1802       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1803       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1804       gst_pad_link (queuepad, pad);
1805       gst_object_unref (pad);
1806       gst_object_unref (queuepad);
1807     } else {
1808       /* else only udpsink needed, link it to the session */
1809       gst_pad_link (priv->send_src[i], sinkpad);
1810     }
1811     gst_object_unref (sinkpad);
1812
1813     /* For the receiver we create this bit of pipeline for both
1814      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1815      * and it is all funneled into the rtpbin receive pad.
1816      *
1817      * .--------.     .--------.    .--------.
1818      * | udpsrc |     | funnel |    | rtpbin |
1819      * |       src->sink      src->sink      |
1820      * '--------'     |        |    '--------'
1821      * .--------.     |        |
1822      * | appsrc |     |        |
1823      * |       src->sink       |
1824      * '--------'     '--------'
1825      */
1826     /* make funnel for the RTP/RTCP receivers */
1827     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1828     gst_bin_add (bin, priv->funnel[i]);
1829
1830     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1831     gst_pad_link (pad, priv->recv_sink[i]);
1832     gst_object_unref (pad);
1833
1834     if (priv->udpsrc_v4[i]) {
1835       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1836        * values */
1837       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1838       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1839       /* add udpsrc */
1840       gst_bin_add (bin, priv->udpsrc_v4[i]);
1841
1842       /* and link to the funnel v4 */
1843       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1844       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1845       gst_pad_link (pad, selpad);
1846       gst_object_unref (pad);
1847       gst_object_unref (selpad);
1848     }
1849
1850     if (priv->udpsrc_v6[i]) {
1851       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1852       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1853       gst_bin_add (bin, priv->udpsrc_v6[i]);
1854
1855       /* and link to the funnel v6 */
1856       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1857       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1858       gst_pad_link (pad, selpad);
1859       gst_object_unref (pad);
1860       gst_object_unref (selpad);
1861     }
1862
1863     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1864       /* make and add appsrc */
1865       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1866       gst_bin_add (bin, priv->appsrc[i]);
1867       /* and link to the funnel */
1868       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1869       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1870       gst_pad_link (pad, selpad);
1871       gst_object_unref (pad);
1872       gst_object_unref (selpad);
1873     }
1874
1875     /* check if we need to set to a special state */
1876     if (state != GST_STATE_NULL) {
1877       if (priv->udpsink[i])
1878         gst_element_set_state (priv->udpsink[i], state);
1879       if (priv->appsink[i])
1880         gst_element_set_state (priv->appsink[i], state);
1881       if (priv->appqueue[i])
1882         gst_element_set_state (priv->appqueue[i], state);
1883       if (priv->tee[i])
1884         gst_element_set_state (priv->tee[i], state);
1885       if (priv->funnel[i])
1886         gst_element_set_state (priv->funnel[i], state);
1887       if (priv->appsrc[i])
1888         gst_element_set_state (priv->appsrc[i], state);
1889     }
1890   }
1891
1892   /* be notified of caps changes */
1893   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1894       (GCallback) caps_notify, stream);
1895
1896   priv->is_joined = TRUE;
1897   g_mutex_unlock (&priv->lock);
1898
1899   return TRUE;
1900
1901   /* ERRORS */
1902 was_joined:
1903   {
1904     g_mutex_unlock (&priv->lock);
1905     return TRUE;
1906   }
1907 no_ports:
1908   {
1909     g_mutex_unlock (&priv->lock);
1910     GST_WARNING ("failed to allocate ports %u", idx);
1911     return FALSE;
1912   }
1913 link_failed:
1914   {
1915     GST_WARNING ("failed to link stream %u", idx);
1916     gst_object_unref (priv->send_rtp_sink);
1917     priv->send_rtp_sink = NULL;
1918     g_mutex_unlock (&priv->lock);
1919     return FALSE;
1920   }
1921 }
1922
1923 /**
1924  * gst_rtsp_stream_leave_bin:
1925  * @stream: a #GstRTSPStream
1926  * @bin: (transfer none): a #GstBin
1927  * @rtpbin: (transfer none): a rtpbin #GstElement
1928  *
1929  * Remove the elements of @stream from @bin.
1930  *
1931  * Return: %TRUE on success.
1932  */
1933 gboolean
1934 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1935     GstElement * rtpbin)
1936 {
1937   GstRTSPStreamPrivate *priv;
1938   gint i;
1939
1940   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1941   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1942   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1943
1944   priv = stream->priv;
1945
1946   g_mutex_lock (&priv->lock);
1947   if (!priv->is_joined)
1948     goto was_not_joined;
1949
1950   /* all transports must be removed by now */
1951   g_return_val_if_fail (priv->transports == NULL, FALSE);
1952
1953   clear_tr_cache (priv);
1954
1955   GST_INFO ("stream %p leaving bin", stream);
1956
1957   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1958   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1959   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1960   gst_object_unref (priv->send_rtp_sink);
1961   priv->send_rtp_sink = NULL;
1962
1963   for (i = 0; i < 2; i++) {
1964     if (priv->udpsink[i])
1965       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1966     if (priv->appsink[i])
1967       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1968     if (priv->appqueue[i])
1969       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1970     if (priv->tee[i])
1971       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1972     if (priv->funnel[i])
1973       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1974     if (priv->appsrc[i])
1975       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1976     if (priv->udpsrc_v4[i]) {
1977       /* and set udpsrc to NULL now before removing */
1978       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1979       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1980       /* removing them should also nicely release the request
1981        * pads when they finalize */
1982       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1983     }
1984     if (priv->udpsrc_v6[i]) {
1985       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1986       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1987       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1988     }
1989     if (priv->udpsink[i])
1990       gst_bin_remove (bin, priv->udpsink[i]);
1991     if (priv->appsrc[i])
1992       gst_bin_remove (bin, priv->appsrc[i]);
1993     if (priv->appsink[i])
1994       gst_bin_remove (bin, priv->appsink[i]);
1995     if (priv->appqueue[i])
1996       gst_bin_remove (bin, priv->appqueue[i]);
1997     if (priv->tee[i])
1998       gst_bin_remove (bin, priv->tee[i]);
1999     if (priv->funnel[i])
2000       gst_bin_remove (bin, priv->funnel[i]);
2001
2002     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2003     gst_object_unref (priv->recv_sink[i]);
2004     priv->recv_sink[i] = NULL;
2005
2006     priv->udpsrc_v4[i] = NULL;
2007     priv->udpsrc_v6[i] = NULL;
2008     priv->udpsink[i] = NULL;
2009     priv->appsrc[i] = NULL;
2010     priv->appsink[i] = NULL;
2011     priv->appqueue[i] = NULL;
2012     priv->tee[i] = NULL;
2013     priv->funnel[i] = NULL;
2014   }
2015   gst_object_unref (priv->send_src[0]);
2016   priv->send_src[0] = NULL;
2017
2018   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2019   gst_object_unref (priv->send_src[1]);
2020   priv->send_src[1] = NULL;
2021
2022   g_object_unref (priv->session);
2023   priv->session = NULL;
2024   if (priv->caps)
2025     gst_caps_unref (priv->caps);
2026   priv->caps = NULL;
2027
2028   if (priv->srtpenc)
2029     gst_object_unref (priv->srtpenc);
2030
2031   priv->is_joined = FALSE;
2032   g_mutex_unlock (&priv->lock);
2033
2034   return TRUE;
2035
2036 was_not_joined:
2037   {
2038     g_mutex_unlock (&priv->lock);
2039     return TRUE;
2040   }
2041 }
2042
2043 /**
2044  * gst_rtsp_stream_get_rtpinfo:
2045  * @stream: a #GstRTSPStream
2046  * @rtptime: (allow-none): result RTP timestamp
2047  * @seq: (allow-none): result RTP seqnum
2048  * @clock_rate: (allow-none): the clock rate
2049  * @running_time: (allow-none): result running-time
2050  *
2051  * Retrieve the current rtptime, seq and running-time. This is used to
2052  * construct a RTPInfo reply header.
2053  *
2054  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2055  */
2056 gboolean
2057 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2058     guint * rtptime, guint * seq, guint * clock_rate,
2059     GstClockTime * running_time)
2060 {
2061   GstRTSPStreamPrivate *priv;
2062   GstStructure *stats;
2063   GObjectClass *payobjclass;
2064
2065   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2066
2067   priv = stream->priv;
2068
2069   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2070
2071   g_mutex_lock (&priv->lock);
2072
2073   if (g_object_class_find_property (payobjclass, "stats")) {
2074     g_object_get (priv->payloader, "stats", &stats, NULL);
2075     if (stats == NULL)
2076       goto no_stats;
2077
2078     if (seq)
2079       gst_structure_get_uint (stats, "seqnum", seq);
2080
2081     if (rtptime)
2082       gst_structure_get_uint (stats, "timestamp", rtptime);
2083
2084     if (running_time)
2085       gst_structure_get_clock_time (stats, "running-time", running_time);
2086
2087     if (clock_rate) {
2088       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2089       if (*clock_rate == 0 && running_time)
2090         *running_time = GST_CLOCK_TIME_NONE;
2091     }
2092     gst_structure_free (stats);
2093   } else {
2094     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2095         !g_object_class_find_property (payobjclass, "timestamp"))
2096       goto no_stats;
2097
2098     if (seq)
2099       g_object_get (priv->payloader, "seqnum", seq, NULL);
2100
2101     if (rtptime)
2102       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2103
2104     if (running_time)
2105       *running_time = GST_CLOCK_TIME_NONE;
2106   }
2107   g_mutex_unlock (&priv->lock);
2108
2109   return TRUE;
2110
2111   /* ERRORS */
2112 no_stats:
2113   {
2114     GST_WARNING ("Could not get payloader stats");
2115     g_mutex_unlock (&priv->lock);
2116     return FALSE;
2117   }
2118 }
2119
2120 /**
2121  * gst_rtsp_stream_get_caps:
2122  * @stream: a #GstRTSPStream
2123  *
2124  * Retrieve the current caps of @stream.
2125  *
2126  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2127  * after usage.
2128  */
2129 GstCaps *
2130 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2131 {
2132   GstRTSPStreamPrivate *priv;
2133   GstCaps *result;
2134
2135   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2136
2137   priv = stream->priv;
2138
2139   g_mutex_lock (&priv->lock);
2140   if ((result = priv->caps))
2141     gst_caps_ref (result);
2142   g_mutex_unlock (&priv->lock);
2143
2144   return result;
2145 }
2146
2147 /**
2148  * gst_rtsp_stream_recv_rtp:
2149  * @stream: a #GstRTSPStream
2150  * @buffer: (transfer full): a #GstBuffer
2151  *
2152  * Handle an RTP buffer for the stream. This method is usually called when a
2153  * message has been received from a client using the TCP transport.
2154  *
2155  * This function takes ownership of @buffer.
2156  *
2157  * Returns: a GstFlowReturn.
2158  */
2159 GstFlowReturn
2160 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2161 {
2162   GstRTSPStreamPrivate *priv;
2163   GstFlowReturn ret;
2164   GstElement *element;
2165
2166   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2167   priv = stream->priv;
2168   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2169   g_return_val_if_fail (priv->is_joined, FALSE);
2170
2171   g_mutex_lock (&priv->lock);
2172   if (priv->appsrc[0])
2173     element = gst_object_ref (priv->appsrc[0]);
2174   else
2175     element = NULL;
2176   g_mutex_unlock (&priv->lock);
2177
2178   if (element) {
2179     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2180     gst_object_unref (element);
2181   } else {
2182     ret = GST_FLOW_OK;
2183   }
2184   return ret;
2185 }
2186
2187 /**
2188  * gst_rtsp_stream_recv_rtcp:
2189  * @stream: a #GstRTSPStream
2190  * @buffer: (transfer full): a #GstBuffer
2191  *
2192  * Handle an RTCP buffer for the stream. This method is usually called when a
2193  * message has been received from a client using the TCP transport.
2194  *
2195  * This function takes ownership of @buffer.
2196  *
2197  * Returns: a GstFlowReturn.
2198  */
2199 GstFlowReturn
2200 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2201 {
2202   GstRTSPStreamPrivate *priv;
2203   GstFlowReturn ret;
2204   GstElement *element;
2205
2206   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2207   priv = stream->priv;
2208   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2209   g_return_val_if_fail (priv->is_joined, FALSE);
2210
2211   g_mutex_lock (&priv->lock);
2212   if (priv->appsrc[1])
2213     element = gst_object_ref (priv->appsrc[1]);
2214   else
2215     element = NULL;
2216   g_mutex_unlock (&priv->lock);
2217
2218   if (element) {
2219     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2220     gst_object_unref (element);
2221   } else {
2222     ret = GST_FLOW_OK;
2223     gst_buffer_unref (buffer);
2224   }
2225   return ret;
2226 }
2227
2228 /* must be called with lock */
2229 static gboolean
2230 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2231     gboolean add)
2232 {
2233   GstRTSPStreamPrivate *priv = stream->priv;
2234   const GstRTSPTransport *tr;
2235
2236   tr = gst_rtsp_stream_transport_get_transport (trans);
2237
2238   switch (tr->lower_transport) {
2239     case GST_RTSP_LOWER_TRANS_UDP:
2240     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2241     {
2242       gchar *dest;
2243       gint min, max;
2244       guint ttl = 0;
2245
2246       dest = tr->destination;
2247       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2248         min = tr->port.min;
2249         max = tr->port.max;
2250         ttl = tr->ttl;
2251       } else {
2252         min = tr->client_port.min;
2253         max = tr->client_port.max;
2254       }
2255
2256       if (add) {
2257         if (ttl > 0) {
2258           GST_INFO ("setting ttl-mc %d", ttl);
2259           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2260           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2261         }
2262         GST_INFO ("adding %s:%d-%d", dest, min, max);
2263         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2264         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2265         priv->transports = g_list_prepend (priv->transports, trans);
2266       } else {
2267         GST_INFO ("removing %s:%d-%d", dest, min, max);
2268         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2269         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2270         priv->transports = g_list_remove (priv->transports, trans);
2271       }
2272       priv->transports_cookie++;
2273       break;
2274     }
2275     case GST_RTSP_LOWER_TRANS_TCP:
2276       if (add) {
2277         GST_INFO ("adding TCP %s", tr->destination);
2278         priv->transports = g_list_prepend (priv->transports, trans);
2279       } else {
2280         GST_INFO ("removing TCP %s", tr->destination);
2281         priv->transports = g_list_remove (priv->transports, trans);
2282       }
2283       priv->transports_cookie++;
2284       break;
2285     default:
2286       goto unknown_transport;
2287   }
2288   return TRUE;
2289
2290   /* ERRORS */
2291 unknown_transport:
2292   {
2293     GST_INFO ("Unknown transport %d", tr->lower_transport);
2294     return FALSE;
2295   }
2296 }
2297
2298
2299 /**
2300  * gst_rtsp_stream_add_transport:
2301  * @stream: a #GstRTSPStream
2302  * @trans: (transfer none): a #GstRTSPStreamTransport
2303  *
2304  * Add the transport in @trans to @stream. The media of @stream will
2305  * then also be send to the values configured in @trans.
2306  *
2307  * @stream must be joined to a bin.
2308  *
2309  * @trans must contain a valid #GstRTSPTransport.
2310  *
2311  * Returns: %TRUE if @trans was added
2312  */
2313 gboolean
2314 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2315     GstRTSPStreamTransport * trans)
2316 {
2317   GstRTSPStreamPrivate *priv;
2318   gboolean res;
2319
2320   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2321   priv = stream->priv;
2322   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2323   g_return_val_if_fail (priv->is_joined, FALSE);
2324
2325   g_mutex_lock (&priv->lock);
2326   res = update_transport (stream, trans, TRUE);
2327   g_mutex_unlock (&priv->lock);
2328
2329   return res;
2330 }
2331
2332 /**
2333  * gst_rtsp_stream_remove_transport:
2334  * @stream: a #GstRTSPStream
2335  * @trans: (transfer none): a #GstRTSPStreamTransport
2336  *
2337  * Remove the transport in @trans from @stream. The media of @stream will
2338  * not be sent to the values configured in @trans.
2339  *
2340  * @stream must be joined to a bin.
2341  *
2342  * @trans must contain a valid #GstRTSPTransport.
2343  *
2344  * Returns: %TRUE if @trans was removed
2345  */
2346 gboolean
2347 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2348     GstRTSPStreamTransport * trans)
2349 {
2350   GstRTSPStreamPrivate *priv;
2351   gboolean res;
2352
2353   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2354   priv = stream->priv;
2355   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2356   g_return_val_if_fail (priv->is_joined, FALSE);
2357
2358   g_mutex_lock (&priv->lock);
2359   res = update_transport (stream, trans, FALSE);
2360   g_mutex_unlock (&priv->lock);
2361
2362   return res;
2363 }
2364
2365 /**
2366  * gst_rtsp_stream_update_crypto:
2367  * @stream: a #GstRTSPStream
2368  * @ssrc: the SSRC
2369  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2370  *
2371  * Update the new crypto information for @ssrc in @stream. If information
2372  * for @ssrc did not exist, it will be added. If information
2373  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2374  * be removed from @stream.
2375  *
2376  * Returns: %TRUE if @crypto could be updated
2377  */
2378 gboolean
2379 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2380     guint ssrc, GstCaps * crypto)
2381 {
2382   GstRTSPStreamPrivate *priv;
2383
2384   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2385   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2386
2387   priv = stream->priv;
2388
2389   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2390
2391   g_mutex_lock (&priv->lock);
2392   if (crypto)
2393     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2394         gst_caps_ref (crypto));
2395   else
2396     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2397   g_mutex_unlock (&priv->lock);
2398
2399   return TRUE;
2400 }
2401
2402 /**
2403  * gst_rtsp_stream_get_rtp_socket:
2404  * @stream: a #GstRTSPStream
2405  * @family: the socket family
2406  *
2407  * Get the RTP socket from @stream for a @family.
2408  *
2409  * @stream must be joined to a bin.
2410  *
2411  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2412  * socket could be allocated for @family. Unref after usage
2413  */
2414 GSocket *
2415 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2416 {
2417   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2418   GSocket *socket;
2419   const gchar *name;
2420
2421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2422   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2423       family == G_SOCKET_FAMILY_IPV6, NULL);
2424   g_return_val_if_fail (priv->udpsink[0], NULL);
2425
2426   if (family == G_SOCKET_FAMILY_IPV6)
2427     name = "socket-v6";
2428   else
2429     name = "socket";
2430
2431   g_object_get (priv->udpsink[0], name, &socket, NULL);
2432
2433   return socket;
2434 }
2435
2436 /**
2437  * gst_rtsp_stream_get_rtcp_socket:
2438  * @stream: a #GstRTSPStream
2439  * @family: the socket family
2440  *
2441  * Get the RTCP socket from @stream for a @family.
2442  *
2443  * @stream must be joined to a bin.
2444  *
2445  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2446  * socket could be allocated for @family. Unref after usage
2447  */
2448 GSocket *
2449 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2450 {
2451   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2452   GSocket *socket;
2453   const gchar *name;
2454
2455   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2456   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2457       family == G_SOCKET_FAMILY_IPV6, NULL);
2458   g_return_val_if_fail (priv->udpsink[1], NULL);
2459
2460   if (family == G_SOCKET_FAMILY_IPV6)
2461     name = "socket-v6";
2462   else
2463     name = "socket";
2464
2465   g_object_get (priv->udpsink[1], name, &socket, NULL);
2466
2467   return socket;
2468 }
2469
2470 /**
2471  * gst_rtsp_stream_transport_filter:
2472  * @stream: a #GstRTSPStream
2473  * @func: (scope call) (allow-none): a callback
2474  * @user_data: (closure): user data passed to @func
2475  *
2476  * Call @func for each transport managed by @stream. The result value of @func
2477  * determines what happens to the transport. @func will be called with @stream
2478  * locked so no further actions on @stream can be performed from @func.
2479  *
2480  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2481  * @stream.
2482  *
2483  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2484  *
2485  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2486  * will also be added with an additional ref to the result #GList of this
2487  * function..
2488  *
2489  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2490  *
2491  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2492  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2493  * element in the #GList should be unreffed before the list is freed.
2494  */
2495 GList *
2496 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2497     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2498 {
2499   GstRTSPStreamPrivate *priv;
2500   GList *result, *walk, *next;
2501   GHashTable *visited;
2502   guint cookie;
2503
2504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2505
2506   priv = stream->priv;
2507
2508   result = NULL;
2509   if (func)
2510     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2511
2512   g_mutex_lock (&priv->lock);
2513 restart:
2514   cookie = priv->transports_cookie;
2515   for (walk = priv->transports; walk; walk = next) {
2516     GstRTSPStreamTransport *trans = walk->data;
2517     GstRTSPFilterResult res;
2518     gboolean changed;
2519
2520     next = g_list_next (walk);
2521
2522     if (func) {
2523       /* only visit each transport once */
2524       if (g_hash_table_contains (visited, trans))
2525         continue;
2526
2527       g_hash_table_add (visited, g_object_ref (trans));
2528       g_mutex_unlock (&priv->lock);
2529
2530       res = func (stream, trans, user_data);
2531
2532       g_mutex_lock (&priv->lock);
2533     } else
2534       res = GST_RTSP_FILTER_REF;
2535
2536     changed = (cookie != priv->transports_cookie);
2537
2538     switch (res) {
2539       case GST_RTSP_FILTER_REMOVE:
2540         update_transport (stream, trans, FALSE);
2541         break;
2542       case GST_RTSP_FILTER_REF:
2543         result = g_list_prepend (result, g_object_ref (trans));
2544         break;
2545       case GST_RTSP_FILTER_KEEP:
2546       default:
2547         break;
2548     }
2549     if (changed)
2550       goto restart;
2551   }
2552   g_mutex_unlock (&priv->lock);
2553
2554   if (func)
2555     g_hash_table_unref (visited);
2556
2557   return result;
2558 }
2559
2560 static GstPadProbeReturn
2561 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2562 {
2563   GstRTSPStreamPrivate *priv;
2564   GstRTSPStream *stream;
2565
2566   stream = user_data;
2567   priv = stream->priv;
2568
2569   GST_DEBUG_OBJECT (pad, "now blocking");
2570
2571   g_mutex_lock (&priv->lock);
2572   priv->blocking = TRUE;
2573   g_mutex_unlock (&priv->lock);
2574
2575   gst_element_post_message (priv->payloader,
2576       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2577           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2578
2579   return GST_PAD_PROBE_OK;
2580 }
2581
2582 /**
2583  * gst_rtsp_stream_set_blocked:
2584  * @stream: a #GstRTSPStream
2585  * @blocked: boolean indicating we should block or unblock
2586  *
2587  * Blocks or unblocks the dataflow on @stream.
2588  *
2589  * Returns: %TRUE on success
2590  */
2591 gboolean
2592 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2593 {
2594   GstRTSPStreamPrivate *priv;
2595
2596   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2597
2598   priv = stream->priv;
2599
2600   g_mutex_lock (&priv->lock);
2601   if (blocked) {
2602     priv->blocking = FALSE;
2603     if (priv->blocked_id == 0) {
2604       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2605           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2606           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2607           g_object_ref (stream), g_object_unref);
2608     }
2609   } else {
2610     if (priv->blocked_id != 0) {
2611       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2612       priv->blocked_id = 0;
2613       priv->blocking = FALSE;
2614     }
2615   }
2616   g_mutex_unlock (&priv->lock);
2617
2618   return TRUE;
2619 }
2620
2621 /**
2622  * gst_rtsp_stream_is_blocking:
2623  * @stream: a #GstRTSPStream
2624  *
2625  * Check if @stream is blocking on a #GstBuffer.
2626  *
2627  * Returns: %TRUE if @stream is blocking
2628  */
2629 gboolean
2630 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2631 {
2632   GstRTSPStreamPrivate *priv;
2633   gboolean result;
2634
2635   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2636
2637   priv = stream->priv;
2638
2639   g_mutex_lock (&priv->lock);
2640   result = priv->blocking;
2641   g_mutex_unlock (&priv->lock);
2642
2643   return result;
2644 }