stream: add method to set crypto info
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* SRTP encoder/decoder */
83   GstElement *srtpenc;
84   GstElement *srtpdec;
85   GHashTable *keys;
86
87   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
88    * sockets */
89   GstElement *udpsrc_v4[2];
90
91   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
92    * sockets */
93   GstElement *udpsrc_v6[2];
94
95   GstElement *udpsink[2];
96
97   /* for TCP transport */
98   GstElement *appsrc[2];
99   GstElement *appqueue[2];
100   GstElement *appsink[2];
101
102   GstElement *tee[2];
103   GstElement *funnel[2];
104
105   /* server ports for sending/receiving over ipv4 */
106   GstRTSPRange server_port_v4;
107   GstRTSPAddress *server_addr_v4;
108   gboolean have_ipv4;
109
110   /* server ports for sending/receiving over ipv6 */
111   GstRTSPRange server_port_v6;
112   GstRTSPAddress *server_addr_v6;
113   gboolean have_ipv6;
114
115   /* multicast addresses */
116   GstRTSPAddressPool *pool;
117   GstRTSPAddress *addr_v4;
118   GstRTSPAddress *addr_v6;
119
120   /* the caps of the stream */
121   gulong caps_sig;
122   GstCaps *caps;
123
124   /* transports we stream to */
125   guint n_active;
126   GList *transports;
127   gboolean tr_changed;
128   GList *tr_cache;
129
130   gint dscp_qos;
131
132   /* stream blocking */
133   gulong blocked_id;
134   gboolean blocking;
135 };
136
137 #define DEFAULT_CONTROL         NULL
138 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
139 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
140                                         GST_RTSP_LOWER_TRANS_TCP
141
142 enum
143 {
144   PROP_0,
145   PROP_CONTROL,
146   PROP_PROFILES,
147   PROP_PROTOCOLS,
148   PROP_LAST
149 };
150
151 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
152 #define GST_CAT_DEFAULT rtsp_stream_debug
153
154 static GQuark ssrc_stream_map_key;
155
156 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
157     GValue * value, GParamSpec * pspec);
158 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
159     const GValue * value, GParamSpec * pspec);
160
161 static void gst_rtsp_stream_finalize (GObject * obj);
162
163 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
164
165 static void
166 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
167 {
168   GObjectClass *gobject_class;
169
170   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
171
172   gobject_class = G_OBJECT_CLASS (klass);
173
174   gobject_class->get_property = gst_rtsp_stream_get_property;
175   gobject_class->set_property = gst_rtsp_stream_set_property;
176   gobject_class->finalize = gst_rtsp_stream_finalize;
177
178   g_object_class_install_property (gobject_class, PROP_CONTROL,
179       g_param_spec_string ("control", "Control",
180           "The control string for this stream", DEFAULT_CONTROL,
181           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182
183   g_object_class_install_property (gobject_class, PROP_PROFILES,
184       g_param_spec_flags ("profiles", "Profiles",
185           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
186           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
187
188   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
189       g_param_spec_flags ("protocols", "Protocols",
190           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
191           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
192
193   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
194
195   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
196 }
197
198 static void
199 gst_rtsp_stream_init (GstRTSPStream * stream)
200 {
201   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
202
203   GST_DEBUG ("new stream %p", stream);
204
205   stream->priv = priv;
206
207   priv->dscp_qos = -1;
208   priv->control = g_strdup (DEFAULT_CONTROL);
209   priv->profiles = DEFAULT_PROFILES;
210   priv->protocols = DEFAULT_PROTOCOLS;
211
212   g_mutex_init (&priv->lock);
213
214   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
215       NULL, (GDestroyNotify) gst_caps_unref);
216 }
217
218 static void
219 gst_rtsp_stream_finalize (GObject * obj)
220 {
221   GstRTSPStream *stream;
222   GstRTSPStreamPrivate *priv;
223
224   stream = GST_RTSP_STREAM (obj);
225   priv = stream->priv;
226
227   GST_DEBUG ("finalize stream %p", stream);
228
229   /* we really need to be unjoined now */
230   g_return_if_fail (!priv->is_joined);
231
232   if (priv->addr_v4)
233     gst_rtsp_address_free (priv->addr_v4);
234   if (priv->addr_v6)
235     gst_rtsp_address_free (priv->addr_v6);
236   if (priv->server_addr_v4)
237     gst_rtsp_address_free (priv->server_addr_v4);
238   if (priv->server_addr_v6)
239     gst_rtsp_address_free (priv->server_addr_v6);
240   if (priv->pool)
241     g_object_unref (priv->pool);
242   gst_object_unref (priv->payloader);
243   gst_object_unref (priv->srcpad);
244   g_free (priv->control);
245   g_mutex_clear (&priv->lock);
246
247   g_hash_table_unref (priv->keys);
248
249   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
250 }
251
252 static void
253 gst_rtsp_stream_get_property (GObject * object, guint propid,
254     GValue * value, GParamSpec * pspec)
255 {
256   GstRTSPStream *stream = GST_RTSP_STREAM (object);
257
258   switch (propid) {
259     case PROP_CONTROL:
260       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
261       break;
262     case PROP_PROFILES:
263       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
264       break;
265     case PROP_PROTOCOLS:
266       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
267       break;
268     default:
269       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
270   }
271 }
272
273 static void
274 gst_rtsp_stream_set_property (GObject * object, guint propid,
275     const GValue * value, GParamSpec * pspec)
276 {
277   GstRTSPStream *stream = GST_RTSP_STREAM (object);
278
279   switch (propid) {
280     case PROP_CONTROL:
281       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
282       break;
283     case PROP_PROFILES:
284       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
285       break;
286     case PROP_PROTOCOLS:
287       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
288       break;
289     default:
290       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
291   }
292 }
293
294 /**
295  * gst_rtsp_stream_new:
296  * @idx: an index
297  * @srcpad: a #GstPad
298  * @payloader: a #GstElement
299  *
300  * Create a new media stream with index @idx that handles RTP data on
301  * @srcpad and has a payloader element @payloader.
302  *
303  * Returns: (transfer full): a new #GstRTSPStream
304  */
305 GstRTSPStream *
306 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
307 {
308   GstRTSPStreamPrivate *priv;
309   GstRTSPStream *stream;
310
311   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
312   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
313   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
314
315   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
316   priv = stream->priv;
317   priv->idx = idx;
318   priv->payloader = gst_object_ref (payloader);
319   priv->srcpad = gst_object_ref (srcpad);
320
321   return stream;
322 }
323
324 /**
325  * gst_rtsp_stream_get_index:
326  * @stream: a #GstRTSPStream
327  *
328  * Get the stream index.
329  *
330  * Return: the stream index.
331  */
332 guint
333 gst_rtsp_stream_get_index (GstRTSPStream * stream)
334 {
335   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
336
337   return stream->priv->idx;
338 }
339
340 /**
341  * gst_rtsp_stream_get_pt:
342  * @stream: a #GstRTSPStream
343  *
344  * Get the stream payload type.
345  *
346  * Return: the stream payload type.
347  */
348 guint
349 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
350 {
351   GstRTSPStreamPrivate *priv;
352   guint pt;
353
354   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
355
356   priv = stream->priv;
357
358   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
359
360   return pt;
361 }
362
363 /**
364  * gst_rtsp_stream_get_srcpad:
365  * @stream: a #GstRTSPStream
366  *
367  * Get the srcpad associated with @stream.
368  *
369  * Returns: (transfer full): the srcpad. Unref after usage.
370  */
371 GstPad *
372 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
373 {
374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
375
376   return gst_object_ref (stream->priv->srcpad);
377 }
378
379 /**
380  * gst_rtsp_stream_get_control:
381  * @stream: a #GstRTSPStream
382  *
383  * Get the control string to identify this stream.
384  *
385  * Returns: (transfer full): the control string. g_free() after usage.
386  */
387 gchar *
388 gst_rtsp_stream_get_control (GstRTSPStream * stream)
389 {
390   GstRTSPStreamPrivate *priv;
391   gchar *result;
392
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
394
395   priv = stream->priv;
396
397   g_mutex_lock (&priv->lock);
398   if ((result = g_strdup (priv->control)) == NULL)
399     result = g_strdup_printf ("stream=%u", priv->idx);
400   g_mutex_unlock (&priv->lock);
401
402   return result;
403 }
404
405 /**
406  * gst_rtsp_stream_set_control:
407  * @stream: a #GstRTSPStream
408  * @control: a control string
409  *
410  * Set the control string in @stream.
411  */
412 void
413 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
414 {
415   GstRTSPStreamPrivate *priv;
416
417   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
418
419   priv = stream->priv;
420
421   g_mutex_lock (&priv->lock);
422   g_free (priv->control);
423   priv->control = g_strdup (control);
424   g_mutex_unlock (&priv->lock);
425 }
426
427 /**
428  * gst_rtsp_stream_has_control:
429  * @stream: a #GstRTSPStream
430  * @control: a control string
431  *
432  * Check if @stream has the control string @control.
433  *
434  * Returns: %TRUE is @stream has @control as the control string
435  */
436 gboolean
437 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
438 {
439   GstRTSPStreamPrivate *priv;
440   gboolean res;
441
442   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
443
444   priv = stream->priv;
445
446   g_mutex_lock (&priv->lock);
447   if (priv->control)
448     res = (g_strcmp0 (priv->control, control) == 0);
449   else {
450     guint streamid;
451
452     if (sscanf (control, "stream=%u", &streamid) > 0)
453       res = (streamid == priv->idx);
454     else
455       res = FALSE;
456   }
457   g_mutex_unlock (&priv->lock);
458
459   return res;
460 }
461
462 /**
463  * gst_rtsp_stream_set_mtu:
464  * @stream: a #GstRTSPStream
465  * @mtu: a new MTU
466  *
467  * Configure the mtu in the payloader of @stream to @mtu.
468  */
469 void
470 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
471 {
472   GstRTSPStreamPrivate *priv;
473
474   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
475
476   priv = stream->priv;
477
478   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
479
480   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
481 }
482
483 /**
484  * gst_rtsp_stream_get_mtu:
485  * @stream: a #GstRTSPStream
486  *
487  * Get the configured MTU in the payloader of @stream.
488  *
489  * Returns: the MTU of the payloader.
490  */
491 guint
492 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
493 {
494   GstRTSPStreamPrivate *priv;
495   guint mtu;
496
497   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
498
499   priv = stream->priv;
500
501   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
502
503   return mtu;
504 }
505
506 /* Update the dscp qos property on the udp sinks */
507 static void
508 update_dscp_qos (GstRTSPStream * stream)
509 {
510   GstRTSPStreamPrivate *priv;
511
512   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
513
514   priv = stream->priv;
515
516   if (priv->udpsink[0]) {
517     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
518         NULL);
519   }
520
521   if (priv->udpsink[1]) {
522     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
523         NULL);
524   }
525 }
526
527 /**
528  * gst_rtsp_stream_set_dscp_qos:
529  * @stream: a #GstRTSPStream
530  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
531  *
532  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
533  */
534 void
535 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
536 {
537   GstRTSPStreamPrivate *priv;
538
539   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
540
541   priv = stream->priv;
542
543   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
544
545   if (dscp_qos < -1 || dscp_qos > 63) {
546     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
547     return;
548   }
549
550   priv->dscp_qos = dscp_qos;
551
552   update_dscp_qos (stream);
553 }
554
555 /**
556  * gst_rtsp_stream_get_dscp_qos:
557  * @stream: a #GstRTSPStream
558  *
559  * Get the configured DSCP QoS in of the outgoing sockets.
560  *
561  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
562  */
563 gint
564 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
565 {
566   GstRTSPStreamPrivate *priv;
567
568   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
569
570   priv = stream->priv;
571
572   return priv->dscp_qos;
573 }
574
575 /**
576  * gst_rtsp_stream_is_transport_supported:
577  * @stream: a #GstRTSPStream
578  * @transport: (transfer none): a #GstRTSPTransport
579  *
580  * Check if @transport can be handled by stream
581  *
582  * Returns: %TRUE if @transport can be handled by @stream.
583  */
584 gboolean
585 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
586     GstRTSPTransport * transport)
587 {
588   GstRTSPStreamPrivate *priv;
589
590   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
591
592   priv = stream->priv;
593
594   g_mutex_lock (&priv->lock);
595   if (transport->trans != GST_RTSP_TRANS_RTP)
596     goto unsupported_transmode;
597
598   if (!(transport->profile & priv->profiles))
599     goto unsupported_profile;
600
601   if (!(transport->lower_transport & priv->protocols))
602     goto unsupported_ltrans;
603
604   g_mutex_unlock (&priv->lock);
605
606   return TRUE;
607
608   /* ERRORS */
609 unsupported_transmode:
610   {
611     GST_DEBUG ("unsupported transport mode %d", transport->trans);
612     g_mutex_unlock (&priv->lock);
613     return FALSE;
614   }
615 unsupported_profile:
616   {
617     GST_DEBUG ("unsupported profile %d", transport->profile);
618     g_mutex_unlock (&priv->lock);
619     return FALSE;
620   }
621 unsupported_ltrans:
622   {
623     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
624     g_mutex_unlock (&priv->lock);
625     return FALSE;
626   }
627 }
628
629 /**
630  * gst_rtsp_stream_set_profiles:
631  * @stream: a #GstRTSPStream
632  * @profiles: the new profiles
633  *
634  * Configure the allowed profiles for @stream.
635  */
636 void
637 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
638 {
639   GstRTSPStreamPrivate *priv;
640
641   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
642
643   priv = stream->priv;
644
645   g_mutex_lock (&priv->lock);
646   priv->profiles = profiles;
647   g_mutex_unlock (&priv->lock);
648 }
649
650 /**
651  * gst_rtsp_stream_get_profiles:
652  * @stream: a #GstRTSPStream
653  *
654  * Get the allowed profiles of @stream.
655  *
656  * Returns: a #GstRTSPProfile
657  */
658 GstRTSPProfile
659 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
660 {
661   GstRTSPStreamPrivate *priv;
662   GstRTSPProfile res;
663
664   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
665
666   priv = stream->priv;
667
668   g_mutex_lock (&priv->lock);
669   res = priv->profiles;
670   g_mutex_unlock (&priv->lock);
671
672   return res;
673 }
674
675 /**
676  * gst_rtsp_stream_set_protocols:
677  * @stream: a #GstRTSPStream
678  * @protocols: the new flags
679  *
680  * Configure the allowed lower transport for @stream.
681  */
682 void
683 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
684     GstRTSPLowerTrans protocols)
685 {
686   GstRTSPStreamPrivate *priv;
687
688   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
689
690   priv = stream->priv;
691
692   g_mutex_lock (&priv->lock);
693   priv->protocols = protocols;
694   g_mutex_unlock (&priv->lock);
695 }
696
697 /**
698  * gst_rtsp_stream_get_protocols:
699  * @stream: a #GstRTSPStream
700  *
701  * Get the allowed protocols of @stream.
702  *
703  * Returns: a #GstRTSPLowerTrans
704  */
705 GstRTSPLowerTrans
706 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
707 {
708   GstRTSPStreamPrivate *priv;
709   GstRTSPLowerTrans res;
710
711   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
712       GST_RTSP_LOWER_TRANS_UNKNOWN);
713
714   priv = stream->priv;
715
716   g_mutex_lock (&priv->lock);
717   res = priv->protocols;
718   g_mutex_unlock (&priv->lock);
719
720   return res;
721 }
722
723 /**
724  * gst_rtsp_stream_set_address_pool:
725  * @stream: a #GstRTSPStream
726  * @pool: (transfer none): a #GstRTSPAddressPool
727  *
728  * configure @pool to be used as the address pool of @stream.
729  */
730 void
731 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
732     GstRTSPAddressPool * pool)
733 {
734   GstRTSPStreamPrivate *priv;
735   GstRTSPAddressPool *old;
736
737   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
738
739   priv = stream->priv;
740
741   GST_LOG_OBJECT (stream, "set address pool %p", pool);
742
743   g_mutex_lock (&priv->lock);
744   if ((old = priv->pool) != pool)
745     priv->pool = pool ? g_object_ref (pool) : NULL;
746   else
747     old = NULL;
748   g_mutex_unlock (&priv->lock);
749
750   if (old)
751     g_object_unref (old);
752 }
753
754 /**
755  * gst_rtsp_stream_get_address_pool:
756  * @stream: a #GstRTSPStream
757  *
758  * Get the #GstRTSPAddressPool used as the address pool of @stream.
759  *
760  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
761  * usage.
762  */
763 GstRTSPAddressPool *
764 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
765 {
766   GstRTSPStreamPrivate *priv;
767   GstRTSPAddressPool *result;
768
769   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
770
771   priv = stream->priv;
772
773   g_mutex_lock (&priv->lock);
774   if ((result = priv->pool))
775     g_object_ref (result);
776   g_mutex_unlock (&priv->lock);
777
778   return result;
779 }
780
781 /**
782  * gst_rtsp_stream_get_multicast_address:
783  * @stream: a #GstRTSPStream
784  * @family: the #GSocketFamily
785  *
786  * Get the multicast address of @stream for @family.
787  *
788  * Returns: (transfer full): the #GstRTSPAddress of @stream or %NULL when no
789  * address could be allocated. gst_rtsp_address_free() after usage.
790  */
791 GstRTSPAddress *
792 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
793     GSocketFamily family)
794 {
795   GstRTSPStreamPrivate *priv;
796   GstRTSPAddress *result;
797   GstRTSPAddress **addrp;
798   GstRTSPAddressFlags flags;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
801
802   priv = stream->priv;
803
804   if (family == G_SOCKET_FAMILY_IPV6) {
805     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
806     addrp = &priv->addr_v6;
807   } else {
808     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
809     addrp = &priv->addr_v4;
810   }
811
812   g_mutex_lock (&priv->lock);
813   if (*addrp == NULL) {
814     if (priv->pool == NULL)
815       goto no_pool;
816
817     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
818
819     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
820     if (*addrp == NULL)
821       goto no_address;
822   }
823   result = gst_rtsp_address_copy (*addrp);
824   g_mutex_unlock (&priv->lock);
825
826   return result;
827
828   /* ERRORS */
829 no_pool:
830   {
831     GST_ERROR_OBJECT (stream, "no address pool specified");
832     g_mutex_unlock (&priv->lock);
833     return NULL;
834   }
835 no_address:
836   {
837     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
838     g_mutex_unlock (&priv->lock);
839     return NULL;
840   }
841 }
842
843 /**
844  * gst_rtsp_stream_reserve_address:
845  * @stream: a #GstRTSPStream
846  * @address: an address
847  * @port: a port
848  * @n_ports: n_ports
849  * @ttl: a TTL
850  *
851  * Reserve @address and @port as the address and port of @stream.
852  *
853  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
854  * reserved. gst_rtsp_address_free() after usage.
855  */
856 GstRTSPAddress *
857 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
858     const gchar * address, guint port, guint n_ports, guint ttl)
859 {
860   GstRTSPStreamPrivate *priv;
861   GstRTSPAddress *result;
862   GInetAddress *addr;
863   GSocketFamily family;
864   GstRTSPAddress **addrp;
865
866   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
867   g_return_val_if_fail (address != NULL, NULL);
868   g_return_val_if_fail (port > 0, NULL);
869   g_return_val_if_fail (n_ports > 0, NULL);
870   g_return_val_if_fail (ttl > 0, NULL);
871
872   priv = stream->priv;
873
874   addr = g_inet_address_new_from_string (address);
875   if (!addr) {
876     GST_ERROR ("failed to get inet addr from %s", address);
877     family = G_SOCKET_FAMILY_IPV4;
878   } else {
879     family = g_inet_address_get_family (addr);
880     g_object_unref (addr);
881   }
882
883   if (family == G_SOCKET_FAMILY_IPV6)
884     addrp = &priv->addr_v6;
885   else
886     addrp = &priv->addr_v4;
887
888   g_mutex_lock (&priv->lock);
889   if (*addrp == NULL) {
890     GstRTSPAddressPoolResult res;
891
892     if (priv->pool == NULL)
893       goto no_pool;
894
895     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
896         port, n_ports, ttl, addrp);
897     if (res != GST_RTSP_ADDRESS_POOL_OK)
898       goto no_address;
899   } else {
900     if (strcmp ((*addrp)->address, address) ||
901         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
902         (*addrp)->ttl != ttl)
903       goto different_address;
904   }
905   result = gst_rtsp_address_copy (*addrp);
906   g_mutex_unlock (&priv->lock);
907
908   return result;
909
910   /* ERRORS */
911 no_pool:
912   {
913     GST_ERROR_OBJECT (stream, "no address pool specified");
914     g_mutex_unlock (&priv->lock);
915     return NULL;
916   }
917 no_address:
918   {
919     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
920         address);
921     g_mutex_unlock (&priv->lock);
922     return NULL;
923   }
924 different_address:
925   {
926     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
927         " reserved", address);
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 static gboolean
934 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
935     GSocketFamily family, GstElement * udpsrc_out[2],
936     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
937     GstRTSPAddress ** server_addr_out)
938 {
939   GstStateChangeReturn ret;
940   GstElement *udpsrc0, *udpsrc1;
941   GstElement *udpsink0, *udpsink1;
942   GSocket *rtp_socket = NULL;
943   GSocket *rtcp_socket;
944   gint tmp_rtp, tmp_rtcp;
945   guint count;
946   gint rtpport, rtcpport;
947   GList *rejected_addresses = NULL;
948   GstRTSPAddress *addr = NULL;
949   GInetAddress *inetaddr = NULL;
950   GSocketAddress *rtp_sockaddr = NULL;
951   GSocketAddress *rtcp_sockaddr = NULL;
952   const gchar *multisink_socket;
953
954   if (family == G_SOCKET_FAMILY_IPV6)
955     multisink_socket = "socket-v6";
956   else
957     multisink_socket = "socket";
958
959   udpsrc0 = NULL;
960   udpsrc1 = NULL;
961   udpsink0 = NULL;
962   udpsink1 = NULL;
963   count = 0;
964
965   /* Start with random port */
966   tmp_rtp = 0;
967
968   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
969       G_SOCKET_PROTOCOL_UDP, NULL);
970   if (!rtcp_socket)
971     goto no_udp_protocol;
972
973   if (*server_addr_out)
974     gst_rtsp_address_free (*server_addr_out);
975
976   /* try to allocate 2 UDP ports, the RTP port should be an even
977    * number and the RTCP port should be the next (uneven) port */
978 again:
979
980   if (rtp_socket == NULL) {
981     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
982         G_SOCKET_PROTOCOL_UDP, NULL);
983     if (!rtp_socket)
984       goto no_udp_protocol;
985   }
986
987   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
988     GstRTSPAddressFlags flags;
989
990     if (addr)
991       rejected_addresses = g_list_prepend (rejected_addresses, addr);
992
993     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
994     if (family == G_SOCKET_FAMILY_IPV6)
995       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
996     else
997       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
998
999     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1000
1001     if (addr == NULL)
1002       goto no_ports;
1003
1004     tmp_rtp = addr->port;
1005
1006     g_clear_object (&inetaddr);
1007     inetaddr = g_inet_address_new_from_string (addr->address);
1008   } else {
1009     if (tmp_rtp != 0) {
1010       tmp_rtp += 2;
1011       if (++count > 20)
1012         goto no_ports;
1013     }
1014
1015     if (inetaddr == NULL)
1016       inetaddr = g_inet_address_new_any (family);
1017   }
1018
1019   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1020   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1021     g_object_unref (rtp_sockaddr);
1022     goto again;
1023   }
1024   g_object_unref (rtp_sockaddr);
1025
1026   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1027   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1028     g_clear_object (&rtp_sockaddr);
1029     goto socket_error;
1030   }
1031
1032   tmp_rtp =
1033       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1034   g_object_unref (rtp_sockaddr);
1035
1036   /* check if port is even */
1037   if ((tmp_rtp & 1) != 0) {
1038     /* port not even, close and allocate another */
1039     tmp_rtp++;
1040     g_clear_object (&rtp_socket);
1041     goto again;
1042   }
1043
1044   /* set port */
1045   tmp_rtcp = tmp_rtp + 1;
1046
1047   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1048   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1049     g_object_unref (rtcp_sockaddr);
1050     g_clear_object (&rtp_socket);
1051     goto again;
1052   }
1053   g_object_unref (rtcp_sockaddr);
1054
1055   g_clear_object (&inetaddr);
1056
1057   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1058   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1059
1060   if (udpsrc0 == NULL || udpsrc1 == NULL)
1061     goto no_udp_protocol;
1062
1063   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1064   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1065
1066   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1067   if (ret == GST_STATE_CHANGE_FAILURE)
1068     goto element_error;
1069   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1070   if (ret == GST_STATE_CHANGE_FAILURE)
1071     goto element_error;
1072
1073   /* all fine, do port check */
1074   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1075   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1076
1077   /* this should not happen... */
1078   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1079     goto port_error;
1080
1081   if (udpsink_out[0])
1082     udpsink0 = udpsink_out[0];
1083   else
1084     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1085
1086   if (!udpsink0)
1087     goto no_udp_protocol;
1088
1089   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1091
1092   if (udpsink_out[1])
1093     udpsink1 = udpsink_out[1];
1094   else
1095     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1096
1097   if (!udpsink1)
1098     goto no_udp_protocol;
1099
1100   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1101   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1102   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1103
1104   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1105   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1106   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1107   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1108   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1109   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1110   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1111   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1112
1113   /* we keep these elements, we will further configure them when the
1114    * client told us to really use the UDP ports. */
1115   udpsrc_out[0] = udpsrc0;
1116   udpsrc_out[1] = udpsrc1;
1117   udpsink_out[0] = udpsink0;
1118   udpsink_out[1] = udpsink1;
1119   server_port_out->min = rtpport;
1120   server_port_out->max = rtcpport;
1121
1122   *server_addr_out = addr;
1123   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1124
1125   g_object_unref (rtp_socket);
1126   g_object_unref (rtcp_socket);
1127
1128   return TRUE;
1129
1130   /* ERRORS */
1131 no_udp_protocol:
1132   {
1133     goto cleanup;
1134   }
1135 no_ports:
1136   {
1137     goto cleanup;
1138   }
1139 port_error:
1140   {
1141     goto cleanup;
1142   }
1143 socket_error:
1144   {
1145     goto cleanup;
1146   }
1147 element_error:
1148   {
1149     goto cleanup;
1150   }
1151 cleanup:
1152   {
1153     if (udpsrc0) {
1154       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1155       gst_object_unref (udpsrc0);
1156     }
1157     if (udpsrc1) {
1158       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1159       gst_object_unref (udpsrc1);
1160     }
1161     if (udpsink0) {
1162       gst_element_set_state (udpsink0, GST_STATE_NULL);
1163       gst_object_unref (udpsink0);
1164     }
1165     if (inetaddr)
1166       g_object_unref (inetaddr);
1167     g_list_free_full (rejected_addresses,
1168         (GDestroyNotify) gst_rtsp_address_free);
1169     if (addr)
1170       gst_rtsp_address_free (addr);
1171     if (rtp_socket)
1172       g_object_unref (rtp_socket);
1173     if (rtcp_socket)
1174       g_object_unref (rtcp_socket);
1175     return FALSE;
1176   }
1177 }
1178
1179 /* must be called with lock */
1180 static gboolean
1181 alloc_ports (GstRTSPStream * stream)
1182 {
1183   GstRTSPStreamPrivate *priv = stream->priv;
1184
1185   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1186       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1187       &priv->server_port_v4, &priv->server_addr_v4);
1188
1189   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1190       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1191       &priv->server_port_v6, &priv->server_addr_v6);
1192
1193   return priv->have_ipv4 || priv->have_ipv6;
1194 }
1195
1196 /**
1197  * gst_rtsp_stream_get_server_port:
1198  * @stream: a #GstRTSPStream
1199  * @server_port: (out): result server port
1200  * @family: the port family to get
1201  *
1202  * Fill @server_port with the port pair used by the server. This function can
1203  * only be called when @stream has been joined.
1204  */
1205 void
1206 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1207     GstRTSPRange * server_port, GSocketFamily family)
1208 {
1209   GstRTSPStreamPrivate *priv;
1210
1211   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1212   priv = stream->priv;
1213   g_return_if_fail (priv->is_joined);
1214
1215   g_mutex_lock (&priv->lock);
1216   if (family == G_SOCKET_FAMILY_IPV4) {
1217     if (server_port)
1218       *server_port = priv->server_port_v4;
1219   } else {
1220     if (server_port)
1221       *server_port = priv->server_port_v6;
1222   }
1223   g_mutex_unlock (&priv->lock);
1224 }
1225
1226 /**
1227  * gst_rtsp_stream_get_rtpsession:
1228  * @stream: a #GstRTSPStream
1229  *
1230  * Get the RTP session of this stream.
1231  *
1232  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1233  */
1234 GObject *
1235 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1236 {
1237   GstRTSPStreamPrivate *priv;
1238   GObject *session;
1239
1240   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1241
1242   priv = stream->priv;
1243
1244   g_mutex_lock (&priv->lock);
1245   if ((session = priv->session))
1246     g_object_ref (session);
1247   g_mutex_unlock (&priv->lock);
1248
1249   return session;
1250 }
1251
1252 /**
1253  * gst_rtsp_stream_get_ssrc:
1254  * @stream: a #GstRTSPStream
1255  * @ssrc: (out): result ssrc
1256  *
1257  * Get the SSRC used by the RTP session of this stream. This function can only
1258  * be called when @stream has been joined.
1259  */
1260 void
1261 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1262 {
1263   GstRTSPStreamPrivate *priv;
1264
1265   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1266   priv = stream->priv;
1267   g_return_if_fail (priv->is_joined);
1268
1269   g_mutex_lock (&priv->lock);
1270   if (ssrc && priv->session)
1271     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1272   g_mutex_unlock (&priv->lock);
1273 }
1274
1275 /* executed from streaming thread */
1276 static void
1277 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1278 {
1279   GstRTSPStreamPrivate *priv = stream->priv;
1280   GstCaps *newcaps, *oldcaps;
1281
1282   newcaps = gst_pad_get_current_caps (pad);
1283
1284   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1285       newcaps);
1286
1287   g_mutex_lock (&priv->lock);
1288   oldcaps = priv->caps;
1289   priv->caps = newcaps;
1290   g_mutex_unlock (&priv->lock);
1291
1292   if (oldcaps)
1293     gst_caps_unref (oldcaps);
1294 }
1295
1296 static void
1297 dump_structure (const GstStructure * s)
1298 {
1299   gchar *sstr;
1300
1301   sstr = gst_structure_to_string (s);
1302   GST_INFO ("structure: %s", sstr);
1303   g_free (sstr);
1304 }
1305
1306 static GstRTSPStreamTransport *
1307 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1308 {
1309   GstRTSPStreamPrivate *priv = stream->priv;
1310   GList *walk;
1311   GstRTSPStreamTransport *result = NULL;
1312   const gchar *tmp;
1313   gchar *dest;
1314   guint port;
1315
1316   if (rtcp_from == NULL)
1317     return NULL;
1318
1319   tmp = g_strrstr (rtcp_from, ":");
1320   if (tmp == NULL)
1321     return NULL;
1322
1323   port = atoi (tmp + 1);
1324   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1325
1326   g_mutex_lock (&priv->lock);
1327   GST_INFO ("finding %s:%d in %d transports", dest, port,
1328       g_list_length (priv->transports));
1329
1330   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1331     GstRTSPStreamTransport *trans = walk->data;
1332     const GstRTSPTransport *tr;
1333     gint min, max;
1334
1335     tr = gst_rtsp_stream_transport_get_transport (trans);
1336
1337     min = tr->client_port.min;
1338     max = tr->client_port.max;
1339
1340     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1341       result = trans;
1342       break;
1343     }
1344   }
1345   if (result)
1346     g_object_ref (result);
1347   g_mutex_unlock (&priv->lock);
1348
1349   g_free (dest);
1350
1351   return result;
1352 }
1353
1354 static GstRTSPStreamTransport *
1355 check_transport (GObject * source, GstRTSPStream * stream)
1356 {
1357   GstStructure *stats;
1358   GstRTSPStreamTransport *trans;
1359
1360   /* see if we have a stream to match with the origin of the RTCP packet */
1361   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1362   if (trans == NULL) {
1363     g_object_get (source, "stats", &stats, NULL);
1364     if (stats) {
1365       const gchar *rtcp_from;
1366
1367       dump_structure (stats);
1368
1369       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1370       if ((trans = find_transport (stream, rtcp_from))) {
1371         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1372             source);
1373         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1374             g_object_unref);
1375       }
1376       gst_structure_free (stats);
1377     }
1378   }
1379   return trans;
1380 }
1381
1382
1383 static void
1384 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1385 {
1386   GstRTSPStreamTransport *trans;
1387
1388   GST_INFO ("%p: new source %p", stream, source);
1389
1390   trans = check_transport (source, stream);
1391
1392   if (trans)
1393     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1394 }
1395
1396 static void
1397 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1398 {
1399   GST_INFO ("%p: new SDES %p", stream, source);
1400 }
1401
1402 static void
1403 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1404 {
1405   GstRTSPStreamTransport *trans;
1406
1407   trans = check_transport (source, stream);
1408
1409   if (trans) {
1410     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1411     gst_rtsp_stream_transport_keep_alive (trans);
1412   }
1413 #ifdef DUMP_STATS
1414   {
1415     GstStructure *stats;
1416     g_object_get (source, "stats", &stats, NULL);
1417     if (stats) {
1418       dump_structure (stats);
1419       gst_structure_free (stats);
1420     }
1421   }
1422 #endif
1423 }
1424
1425 static void
1426 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1427 {
1428   GST_INFO ("%p: source %p bye", stream, source);
1429 }
1430
1431 static void
1432 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1433 {
1434   GstRTSPStreamTransport *trans;
1435
1436   GST_INFO ("%p: source %p bye timeout", stream, source);
1437
1438   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1439     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1440     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1441   }
1442 }
1443
1444 static void
1445 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1446 {
1447   GstRTSPStreamTransport *trans;
1448
1449   GST_INFO ("%p: source %p timeout", stream, source);
1450
1451   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1452     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1453     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1454   }
1455 }
1456
1457 static void
1458 clear_tr_cache (GstRTSPStreamPrivate * priv)
1459 {
1460   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1461   g_list_free (priv->tr_cache);
1462   priv->tr_cache = NULL;
1463 }
1464
1465 static GstFlowReturn
1466 handle_new_sample (GstAppSink * sink, gpointer user_data)
1467 {
1468   GstRTSPStreamPrivate *priv;
1469   GList *walk;
1470   GstSample *sample;
1471   GstBuffer *buffer;
1472   GstRTSPStream *stream;
1473   gboolean is_rtp;
1474
1475   sample = gst_app_sink_pull_sample (sink);
1476   if (!sample)
1477     return GST_FLOW_OK;
1478
1479   stream = (GstRTSPStream *) user_data;
1480   priv = stream->priv;
1481   buffer = gst_sample_get_buffer (sample);
1482
1483   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1484
1485   g_mutex_lock (&priv->lock);
1486   if (priv->tr_changed) {
1487     clear_tr_cache (priv);
1488     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1489       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1490       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1491     }
1492     priv->tr_changed = FALSE;
1493   }
1494   g_mutex_unlock (&priv->lock);
1495
1496   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1497     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1498
1499     if (is_rtp) {
1500       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1501     } else {
1502       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1503     }
1504   }
1505   gst_sample_unref (sample);
1506
1507   return GST_FLOW_OK;
1508 }
1509
1510 static GstAppSinkCallbacks sink_cb = {
1511   NULL,                         /* not interested in EOS */
1512   NULL,                         /* not interested in preroll samples */
1513   handle_new_sample,
1514 };
1515
1516 static GstElement *
1517 get_rtp_encoder (GstRTSPStream * stream, guint session)
1518 {
1519   GstRTSPStreamPrivate *priv = stream->priv;
1520
1521   if (priv->srtpenc == NULL) {
1522     gchar *name;
1523
1524     name = g_strdup_printf ("srtpenc_%u", session);
1525     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1526     g_free (name);
1527
1528     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1529   }
1530   return gst_object_ref (priv->srtpenc);
1531 }
1532
1533 static GstElement *
1534 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1535 {
1536   GstRTSPStreamPrivate *priv = stream->priv;
1537   GstElement *enc;
1538   GstPad *pad;
1539   gchar *name;
1540
1541   if (priv->idx != session)
1542     return NULL;
1543
1544   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1545
1546   enc = get_rtp_encoder (stream, session);
1547   name = g_strdup_printf ("rtp_sink_%d", session);
1548   pad = gst_element_get_request_pad (enc, name);
1549   g_free (name);
1550   gst_object_unref (pad);
1551
1552   return enc;
1553 }
1554
1555 static GstElement *
1556 request_rtcp_encoder (GstElement * rtpbin, guint session,
1557     GstRTSPStream * stream)
1558 {
1559   GstRTSPStreamPrivate *priv = stream->priv;
1560   GstElement *enc;
1561   GstPad *pad;
1562   gchar *name;
1563
1564   if (priv->idx != session)
1565     return NULL;
1566
1567   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1568
1569   enc = get_rtp_encoder (stream, session);
1570   name = g_strdup_printf ("rtcp_sink_%d", session);
1571   pad = gst_element_get_request_pad (enc, name);
1572   g_free (name);
1573   gst_object_unref (pad);
1574
1575   return enc;
1576 }
1577
1578 static GstCaps *
1579 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1580 {
1581   GstRTSPStreamPrivate *priv = stream->priv;
1582   GstCaps *caps;
1583
1584   GST_DEBUG ("request key %08x", ssrc);
1585
1586   g_mutex_lock (&priv->lock);
1587   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1588     gst_caps_ref (caps);
1589   g_mutex_unlock (&priv->lock);
1590
1591   return caps;
1592 }
1593
1594 static GstElement *
1595 request_rtcp_decoder (GstElement * rtpbin, guint session,
1596     GstRTSPStream * stream)
1597 {
1598   GstRTSPStreamPrivate *priv = stream->priv;
1599
1600   if (priv->idx != session)
1601     return NULL;
1602
1603   if (priv->srtpdec == NULL) {
1604     gchar *name;
1605
1606     name = g_strdup_printf ("srtpdec_%u", session);
1607     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1608     g_free (name);
1609
1610     g_signal_connect (priv->srtpdec, "request-key",
1611         (GCallback) request_key, stream);
1612   }
1613   return gst_object_ref (priv->srtpdec);
1614 }
1615
1616 /**
1617  * gst_rtsp_stream_join_bin:
1618  * @stream: a #GstRTSPStream
1619  * @bin: (transfer none): a #GstBin to join
1620  * @rtpbin: (transfer none): a rtpbin element in @bin
1621  * @state: the target state of the new elements
1622  *
1623  * Join the #GstBin @bin that contains the element @rtpbin.
1624  *
1625  * @stream will link to @rtpbin, which must be inside @bin. The elements
1626  * added to @bin will be set to the state given in @state.
1627  *
1628  * Returns: %TRUE on success.
1629  */
1630 gboolean
1631 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1632     GstElement * rtpbin, GstState state)
1633 {
1634   GstRTSPStreamPrivate *priv;
1635   gint i;
1636   guint idx;
1637   gchar *name;
1638   GstPad *pad, *sinkpad, *selpad;
1639   GstPadLinkReturn ret;
1640
1641   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1642   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1643   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1644
1645   priv = stream->priv;
1646
1647   g_mutex_lock (&priv->lock);
1648   if (priv->is_joined)
1649     goto was_joined;
1650
1651   /* create a session with the same index as the stream */
1652   idx = priv->idx;
1653
1654   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1655
1656   if (!alloc_ports (stream))
1657     goto no_ports;
1658
1659   /* update the dscp qos field in the sinks */
1660   update_dscp_qos (stream);
1661
1662   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1663       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1664     /* For SRTP */
1665     g_signal_connect (rtpbin, "request-rtp-encoder",
1666         (GCallback) request_rtp_encoder, stream);
1667     g_signal_connect (rtpbin, "request-rtcp-encoder",
1668         (GCallback) request_rtcp_encoder, stream);
1669     g_signal_connect (rtpbin, "request-rtcp-decoder",
1670         (GCallback) request_rtcp_decoder, stream);
1671   }
1672
1673   /* get a pad for sending RTP */
1674   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1675   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1676   g_free (name);
1677   /* link the RTP pad to the session manager, it should not really fail unless
1678    * this is not really an RTP pad */
1679   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1680   if (ret != GST_PAD_LINK_OK)
1681     goto link_failed;
1682
1683   /* get pads from the RTP session element for sending and receiving
1684    * RTP/RTCP*/
1685   name = g_strdup_printf ("send_rtp_src_%u", idx);
1686   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1687   g_free (name);
1688   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1689   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1690   g_free (name);
1691   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1692   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1693   g_free (name);
1694   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1695   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1696   g_free (name);
1697
1698   /* get the session */
1699   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1700
1701   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1702       stream);
1703   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1704       stream);
1705   g_signal_connect (priv->session, "on-ssrc-active",
1706       (GCallback) on_ssrc_active, stream);
1707   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1708       stream);
1709   g_signal_connect (priv->session, "on-bye-timeout",
1710       (GCallback) on_bye_timeout, stream);
1711   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1712       stream);
1713
1714   for (i = 0; i < 2; i++) {
1715     GstPad *teepad, *queuepad;
1716     /* For the sender we create this bit of pipeline for both
1717      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1718      * we need to add a queue before appsink to make the pipeline
1719      * not block. For the TCP case, we want to pump data to the
1720      * client as fast as possible anyway.
1721      *
1722      * .--------.      .-----.    .---------.
1723      * | rtpbin |      | tee |    | udpsink |
1724      * |       send->sink   src->sink       |
1725      * '--------'      |     |    '---------'
1726      *                 |     |    .---------.    .---------.
1727      *                 |     |    |  queue  |    | appsink |
1728      *                 |    src->sink      src->sink       |
1729      *                 '-----'    '---------'    '---------'
1730      *
1731      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1732      * udpsink directly to the session.
1733      */
1734     /* add udpsink */
1735     gst_bin_add (bin, priv->udpsink[i]);
1736     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1737
1738     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1739       /* make tee for RTP/RTCP */
1740       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1741       gst_bin_add (bin, priv->tee[i]);
1742
1743       /* and link to rtpbin send pad */
1744       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1745       gst_pad_link (priv->send_src[i], pad);
1746       gst_object_unref (pad);
1747
1748       /* link tee to udpsink */
1749       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1750       gst_pad_link (teepad, sinkpad);
1751       gst_object_unref (teepad);
1752
1753       /* make queue */
1754       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1755       gst_bin_add (bin, priv->appqueue[i]);
1756       /* and link to tee */
1757       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1758       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1759       gst_pad_link (teepad, pad);
1760       gst_object_unref (pad);
1761       gst_object_unref (teepad);
1762
1763       /* make appsink */
1764       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1765       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1766       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1767       gst_bin_add (bin, priv->appsink[i]);
1768       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1769           &sink_cb, stream, NULL);
1770       /* and link to queue */
1771       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1772       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1773       gst_pad_link (queuepad, pad);
1774       gst_object_unref (pad);
1775       gst_object_unref (queuepad);
1776     } else {
1777       /* else only udpsink needed, link it to the session */
1778       gst_pad_link (priv->send_src[i], sinkpad);
1779     }
1780     gst_object_unref (sinkpad);
1781
1782     /* For the receiver we create this bit of pipeline for both
1783      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1784      * and it is all funneled into the rtpbin receive pad.
1785      *
1786      * .--------.     .--------.    .--------.
1787      * | udpsrc |     | funnel |    | rtpbin |
1788      * |       src->sink      src->sink      |
1789      * '--------'     |        |    '--------'
1790      * .--------.     |        |
1791      * | appsrc |     |        |
1792      * |       src->sink       |
1793      * '--------'     '--------'
1794      */
1795     /* make funnel for the RTP/RTCP receivers */
1796     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1797     gst_bin_add (bin, priv->funnel[i]);
1798
1799     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1800     gst_pad_link (pad, priv->recv_sink[i]);
1801     gst_object_unref (pad);
1802
1803     if (priv->udpsrc_v4[i]) {
1804       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1805        * values */
1806       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1807       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1808       /* add udpsrc */
1809       gst_bin_add (bin, priv->udpsrc_v4[i]);
1810
1811       /* and link to the funnel v4 */
1812       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1813       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1814       gst_pad_link (pad, selpad);
1815       gst_object_unref (pad);
1816       gst_object_unref (selpad);
1817     }
1818
1819     if (priv->udpsrc_v6[i]) {
1820       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1821       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1822       gst_bin_add (bin, priv->udpsrc_v6[i]);
1823
1824       /* and link to the funnel v6 */
1825       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1826       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1827       gst_pad_link (pad, selpad);
1828       gst_object_unref (pad);
1829       gst_object_unref (selpad);
1830     }
1831
1832     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1833       /* make and add appsrc */
1834       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1835       gst_bin_add (bin, priv->appsrc[i]);
1836       /* and link to the funnel */
1837       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1838       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1839       gst_pad_link (pad, selpad);
1840       gst_object_unref (pad);
1841       gst_object_unref (selpad);
1842     }
1843
1844     /* check if we need to set to a special state */
1845     if (state != GST_STATE_NULL) {
1846       if (priv->udpsink[i])
1847         gst_element_set_state (priv->udpsink[i], state);
1848       if (priv->appsink[i])
1849         gst_element_set_state (priv->appsink[i], state);
1850       if (priv->appqueue[i])
1851         gst_element_set_state (priv->appqueue[i], state);
1852       if (priv->tee[i])
1853         gst_element_set_state (priv->tee[i], state);
1854       if (priv->funnel[i])
1855         gst_element_set_state (priv->funnel[i], state);
1856       if (priv->appsrc[i])
1857         gst_element_set_state (priv->appsrc[i], state);
1858     }
1859   }
1860
1861   /* be notified of caps changes */
1862   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1863       (GCallback) caps_notify, stream);
1864
1865   priv->is_joined = TRUE;
1866   g_mutex_unlock (&priv->lock);
1867
1868   return TRUE;
1869
1870   /* ERRORS */
1871 was_joined:
1872   {
1873     g_mutex_unlock (&priv->lock);
1874     return TRUE;
1875   }
1876 no_ports:
1877   {
1878     g_mutex_unlock (&priv->lock);
1879     GST_WARNING ("failed to allocate ports %u", idx);
1880     return FALSE;
1881   }
1882 link_failed:
1883   {
1884     GST_WARNING ("failed to link stream %u", idx);
1885     gst_object_unref (priv->send_rtp_sink);
1886     priv->send_rtp_sink = NULL;
1887     g_mutex_unlock (&priv->lock);
1888     return FALSE;
1889   }
1890 }
1891
1892 /**
1893  * gst_rtsp_stream_leave_bin:
1894  * @stream: a #GstRTSPStream
1895  * @bin: (transfer none): a #GstBin
1896  * @rtpbin: (transfer none): a rtpbin #GstElement
1897  *
1898  * Remove the elements of @stream from @bin.
1899  *
1900  * Return: %TRUE on success.
1901  */
1902 gboolean
1903 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1904     GstElement * rtpbin)
1905 {
1906   GstRTSPStreamPrivate *priv;
1907   gint i;
1908
1909   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1910   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1911   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1912
1913   priv = stream->priv;
1914
1915   g_mutex_lock (&priv->lock);
1916   if (!priv->is_joined)
1917     goto was_not_joined;
1918
1919   /* all transports must be removed by now */
1920   g_return_val_if_fail (priv->transports == NULL, FALSE);
1921
1922   clear_tr_cache (priv);
1923
1924   GST_INFO ("stream %p leaving bin", stream);
1925
1926   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1927   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1928   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1929   gst_object_unref (priv->send_rtp_sink);
1930   priv->send_rtp_sink = NULL;
1931
1932   for (i = 0; i < 2; i++) {
1933     if (priv->udpsink[i])
1934       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1935     if (priv->appsink[i])
1936       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1937     if (priv->appqueue[i])
1938       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1939     if (priv->tee[i])
1940       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1941     if (priv->funnel[i])
1942       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1943     if (priv->appsrc[i])
1944       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1945     if (priv->udpsrc_v4[i]) {
1946       /* and set udpsrc to NULL now before removing */
1947       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1948       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1949       /* removing them should also nicely release the request
1950        * pads when they finalize */
1951       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1952     }
1953     if (priv->udpsrc_v6[i]) {
1954       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1955       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1956       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1957     }
1958     if (priv->udpsink[i])
1959       gst_bin_remove (bin, priv->udpsink[i]);
1960     if (priv->appsrc[i])
1961       gst_bin_remove (bin, priv->appsrc[i]);
1962     if (priv->appsink[i])
1963       gst_bin_remove (bin, priv->appsink[i]);
1964     if (priv->appqueue[i])
1965       gst_bin_remove (bin, priv->appqueue[i]);
1966     if (priv->tee[i])
1967       gst_bin_remove (bin, priv->tee[i]);
1968     if (priv->funnel[i])
1969       gst_bin_remove (bin, priv->funnel[i]);
1970
1971     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1972     gst_object_unref (priv->recv_sink[i]);
1973     priv->recv_sink[i] = NULL;
1974
1975     priv->udpsrc_v4[i] = NULL;
1976     priv->udpsrc_v6[i] = NULL;
1977     priv->udpsink[i] = NULL;
1978     priv->appsrc[i] = NULL;
1979     priv->appsink[i] = NULL;
1980     priv->appqueue[i] = NULL;
1981     priv->tee[i] = NULL;
1982     priv->funnel[i] = NULL;
1983   }
1984   gst_object_unref (priv->send_src[0]);
1985   priv->send_src[0] = NULL;
1986
1987   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1988   gst_object_unref (priv->send_src[1]);
1989   priv->send_src[1] = NULL;
1990
1991   g_object_unref (priv->session);
1992   priv->session = NULL;
1993   if (priv->caps)
1994     gst_caps_unref (priv->caps);
1995   priv->caps = NULL;
1996
1997   if (priv->srtpenc)
1998     gst_object_unref (priv->srtpenc);
1999
2000   priv->is_joined = FALSE;
2001   g_mutex_unlock (&priv->lock);
2002
2003   return TRUE;
2004
2005 was_not_joined:
2006   {
2007     g_mutex_unlock (&priv->lock);
2008     return TRUE;
2009   }
2010 }
2011
2012 /**
2013  * gst_rtsp_stream_get_rtpinfo:
2014  * @stream: a #GstRTSPStream
2015  * @rtptime: (allow-none): result RTP timestamp
2016  * @seq: (allow-none): result RTP seqnum
2017  * @clock_rate: (allow-none): the clock rate
2018  * @running_time: (allow-none): result running-time
2019  *
2020  * Retrieve the current rtptime, seq and running-time. This is used to
2021  * construct a RTPInfo reply header.
2022  *
2023  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2024  */
2025 gboolean
2026 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2027     guint * rtptime, guint * seq, guint * clock_rate,
2028     GstClockTime * running_time)
2029 {
2030   GstRTSPStreamPrivate *priv;
2031   GstStructure *stats;
2032   GObjectClass *payobjclass;
2033
2034   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2035
2036   priv = stream->priv;
2037
2038   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2039
2040   g_mutex_lock (&priv->lock);
2041
2042   if (g_object_class_find_property (payobjclass, "stats")) {
2043     g_object_get (priv->payloader, "stats", &stats, NULL);
2044     if (stats == NULL)
2045       goto no_stats;
2046
2047     if (seq)
2048       gst_structure_get_uint (stats, "seqnum", seq);
2049
2050     if (rtptime)
2051       gst_structure_get_uint (stats, "timestamp", rtptime);
2052
2053     if (running_time)
2054       gst_structure_get_clock_time (stats, "running-time", running_time);
2055
2056     if (clock_rate) {
2057       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2058       if (*clock_rate == 0 && running_time)
2059         *running_time = GST_CLOCK_TIME_NONE;
2060     }
2061     gst_structure_free (stats);
2062   } else {
2063     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2064         !g_object_class_find_property (payobjclass, "timestamp"))
2065       goto no_stats;
2066
2067     if (seq)
2068       g_object_get (priv->payloader, "seqnum", seq, NULL);
2069
2070     if (rtptime)
2071       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2072
2073     if (running_time)
2074       *running_time = GST_CLOCK_TIME_NONE;
2075   }
2076   g_mutex_unlock (&priv->lock);
2077
2078   return TRUE;
2079
2080   /* ERRORS */
2081 no_stats:
2082   {
2083     GST_WARNING ("Could not get payloader stats");
2084     g_mutex_unlock (&priv->lock);
2085     return FALSE;
2086   }
2087 }
2088
2089 /**
2090  * gst_rtsp_stream_get_caps:
2091  * @stream: a #GstRTSPStream
2092  *
2093  * Retrieve the current caps of @stream.
2094  *
2095  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2096  * after usage.
2097  */
2098 GstCaps *
2099 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2100 {
2101   GstRTSPStreamPrivate *priv;
2102   GstCaps *result;
2103
2104   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2105
2106   priv = stream->priv;
2107
2108   g_mutex_lock (&priv->lock);
2109   if ((result = priv->caps))
2110     gst_caps_ref (result);
2111   g_mutex_unlock (&priv->lock);
2112
2113   return result;
2114 }
2115
2116 /**
2117  * gst_rtsp_stream_recv_rtp:
2118  * @stream: a #GstRTSPStream
2119  * @buffer: (transfer full): a #GstBuffer
2120  *
2121  * Handle an RTP buffer for the stream. This method is usually called when a
2122  * message has been received from a client using the TCP transport.
2123  *
2124  * This function takes ownership of @buffer.
2125  *
2126  * Returns: a GstFlowReturn.
2127  */
2128 GstFlowReturn
2129 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2130 {
2131   GstRTSPStreamPrivate *priv;
2132   GstFlowReturn ret;
2133   GstElement *element;
2134
2135   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2136   priv = stream->priv;
2137   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2138   g_return_val_if_fail (priv->is_joined, FALSE);
2139
2140   g_mutex_lock (&priv->lock);
2141   if (priv->appsrc[0])
2142     element = gst_object_ref (priv->appsrc[0]);
2143   else
2144     element = NULL;
2145   g_mutex_unlock (&priv->lock);
2146
2147   if (element) {
2148     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2149     gst_object_unref (element);
2150   } else {
2151     ret = GST_FLOW_OK;
2152   }
2153   return ret;
2154 }
2155
2156 /**
2157  * gst_rtsp_stream_recv_rtcp:
2158  * @stream: a #GstRTSPStream
2159  * @buffer: (transfer full): a #GstBuffer
2160  *
2161  * Handle an RTCP buffer for the stream. This method is usually called when a
2162  * message has been received from a client using the TCP transport.
2163  *
2164  * This function takes ownership of @buffer.
2165  *
2166  * Returns: a GstFlowReturn.
2167  */
2168 GstFlowReturn
2169 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2170 {
2171   GstRTSPStreamPrivate *priv;
2172   GstFlowReturn ret;
2173   GstElement *element;
2174
2175   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2176   priv = stream->priv;
2177   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2178   g_return_val_if_fail (priv->is_joined, FALSE);
2179
2180   g_mutex_lock (&priv->lock);
2181   if (priv->appsrc[1])
2182     element = gst_object_ref (priv->appsrc[1]);
2183   else
2184     element = NULL;
2185   g_mutex_unlock (&priv->lock);
2186
2187   if (element) {
2188     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2189     gst_object_unref (element);
2190   } else {
2191     ret = GST_FLOW_OK;
2192     gst_buffer_unref (buffer);
2193   }
2194   return ret;
2195 }
2196
2197 /* must be called with lock */
2198 static gboolean
2199 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2200     gboolean add)
2201 {
2202   GstRTSPStreamPrivate *priv = stream->priv;
2203   const GstRTSPTransport *tr;
2204
2205   tr = gst_rtsp_stream_transport_get_transport (trans);
2206
2207   switch (tr->lower_transport) {
2208     case GST_RTSP_LOWER_TRANS_UDP:
2209     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2210     {
2211       gchar *dest;
2212       gint min, max;
2213       guint ttl = 0;
2214
2215       dest = tr->destination;
2216       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2217         min = tr->port.min;
2218         max = tr->port.max;
2219         ttl = tr->ttl;
2220       } else {
2221         min = tr->client_port.min;
2222         max = tr->client_port.max;
2223       }
2224
2225       if (add) {
2226         if (ttl > 0) {
2227           GST_INFO ("setting ttl-mc %d", ttl);
2228           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2229           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2230         }
2231         GST_INFO ("adding %s:%d-%d", dest, min, max);
2232         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2233         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2234         priv->transports = g_list_prepend (priv->transports, trans);
2235       } else {
2236         GST_INFO ("removing %s:%d-%d", dest, min, max);
2237         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2238         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2239         priv->transports = g_list_remove (priv->transports, trans);
2240       }
2241       priv->tr_changed = TRUE;
2242       break;
2243     }
2244     case GST_RTSP_LOWER_TRANS_TCP:
2245       if (add) {
2246         GST_INFO ("adding TCP %s", tr->destination);
2247         priv->transports = g_list_prepend (priv->transports, trans);
2248       } else {
2249         GST_INFO ("removing TCP %s", tr->destination);
2250         priv->transports = g_list_remove (priv->transports, trans);
2251       }
2252       priv->tr_changed = TRUE;
2253       break;
2254     default:
2255       goto unknown_transport;
2256   }
2257   return TRUE;
2258
2259   /* ERRORS */
2260 unknown_transport:
2261   {
2262     GST_INFO ("Unknown transport %d", tr->lower_transport);
2263     return FALSE;
2264   }
2265 }
2266
2267
2268 /**
2269  * gst_rtsp_stream_add_transport:
2270  * @stream: a #GstRTSPStream
2271  * @trans: (transfer none): a #GstRTSPStreamTransport
2272  *
2273  * Add the transport in @trans to @stream. The media of @stream will
2274  * then also be send to the values configured in @trans.
2275  *
2276  * @stream must be joined to a bin.
2277  *
2278  * @trans must contain a valid #GstRTSPTransport.
2279  *
2280  * Returns: %TRUE if @trans was added
2281  */
2282 gboolean
2283 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2284     GstRTSPStreamTransport * trans)
2285 {
2286   GstRTSPStreamPrivate *priv;
2287   gboolean res;
2288
2289   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2290   priv = stream->priv;
2291   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2292   g_return_val_if_fail (priv->is_joined, FALSE);
2293
2294   g_mutex_lock (&priv->lock);
2295   res = update_transport (stream, trans, TRUE);
2296   g_mutex_unlock (&priv->lock);
2297
2298   return res;
2299 }
2300
2301 /**
2302  * gst_rtsp_stream_remove_transport:
2303  * @stream: a #GstRTSPStream
2304  * @trans: (transfer none): a #GstRTSPStreamTransport
2305  *
2306  * Remove the transport in @trans from @stream. The media of @stream will
2307  * not be sent to the values configured in @trans.
2308  *
2309  * @stream must be joined to a bin.
2310  *
2311  * @trans must contain a valid #GstRTSPTransport.
2312  *
2313  * Returns: %TRUE if @trans was removed
2314  */
2315 gboolean
2316 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2317     GstRTSPStreamTransport * trans)
2318 {
2319   GstRTSPStreamPrivate *priv;
2320   gboolean res;
2321
2322   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2323   priv = stream->priv;
2324   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2325   g_return_val_if_fail (priv->is_joined, FALSE);
2326
2327   g_mutex_lock (&priv->lock);
2328   res = update_transport (stream, trans, FALSE);
2329   g_mutex_unlock (&priv->lock);
2330
2331   return res;
2332 }
2333
2334 /**
2335  * gst_rtsp_stream_update_crypto:
2336  * @stream: a #GstRTSPStream
2337  * @ssrc: the SSRC
2338  * @crypto: (transfer none) (allow none): a #GstCaps with crypto info
2339  *
2340  * Update the new crypto information for @ssrc in @stream. If information
2341  * for @ssrc did not exist, it will be added. If information
2342  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2343  * be removed from @stream.
2344  *
2345  * Returns: %TRUE if @crypto could be updated
2346  */
2347 gboolean
2348 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2349     guint ssrc, GstCaps * crypto)
2350 {
2351   GstRTSPStreamPrivate *priv;
2352
2353   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2354   g_return_val_if_fail (GST_IS_CAPS (crypto), FALSE);
2355
2356   priv = stream->priv;
2357
2358   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2359
2360   g_mutex_lock (&priv->lock);
2361   if (crypto)
2362     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2363         gst_caps_ref (crypto));
2364   else
2365     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2366   g_mutex_unlock (&priv->lock);
2367
2368   return TRUE;
2369 }
2370
2371 /**
2372  * gst_rtsp_stream_get_rtp_socket:
2373  * @stream: a #GstRTSPStream
2374  * @family: the socket family
2375  *
2376  * Get the RTP socket from @stream for a @family.
2377  *
2378  * @stream must be joined to a bin.
2379  *
2380  * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2381  * allocated for @family. Unref after usage
2382  */
2383 GSocket *
2384 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2385 {
2386   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2387   GSocket *socket;
2388   const gchar *name;
2389
2390   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2391   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2392       family == G_SOCKET_FAMILY_IPV6, NULL);
2393   g_return_val_if_fail (priv->udpsink[0], NULL);
2394
2395   if (family == G_SOCKET_FAMILY_IPV6)
2396     name = "socket-v6";
2397   else
2398     name = "socket";
2399
2400   g_object_get (priv->udpsink[0], name, &socket, NULL);
2401
2402   return socket;
2403 }
2404
2405 /**
2406  * gst_rtsp_stream_get_rtcp_socket:
2407  * @stream: a #GstRTSPStream
2408  * @family: the socket family
2409  *
2410  * Get the RTCP socket from @stream for a @family.
2411  *
2412  * @stream must be joined to a bin.
2413  *
2414  * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2415  * allocated for @family. Unref after usage
2416  */
2417 GSocket *
2418 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2419 {
2420   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2421   GSocket *socket;
2422   const gchar *name;
2423
2424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2425   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2426       family == G_SOCKET_FAMILY_IPV6, NULL);
2427   g_return_val_if_fail (priv->udpsink[1], NULL);
2428
2429   if (family == G_SOCKET_FAMILY_IPV6)
2430     name = "socket-v6";
2431   else
2432     name = "socket";
2433
2434   g_object_get (priv->udpsink[1], name, &socket, NULL);
2435
2436   return socket;
2437 }
2438
2439 /**
2440  * gst_rtsp_stream_transport_filter:
2441  * @stream: a #GstRTSPStream
2442  * @func: (scope call) (allow-none): a callback
2443  * @user_data: (closure): user data passed to @func
2444  *
2445  * Call @func for each transport managed by @stream. The result value of @func
2446  * determines what happens to the transport. @func will be called with @stream
2447  * locked so no further actions on @stream can be performed from @func.
2448  *
2449  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2450  * @stream.
2451  *
2452  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2453  *
2454  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2455  * will also be added with an additional ref to the result #GList of this
2456  * function..
2457  *
2458  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2459  *
2460  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2461  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2462  * element in the #GList should be unreffed before the list is freed.
2463  */
2464 GList *
2465 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2466     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2467 {
2468   GstRTSPStreamPrivate *priv;
2469   GList *result, *walk, *next;
2470
2471   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2472
2473   priv = stream->priv;
2474
2475   result = NULL;
2476
2477   g_mutex_lock (&priv->lock);
2478   for (walk = priv->transports; walk; walk = next) {
2479     GstRTSPStreamTransport *trans = walk->data;
2480     GstRTSPFilterResult res;
2481
2482     next = g_list_next (walk);
2483
2484     if (func)
2485       res = func (stream, trans, user_data);
2486     else
2487       res = GST_RTSP_FILTER_REF;
2488
2489     switch (res) {
2490       case GST_RTSP_FILTER_REMOVE:
2491         update_transport (stream, trans, FALSE);
2492         break;
2493       case GST_RTSP_FILTER_REF:
2494         result = g_list_prepend (result, g_object_ref (trans));
2495         break;
2496       case GST_RTSP_FILTER_KEEP:
2497       default:
2498         break;
2499     }
2500   }
2501   g_mutex_unlock (&priv->lock);
2502
2503   return result;
2504 }
2505
2506 static GstPadProbeReturn
2507 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2508 {
2509   GstRTSPStreamPrivate *priv;
2510   GstRTSPStream *stream;
2511
2512   stream = user_data;
2513   priv = stream->priv;
2514
2515   GST_DEBUG_OBJECT (pad, "now blocking");
2516
2517   g_mutex_lock (&priv->lock);
2518   priv->blocking = TRUE;
2519   g_mutex_unlock (&priv->lock);
2520
2521   gst_element_post_message (priv->payloader,
2522       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2523           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2524
2525   return GST_PAD_PROBE_OK;
2526 }
2527
2528 /**
2529  * gst_rtsp_stream_set_blocked:
2530  * @stream: a #GstRTSPStream
2531  * @blocked: boolean indicating we should block or unblock
2532  *
2533  * Blocks or unblocks the dataflow on @stream.
2534  *
2535  * Returns: %TRUE on success
2536  */
2537 gboolean
2538 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2539 {
2540   GstRTSPStreamPrivate *priv;
2541
2542   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2543
2544   priv = stream->priv;
2545
2546   g_mutex_lock (&priv->lock);
2547   if (blocked) {
2548     priv->blocking = FALSE;
2549     if (priv->blocked_id == 0) {
2550       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2551           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2552           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2553           g_object_ref (stream), g_object_unref);
2554     }
2555   } else {
2556     if (priv->blocked_id != 0) {
2557       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2558       priv->blocked_id = 0;
2559       priv->blocking = FALSE;
2560     }
2561   }
2562   g_mutex_unlock (&priv->lock);
2563
2564   return TRUE;
2565 }
2566
2567 /**
2568  * gst_rtsp_stream_is_blocking:
2569  * @stream: a #GstRTSPStream
2570  *
2571  * Check if @stream is blocking on a #GstBuffer.
2572  *
2573  * Returns: %TRUE if @stream is blocking
2574  */
2575 gboolean
2576 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2577 {
2578   GstRTSPStreamPrivate *priv;
2579   gboolean result;
2580
2581   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2582
2583   priv = stream->priv;
2584
2585   g_mutex_lock (&priv->lock);
2586   result = priv->blocking;
2587   g_mutex_unlock (&priv->lock);
2588
2589   return result;
2590 }