gi: improve annotations
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* SRTP encoder/decoder */
83   GstElement *srtpenc;
84   GstElement *srtpdec;
85   GHashTable *keys;
86
87   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
88    * sockets */
89   GstElement *udpsrc_v4[2];
90
91   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
92    * sockets */
93   GstElement *udpsrc_v6[2];
94
95   GstElement *udpsink[2];
96
97   /* for TCP transport */
98   GstElement *appsrc[2];
99   GstElement *appqueue[2];
100   GstElement *appsink[2];
101
102   GstElement *tee[2];
103   GstElement *funnel[2];
104
105   /* server ports for sending/receiving over ipv4 */
106   GstRTSPRange server_port_v4;
107   GstRTSPAddress *server_addr_v4;
108   gboolean have_ipv4;
109
110   /* server ports for sending/receiving over ipv6 */
111   GstRTSPRange server_port_v6;
112   GstRTSPAddress *server_addr_v6;
113   gboolean have_ipv6;
114
115   /* multicast addresses */
116   GstRTSPAddressPool *pool;
117   GstRTSPAddress *addr_v4;
118   GstRTSPAddress *addr_v6;
119
120   /* the caps of the stream */
121   gulong caps_sig;
122   GstCaps *caps;
123
124   /* transports we stream to */
125   guint n_active;
126   GList *transports;
127   gboolean tr_changed;
128   GList *tr_cache;
129
130   gint dscp_qos;
131
132   /* stream blocking */
133   gulong blocked_id;
134   gboolean blocking;
135 };
136
137 #define DEFAULT_CONTROL         NULL
138 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
139 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
140                                         GST_RTSP_LOWER_TRANS_TCP
141
142 enum
143 {
144   PROP_0,
145   PROP_CONTROL,
146   PROP_PROFILES,
147   PROP_PROTOCOLS,
148   PROP_LAST
149 };
150
151 enum
152 {
153   SIGNAL_NEW_RTP_ENCODER,
154   SIGNAL_NEW_RTCP_ENCODER,
155   SIGNAL_LAST
156 };
157
158 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
159 #define GST_CAT_DEFAULT rtsp_stream_debug
160
161 static GQuark ssrc_stream_map_key;
162
163 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
164     GValue * value, GParamSpec * pspec);
165 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
166     const GValue * value, GParamSpec * pspec);
167
168 static void gst_rtsp_stream_finalize (GObject * obj);
169
170 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
171
172 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
173
174 static void
175 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
176 {
177   GObjectClass *gobject_class;
178
179   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
180
181   gobject_class = G_OBJECT_CLASS (klass);
182
183   gobject_class->get_property = gst_rtsp_stream_get_property;
184   gobject_class->set_property = gst_rtsp_stream_set_property;
185   gobject_class->finalize = gst_rtsp_stream_finalize;
186
187   g_object_class_install_property (gobject_class, PROP_CONTROL,
188       g_param_spec_string ("control", "Control",
189           "The control string for this stream", DEFAULT_CONTROL,
190           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   g_object_class_install_property (gobject_class, PROP_PROFILES,
193       g_param_spec_flags ("profiles", "Profiles",
194           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
195           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
198       g_param_spec_flags ("protocols", "Protocols",
199           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
200           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201
202   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
203       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
204       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
205       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
206
207   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
208       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
209       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
210       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
211
212   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
213
214   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
215 }
216
217 static void
218 gst_rtsp_stream_init (GstRTSPStream * stream)
219 {
220   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
221
222   GST_DEBUG ("new stream %p", stream);
223
224   stream->priv = priv;
225
226   priv->dscp_qos = -1;
227   priv->control = g_strdup (DEFAULT_CONTROL);
228   priv->profiles = DEFAULT_PROFILES;
229   priv->protocols = DEFAULT_PROTOCOLS;
230
231   g_mutex_init (&priv->lock);
232
233   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
234       NULL, (GDestroyNotify) gst_caps_unref);
235 }
236
237 static void
238 gst_rtsp_stream_finalize (GObject * obj)
239 {
240   GstRTSPStream *stream;
241   GstRTSPStreamPrivate *priv;
242
243   stream = GST_RTSP_STREAM (obj);
244   priv = stream->priv;
245
246   GST_DEBUG ("finalize stream %p", stream);
247
248   /* we really need to be unjoined now */
249   g_return_if_fail (!priv->is_joined);
250
251   if (priv->addr_v4)
252     gst_rtsp_address_free (priv->addr_v4);
253   if (priv->addr_v6)
254     gst_rtsp_address_free (priv->addr_v6);
255   if (priv->server_addr_v4)
256     gst_rtsp_address_free (priv->server_addr_v4);
257   if (priv->server_addr_v6)
258     gst_rtsp_address_free (priv->server_addr_v6);
259   if (priv->pool)
260     g_object_unref (priv->pool);
261   gst_object_unref (priv->payloader);
262   gst_object_unref (priv->srcpad);
263   g_free (priv->control);
264   g_mutex_clear (&priv->lock);
265
266   g_hash_table_unref (priv->keys);
267
268   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
269 }
270
271 static void
272 gst_rtsp_stream_get_property (GObject * object, guint propid,
273     GValue * value, GParamSpec * pspec)
274 {
275   GstRTSPStream *stream = GST_RTSP_STREAM (object);
276
277   switch (propid) {
278     case PROP_CONTROL:
279       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
280       break;
281     case PROP_PROFILES:
282       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
283       break;
284     case PROP_PROTOCOLS:
285       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
286       break;
287     default:
288       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
289   }
290 }
291
292 static void
293 gst_rtsp_stream_set_property (GObject * object, guint propid,
294     const GValue * value, GParamSpec * pspec)
295 {
296   GstRTSPStream *stream = GST_RTSP_STREAM (object);
297
298   switch (propid) {
299     case PROP_CONTROL:
300       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
301       break;
302     case PROP_PROFILES:
303       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
304       break;
305     case PROP_PROTOCOLS:
306       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
310   }
311 }
312
313 /**
314  * gst_rtsp_stream_new:
315  * @idx: an index
316  * @srcpad: a #GstPad
317  * @payloader: a #GstElement
318  *
319  * Create a new media stream with index @idx that handles RTP data on
320  * @srcpad and has a payloader element @payloader.
321  *
322  * Returns: (transfer full): a new #GstRTSPStream
323  */
324 GstRTSPStream *
325 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
326 {
327   GstRTSPStreamPrivate *priv;
328   GstRTSPStream *stream;
329
330   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
331   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
332   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
333
334   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
335   priv = stream->priv;
336   priv->idx = idx;
337   priv->payloader = gst_object_ref (payloader);
338   priv->srcpad = gst_object_ref (srcpad);
339
340   return stream;
341 }
342
343 /**
344  * gst_rtsp_stream_get_index:
345  * @stream: a #GstRTSPStream
346  *
347  * Get the stream index.
348  *
349  * Return: the stream index.
350  */
351 guint
352 gst_rtsp_stream_get_index (GstRTSPStream * stream)
353 {
354   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
355
356   return stream->priv->idx;
357 }
358
359 /**
360  * gst_rtsp_stream_get_pt:
361  * @stream: a #GstRTSPStream
362  *
363  * Get the stream payload type.
364  *
365  * Return: the stream payload type.
366  */
367 guint
368 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
369 {
370   GstRTSPStreamPrivate *priv;
371   guint pt;
372
373   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
374
375   priv = stream->priv;
376
377   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
378
379   return pt;
380 }
381
382 /**
383  * gst_rtsp_stream_get_srcpad:
384  * @stream: a #GstRTSPStream
385  *
386  * Get the srcpad associated with @stream.
387  *
388  * Returns: (transfer full): the srcpad. Unref after usage.
389  */
390 GstPad *
391 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
392 {
393   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
394
395   return gst_object_ref (stream->priv->srcpad);
396 }
397
398 /**
399  * gst_rtsp_stream_get_control:
400  * @stream: a #GstRTSPStream
401  *
402  * Get the control string to identify this stream.
403  *
404  * Returns: (transfer full): the control string. g_free() after usage.
405  */
406 gchar *
407 gst_rtsp_stream_get_control (GstRTSPStream * stream)
408 {
409   GstRTSPStreamPrivate *priv;
410   gchar *result;
411
412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
413
414   priv = stream->priv;
415
416   g_mutex_lock (&priv->lock);
417   if ((result = g_strdup (priv->control)) == NULL)
418     result = g_strdup_printf ("stream=%u", priv->idx);
419   g_mutex_unlock (&priv->lock);
420
421   return result;
422 }
423
424 /**
425  * gst_rtsp_stream_set_control:
426  * @stream: a #GstRTSPStream
427  * @control: a control string
428  *
429  * Set the control string in @stream.
430  */
431 void
432 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
433 {
434   GstRTSPStreamPrivate *priv;
435
436   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
437
438   priv = stream->priv;
439
440   g_mutex_lock (&priv->lock);
441   g_free (priv->control);
442   priv->control = g_strdup (control);
443   g_mutex_unlock (&priv->lock);
444 }
445
446 /**
447  * gst_rtsp_stream_has_control:
448  * @stream: a #GstRTSPStream
449  * @control: a control string
450  *
451  * Check if @stream has the control string @control.
452  *
453  * Returns: %TRUE is @stream has @control as the control string
454  */
455 gboolean
456 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
457 {
458   GstRTSPStreamPrivate *priv;
459   gboolean res;
460
461   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
462
463   priv = stream->priv;
464
465   g_mutex_lock (&priv->lock);
466   if (priv->control)
467     res = (g_strcmp0 (priv->control, control) == 0);
468   else {
469     guint streamid;
470
471     if (sscanf (control, "stream=%u", &streamid) > 0)
472       res = (streamid == priv->idx);
473     else
474       res = FALSE;
475   }
476   g_mutex_unlock (&priv->lock);
477
478   return res;
479 }
480
481 /**
482  * gst_rtsp_stream_set_mtu:
483  * @stream: a #GstRTSPStream
484  * @mtu: a new MTU
485  *
486  * Configure the mtu in the payloader of @stream to @mtu.
487  */
488 void
489 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
490 {
491   GstRTSPStreamPrivate *priv;
492
493   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
494
495   priv = stream->priv;
496
497   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
498
499   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
500 }
501
502 /**
503  * gst_rtsp_stream_get_mtu:
504  * @stream: a #GstRTSPStream
505  *
506  * Get the configured MTU in the payloader of @stream.
507  *
508  * Returns: the MTU of the payloader.
509  */
510 guint
511 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
512 {
513   GstRTSPStreamPrivate *priv;
514   guint mtu;
515
516   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
517
518   priv = stream->priv;
519
520   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
521
522   return mtu;
523 }
524
525 /* Update the dscp qos property on the udp sinks */
526 static void
527 update_dscp_qos (GstRTSPStream * stream)
528 {
529   GstRTSPStreamPrivate *priv;
530
531   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
532
533   priv = stream->priv;
534
535   if (priv->udpsink[0]) {
536     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
537         NULL);
538   }
539
540   if (priv->udpsink[1]) {
541     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
542         NULL);
543   }
544 }
545
546 /**
547  * gst_rtsp_stream_set_dscp_qos:
548  * @stream: a #GstRTSPStream
549  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
550  *
551  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
552  */
553 void
554 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
555 {
556   GstRTSPStreamPrivate *priv;
557
558   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
559
560   priv = stream->priv;
561
562   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
563
564   if (dscp_qos < -1 || dscp_qos > 63) {
565     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
566     return;
567   }
568
569   priv->dscp_qos = dscp_qos;
570
571   update_dscp_qos (stream);
572 }
573
574 /**
575  * gst_rtsp_stream_get_dscp_qos:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured DSCP QoS in of the outgoing sockets.
579  *
580  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
581  */
582 gint
583 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586
587   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
588
589   priv = stream->priv;
590
591   return priv->dscp_qos;
592 }
593
594 /**
595  * gst_rtsp_stream_is_transport_supported:
596  * @stream: a #GstRTSPStream
597  * @transport: (transfer none): a #GstRTSPTransport
598  *
599  * Check if @transport can be handled by stream
600  *
601  * Returns: %TRUE if @transport can be handled by @stream.
602  */
603 gboolean
604 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
605     GstRTSPTransport * transport)
606 {
607   GstRTSPStreamPrivate *priv;
608
609   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
610
611   priv = stream->priv;
612
613   g_mutex_lock (&priv->lock);
614   if (transport->trans != GST_RTSP_TRANS_RTP)
615     goto unsupported_transmode;
616
617   if (!(transport->profile & priv->profiles))
618     goto unsupported_profile;
619
620   if (!(transport->lower_transport & priv->protocols))
621     goto unsupported_ltrans;
622
623   g_mutex_unlock (&priv->lock);
624
625   return TRUE;
626
627   /* ERRORS */
628 unsupported_transmode:
629   {
630     GST_DEBUG ("unsupported transport mode %d", transport->trans);
631     g_mutex_unlock (&priv->lock);
632     return FALSE;
633   }
634 unsupported_profile:
635   {
636     GST_DEBUG ("unsupported profile %d", transport->profile);
637     g_mutex_unlock (&priv->lock);
638     return FALSE;
639   }
640 unsupported_ltrans:
641   {
642     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
643     g_mutex_unlock (&priv->lock);
644     return FALSE;
645   }
646 }
647
648 /**
649  * gst_rtsp_stream_set_profiles:
650  * @stream: a #GstRTSPStream
651  * @profiles: the new profiles
652  *
653  * Configure the allowed profiles for @stream.
654  */
655 void
656 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
657 {
658   GstRTSPStreamPrivate *priv;
659
660   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
661
662   priv = stream->priv;
663
664   g_mutex_lock (&priv->lock);
665   priv->profiles = profiles;
666   g_mutex_unlock (&priv->lock);
667 }
668
669 /**
670  * gst_rtsp_stream_get_profiles:
671  * @stream: a #GstRTSPStream
672  *
673  * Get the allowed profiles of @stream.
674  *
675  * Returns: a #GstRTSPProfile
676  */
677 GstRTSPProfile
678 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
679 {
680   GstRTSPStreamPrivate *priv;
681   GstRTSPProfile res;
682
683   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
684
685   priv = stream->priv;
686
687   g_mutex_lock (&priv->lock);
688   res = priv->profiles;
689   g_mutex_unlock (&priv->lock);
690
691   return res;
692 }
693
694 /**
695  * gst_rtsp_stream_set_protocols:
696  * @stream: a #GstRTSPStream
697  * @protocols: the new flags
698  *
699  * Configure the allowed lower transport for @stream.
700  */
701 void
702 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
703     GstRTSPLowerTrans protocols)
704 {
705   GstRTSPStreamPrivate *priv;
706
707   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
708
709   priv = stream->priv;
710
711   g_mutex_lock (&priv->lock);
712   priv->protocols = protocols;
713   g_mutex_unlock (&priv->lock);
714 }
715
716 /**
717  * gst_rtsp_stream_get_protocols:
718  * @stream: a #GstRTSPStream
719  *
720  * Get the allowed protocols of @stream.
721  *
722  * Returns: a #GstRTSPLowerTrans
723  */
724 GstRTSPLowerTrans
725 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
726 {
727   GstRTSPStreamPrivate *priv;
728   GstRTSPLowerTrans res;
729
730   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
731       GST_RTSP_LOWER_TRANS_UNKNOWN);
732
733   priv = stream->priv;
734
735   g_mutex_lock (&priv->lock);
736   res = priv->protocols;
737   g_mutex_unlock (&priv->lock);
738
739   return res;
740 }
741
742 /**
743  * gst_rtsp_stream_set_address_pool:
744  * @stream: a #GstRTSPStream
745  * @pool: (transfer none): a #GstRTSPAddressPool
746  *
747  * configure @pool to be used as the address pool of @stream.
748  */
749 void
750 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
751     GstRTSPAddressPool * pool)
752 {
753   GstRTSPStreamPrivate *priv;
754   GstRTSPAddressPool *old;
755
756   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
757
758   priv = stream->priv;
759
760   GST_LOG_OBJECT (stream, "set address pool %p", pool);
761
762   g_mutex_lock (&priv->lock);
763   if ((old = priv->pool) != pool)
764     priv->pool = pool ? g_object_ref (pool) : NULL;
765   else
766     old = NULL;
767   g_mutex_unlock (&priv->lock);
768
769   if (old)
770     g_object_unref (old);
771 }
772
773 /**
774  * gst_rtsp_stream_get_address_pool:
775  * @stream: a #GstRTSPStream
776  *
777  * Get the #GstRTSPAddressPool used as the address pool of @stream.
778  *
779  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
780  * usage.
781  */
782 GstRTSPAddressPool *
783 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
784 {
785   GstRTSPStreamPrivate *priv;
786   GstRTSPAddressPool *result;
787
788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
789
790   priv = stream->priv;
791
792   g_mutex_lock (&priv->lock);
793   if ((result = priv->pool))
794     g_object_ref (result);
795   g_mutex_unlock (&priv->lock);
796
797   return result;
798 }
799
800 /**
801  * gst_rtsp_stream_get_multicast_address:
802  * @stream: a #GstRTSPStream
803  * @family: the #GSocketFamily
804  *
805  * Get the multicast address of @stream for @family.
806  *
807  * Returns: (transfer full): the #GstRTSPAddress of @stream or %NULL when no
808  * address could be allocated. gst_rtsp_address_free() after usage.
809  */
810 GstRTSPAddress *
811 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
812     GSocketFamily family)
813 {
814   GstRTSPStreamPrivate *priv;
815   GstRTSPAddress *result;
816   GstRTSPAddress **addrp;
817   GstRTSPAddressFlags flags;
818
819   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
820
821   priv = stream->priv;
822
823   if (family == G_SOCKET_FAMILY_IPV6) {
824     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
825     addrp = &priv->addr_v6;
826   } else {
827     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
828     addrp = &priv->addr_v4;
829   }
830
831   g_mutex_lock (&priv->lock);
832   if (*addrp == NULL) {
833     if (priv->pool == NULL)
834       goto no_pool;
835
836     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
837
838     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
839     if (*addrp == NULL)
840       goto no_address;
841   }
842   result = gst_rtsp_address_copy (*addrp);
843   g_mutex_unlock (&priv->lock);
844
845   return result;
846
847   /* ERRORS */
848 no_pool:
849   {
850     GST_ERROR_OBJECT (stream, "no address pool specified");
851     g_mutex_unlock (&priv->lock);
852     return NULL;
853   }
854 no_address:
855   {
856     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
857     g_mutex_unlock (&priv->lock);
858     return NULL;
859   }
860 }
861
862 /**
863  * gst_rtsp_stream_reserve_address:
864  * @stream: a #GstRTSPStream
865  * @address: an address
866  * @port: a port
867  * @n_ports: n_ports
868  * @ttl: a TTL
869  *
870  * Reserve @address and @port as the address and port of @stream.
871  *
872  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
873  * reserved. gst_rtsp_address_free() after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
877     const gchar * address, guint port, guint n_ports, guint ttl)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GInetAddress *addr;
882   GSocketFamily family;
883   GstRTSPAddress **addrp;
884
885   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
886   g_return_val_if_fail (address != NULL, NULL);
887   g_return_val_if_fail (port > 0, NULL);
888   g_return_val_if_fail (n_ports > 0, NULL);
889   g_return_val_if_fail (ttl > 0, NULL);
890
891   priv = stream->priv;
892
893   addr = g_inet_address_new_from_string (address);
894   if (!addr) {
895     GST_ERROR ("failed to get inet addr from %s", address);
896     family = G_SOCKET_FAMILY_IPV4;
897   } else {
898     family = g_inet_address_get_family (addr);
899     g_object_unref (addr);
900   }
901
902   if (family == G_SOCKET_FAMILY_IPV6)
903     addrp = &priv->addr_v6;
904   else
905     addrp = &priv->addr_v4;
906
907   g_mutex_lock (&priv->lock);
908   if (*addrp == NULL) {
909     GstRTSPAddressPoolResult res;
910
911     if (priv->pool == NULL)
912       goto no_pool;
913
914     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
915         port, n_ports, ttl, addrp);
916     if (res != GST_RTSP_ADDRESS_POOL_OK)
917       goto no_address;
918   } else {
919     if (strcmp ((*addrp)->address, address) ||
920         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
921         (*addrp)->ttl != ttl)
922       goto different_address;
923   }
924   result = gst_rtsp_address_copy (*addrp);
925   g_mutex_unlock (&priv->lock);
926
927   return result;
928
929   /* ERRORS */
930 no_pool:
931   {
932     GST_ERROR_OBJECT (stream, "no address pool specified");
933     g_mutex_unlock (&priv->lock);
934     return NULL;
935   }
936 no_address:
937   {
938     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
939         address);
940     g_mutex_unlock (&priv->lock);
941     return NULL;
942   }
943 different_address:
944   {
945     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
946         " reserved", address);
947     g_mutex_unlock (&priv->lock);
948     return NULL;
949   }
950 }
951
952 static gboolean
953 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
954     GSocketFamily family, GstElement * udpsrc_out[2],
955     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
956     GstRTSPAddress ** server_addr_out)
957 {
958   GstStateChangeReturn ret;
959   GstElement *udpsrc0, *udpsrc1;
960   GstElement *udpsink0, *udpsink1;
961   GSocket *rtp_socket = NULL;
962   GSocket *rtcp_socket;
963   gint tmp_rtp, tmp_rtcp;
964   guint count;
965   gint rtpport, rtcpport;
966   GList *rejected_addresses = NULL;
967   GstRTSPAddress *addr = NULL;
968   GInetAddress *inetaddr = NULL;
969   GSocketAddress *rtp_sockaddr = NULL;
970   GSocketAddress *rtcp_sockaddr = NULL;
971   const gchar *multisink_socket;
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     multisink_socket = "socket-v6";
975   else
976     multisink_socket = "socket";
977
978   udpsrc0 = NULL;
979   udpsrc1 = NULL;
980   udpsink0 = NULL;
981   udpsink1 = NULL;
982   count = 0;
983
984   /* Start with random port */
985   tmp_rtp = 0;
986
987   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
988       G_SOCKET_PROTOCOL_UDP, NULL);
989   if (!rtcp_socket)
990     goto no_udp_protocol;
991
992   if (*server_addr_out)
993     gst_rtsp_address_free (*server_addr_out);
994
995   /* try to allocate 2 UDP ports, the RTP port should be an even
996    * number and the RTCP port should be the next (uneven) port */
997 again:
998
999   if (rtp_socket == NULL) {
1000     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1001         G_SOCKET_PROTOCOL_UDP, NULL);
1002     if (!rtp_socket)
1003       goto no_udp_protocol;
1004   }
1005
1006   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1007     GstRTSPAddressFlags flags;
1008
1009     if (addr)
1010       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1011
1012     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1013     if (family == G_SOCKET_FAMILY_IPV6)
1014       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1015     else
1016       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1017
1018     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1019
1020     if (addr == NULL)
1021       goto no_ports;
1022
1023     tmp_rtp = addr->port;
1024
1025     g_clear_object (&inetaddr);
1026     inetaddr = g_inet_address_new_from_string (addr->address);
1027   } else {
1028     if (tmp_rtp != 0) {
1029       tmp_rtp += 2;
1030       if (++count > 20)
1031         goto no_ports;
1032     }
1033
1034     if (inetaddr == NULL)
1035       inetaddr = g_inet_address_new_any (family);
1036   }
1037
1038   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1039   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1040     g_object_unref (rtp_sockaddr);
1041     goto again;
1042   }
1043   g_object_unref (rtp_sockaddr);
1044
1045   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1046   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1047     g_clear_object (&rtp_sockaddr);
1048     goto socket_error;
1049   }
1050
1051   tmp_rtp =
1052       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1053   g_object_unref (rtp_sockaddr);
1054
1055   /* check if port is even */
1056   if ((tmp_rtp & 1) != 0) {
1057     /* port not even, close and allocate another */
1058     tmp_rtp++;
1059     g_clear_object (&rtp_socket);
1060     goto again;
1061   }
1062
1063   /* set port */
1064   tmp_rtcp = tmp_rtp + 1;
1065
1066   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1067   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1068     g_object_unref (rtcp_sockaddr);
1069     g_clear_object (&rtp_socket);
1070     goto again;
1071   }
1072   g_object_unref (rtcp_sockaddr);
1073
1074   g_clear_object (&inetaddr);
1075
1076   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1077   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1078
1079   if (udpsrc0 == NULL || udpsrc1 == NULL)
1080     goto no_udp_protocol;
1081
1082   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1083   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1084
1085   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1086   if (ret == GST_STATE_CHANGE_FAILURE)
1087     goto element_error;
1088   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1089   if (ret == GST_STATE_CHANGE_FAILURE)
1090     goto element_error;
1091
1092   /* all fine, do port check */
1093   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1094   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1095
1096   /* this should not happen... */
1097   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1098     goto port_error;
1099
1100   if (udpsink_out[0])
1101     udpsink0 = udpsink_out[0];
1102   else
1103     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1104
1105   if (!udpsink0)
1106     goto no_udp_protocol;
1107
1108   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1109   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1110
1111   if (udpsink_out[1])
1112     udpsink1 = udpsink_out[1];
1113   else
1114     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1115
1116   if (!udpsink1)
1117     goto no_udp_protocol;
1118
1119   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1120   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1121   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1122
1123   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1124   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1125   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1126   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1127   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1128   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1129   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1130   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1131
1132   /* we keep these elements, we will further configure them when the
1133    * client told us to really use the UDP ports. */
1134   udpsrc_out[0] = udpsrc0;
1135   udpsrc_out[1] = udpsrc1;
1136   udpsink_out[0] = udpsink0;
1137   udpsink_out[1] = udpsink1;
1138   server_port_out->min = rtpport;
1139   server_port_out->max = rtcpport;
1140
1141   *server_addr_out = addr;
1142   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1143
1144   g_object_unref (rtp_socket);
1145   g_object_unref (rtcp_socket);
1146
1147   return TRUE;
1148
1149   /* ERRORS */
1150 no_udp_protocol:
1151   {
1152     goto cleanup;
1153   }
1154 no_ports:
1155   {
1156     goto cleanup;
1157   }
1158 port_error:
1159   {
1160     goto cleanup;
1161   }
1162 socket_error:
1163   {
1164     goto cleanup;
1165   }
1166 element_error:
1167   {
1168     goto cleanup;
1169   }
1170 cleanup:
1171   {
1172     if (udpsrc0) {
1173       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1174       gst_object_unref (udpsrc0);
1175     }
1176     if (udpsrc1) {
1177       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1178       gst_object_unref (udpsrc1);
1179     }
1180     if (udpsink0) {
1181       gst_element_set_state (udpsink0, GST_STATE_NULL);
1182       gst_object_unref (udpsink0);
1183     }
1184     if (inetaddr)
1185       g_object_unref (inetaddr);
1186     g_list_free_full (rejected_addresses,
1187         (GDestroyNotify) gst_rtsp_address_free);
1188     if (addr)
1189       gst_rtsp_address_free (addr);
1190     if (rtp_socket)
1191       g_object_unref (rtp_socket);
1192     if (rtcp_socket)
1193       g_object_unref (rtcp_socket);
1194     return FALSE;
1195   }
1196 }
1197
1198 /* must be called with lock */
1199 static gboolean
1200 alloc_ports (GstRTSPStream * stream)
1201 {
1202   GstRTSPStreamPrivate *priv = stream->priv;
1203
1204   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1205       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1206       &priv->server_port_v4, &priv->server_addr_v4);
1207
1208   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1209       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1210       &priv->server_port_v6, &priv->server_addr_v6);
1211
1212   return priv->have_ipv4 || priv->have_ipv6;
1213 }
1214
1215 /**
1216  * gst_rtsp_stream_get_server_port:
1217  * @stream: a #GstRTSPStream
1218  * @server_port: (out): result server port
1219  * @family: the port family to get
1220  *
1221  * Fill @server_port with the port pair used by the server. This function can
1222  * only be called when @stream has been joined.
1223  */
1224 void
1225 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1226     GstRTSPRange * server_port, GSocketFamily family)
1227 {
1228   GstRTSPStreamPrivate *priv;
1229
1230   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1231   priv = stream->priv;
1232   g_return_if_fail (priv->is_joined);
1233
1234   g_mutex_lock (&priv->lock);
1235   if (family == G_SOCKET_FAMILY_IPV4) {
1236     if (server_port)
1237       *server_port = priv->server_port_v4;
1238   } else {
1239     if (server_port)
1240       *server_port = priv->server_port_v6;
1241   }
1242   g_mutex_unlock (&priv->lock);
1243 }
1244
1245 /**
1246  * gst_rtsp_stream_get_rtpsession:
1247  * @stream: a #GstRTSPStream
1248  *
1249  * Get the RTP session of this stream.
1250  *
1251  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1252  */
1253 GObject *
1254 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1255 {
1256   GstRTSPStreamPrivate *priv;
1257   GObject *session;
1258
1259   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1260
1261   priv = stream->priv;
1262
1263   g_mutex_lock (&priv->lock);
1264   if ((session = priv->session))
1265     g_object_ref (session);
1266   g_mutex_unlock (&priv->lock);
1267
1268   return session;
1269 }
1270
1271 /**
1272  * gst_rtsp_stream_get_ssrc:
1273  * @stream: a #GstRTSPStream
1274  * @ssrc: (out): result ssrc
1275  *
1276  * Get the SSRC used by the RTP session of this stream. This function can only
1277  * be called when @stream has been joined.
1278  */
1279 void
1280 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1281 {
1282   GstRTSPStreamPrivate *priv;
1283
1284   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1285   priv = stream->priv;
1286   g_return_if_fail (priv->is_joined);
1287
1288   g_mutex_lock (&priv->lock);
1289   if (ssrc && priv->session)
1290     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1291   g_mutex_unlock (&priv->lock);
1292 }
1293
1294 /* executed from streaming thread */
1295 static void
1296 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1297 {
1298   GstRTSPStreamPrivate *priv = stream->priv;
1299   GstCaps *newcaps, *oldcaps;
1300
1301   newcaps = gst_pad_get_current_caps (pad);
1302
1303   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1304       newcaps);
1305
1306   g_mutex_lock (&priv->lock);
1307   oldcaps = priv->caps;
1308   priv->caps = newcaps;
1309   g_mutex_unlock (&priv->lock);
1310
1311   if (oldcaps)
1312     gst_caps_unref (oldcaps);
1313 }
1314
1315 static void
1316 dump_structure (const GstStructure * s)
1317 {
1318   gchar *sstr;
1319
1320   sstr = gst_structure_to_string (s);
1321   GST_INFO ("structure: %s", sstr);
1322   g_free (sstr);
1323 }
1324
1325 static GstRTSPStreamTransport *
1326 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1327 {
1328   GstRTSPStreamPrivate *priv = stream->priv;
1329   GList *walk;
1330   GstRTSPStreamTransport *result = NULL;
1331   const gchar *tmp;
1332   gchar *dest;
1333   guint port;
1334
1335   if (rtcp_from == NULL)
1336     return NULL;
1337
1338   tmp = g_strrstr (rtcp_from, ":");
1339   if (tmp == NULL)
1340     return NULL;
1341
1342   port = atoi (tmp + 1);
1343   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1344
1345   g_mutex_lock (&priv->lock);
1346   GST_INFO ("finding %s:%d in %d transports", dest, port,
1347       g_list_length (priv->transports));
1348
1349   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1350     GstRTSPStreamTransport *trans = walk->data;
1351     const GstRTSPTransport *tr;
1352     gint min, max;
1353
1354     tr = gst_rtsp_stream_transport_get_transport (trans);
1355
1356     min = tr->client_port.min;
1357     max = tr->client_port.max;
1358
1359     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1360       result = trans;
1361       break;
1362     }
1363   }
1364   if (result)
1365     g_object_ref (result);
1366   g_mutex_unlock (&priv->lock);
1367
1368   g_free (dest);
1369
1370   return result;
1371 }
1372
1373 static GstRTSPStreamTransport *
1374 check_transport (GObject * source, GstRTSPStream * stream)
1375 {
1376   GstStructure *stats;
1377   GstRTSPStreamTransport *trans;
1378
1379   /* see if we have a stream to match with the origin of the RTCP packet */
1380   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1381   if (trans == NULL) {
1382     g_object_get (source, "stats", &stats, NULL);
1383     if (stats) {
1384       const gchar *rtcp_from;
1385
1386       dump_structure (stats);
1387
1388       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1389       if ((trans = find_transport (stream, rtcp_from))) {
1390         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1391             source);
1392         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1393             g_object_unref);
1394       }
1395       gst_structure_free (stats);
1396     }
1397   }
1398   return trans;
1399 }
1400
1401
1402 static void
1403 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1404 {
1405   GstRTSPStreamTransport *trans;
1406
1407   GST_INFO ("%p: new source %p", stream, source);
1408
1409   trans = check_transport (source, stream);
1410
1411   if (trans)
1412     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1413 }
1414
1415 static void
1416 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1417 {
1418   GST_INFO ("%p: new SDES %p", stream, source);
1419 }
1420
1421 static void
1422 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1423 {
1424   GstRTSPStreamTransport *trans;
1425
1426   trans = check_transport (source, stream);
1427
1428   if (trans) {
1429     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1430     gst_rtsp_stream_transport_keep_alive (trans);
1431   }
1432 #ifdef DUMP_STATS
1433   {
1434     GstStructure *stats;
1435     g_object_get (source, "stats", &stats, NULL);
1436     if (stats) {
1437       dump_structure (stats);
1438       gst_structure_free (stats);
1439     }
1440   }
1441 #endif
1442 }
1443
1444 static void
1445 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1446 {
1447   GST_INFO ("%p: source %p bye", stream, source);
1448 }
1449
1450 static void
1451 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1452 {
1453   GstRTSPStreamTransport *trans;
1454
1455   GST_INFO ("%p: source %p bye timeout", stream, source);
1456
1457   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1458     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1459     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1460   }
1461 }
1462
1463 static void
1464 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1465 {
1466   GstRTSPStreamTransport *trans;
1467
1468   GST_INFO ("%p: source %p timeout", stream, source);
1469
1470   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1471     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1472     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1473   }
1474 }
1475
1476 static void
1477 clear_tr_cache (GstRTSPStreamPrivate * priv)
1478 {
1479   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1480   g_list_free (priv->tr_cache);
1481   priv->tr_cache = NULL;
1482 }
1483
1484 static GstFlowReturn
1485 handle_new_sample (GstAppSink * sink, gpointer user_data)
1486 {
1487   GstRTSPStreamPrivate *priv;
1488   GList *walk;
1489   GstSample *sample;
1490   GstBuffer *buffer;
1491   GstRTSPStream *stream;
1492   gboolean is_rtp;
1493
1494   sample = gst_app_sink_pull_sample (sink);
1495   if (!sample)
1496     return GST_FLOW_OK;
1497
1498   stream = (GstRTSPStream *) user_data;
1499   priv = stream->priv;
1500   buffer = gst_sample_get_buffer (sample);
1501
1502   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1503
1504   g_mutex_lock (&priv->lock);
1505   if (priv->tr_changed) {
1506     clear_tr_cache (priv);
1507     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1508       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1509       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1510     }
1511     priv->tr_changed = FALSE;
1512   }
1513   g_mutex_unlock (&priv->lock);
1514
1515   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1516     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1517
1518     if (is_rtp) {
1519       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1520     } else {
1521       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1522     }
1523   }
1524   gst_sample_unref (sample);
1525
1526   return GST_FLOW_OK;
1527 }
1528
1529 static GstAppSinkCallbacks sink_cb = {
1530   NULL,                         /* not interested in EOS */
1531   NULL,                         /* not interested in preroll samples */
1532   handle_new_sample,
1533 };
1534
1535 static GstElement *
1536 get_rtp_encoder (GstRTSPStream * stream, guint session)
1537 {
1538   GstRTSPStreamPrivate *priv = stream->priv;
1539
1540   if (priv->srtpenc == NULL) {
1541     gchar *name;
1542
1543     name = g_strdup_printf ("srtpenc_%u", session);
1544     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1545     g_free (name);
1546
1547     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1548   }
1549   return gst_object_ref (priv->srtpenc);
1550 }
1551
1552 static GstElement *
1553 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1554 {
1555   GstRTSPStreamPrivate *priv = stream->priv;
1556   GstElement *oldenc, *enc;
1557   GstPad *pad;
1558   gchar *name;
1559
1560   if (priv->idx != session)
1561     return NULL;
1562
1563   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1564
1565   oldenc = priv->srtpenc;
1566   enc = get_rtp_encoder (stream, session);
1567   name = g_strdup_printf ("rtp_sink_%d", session);
1568   pad = gst_element_get_request_pad (enc, name);
1569   g_free (name);
1570   gst_object_unref (pad);
1571
1572   if (oldenc == NULL)
1573     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1574         enc);
1575
1576   return enc;
1577 }
1578
1579 static GstElement *
1580 request_rtcp_encoder (GstElement * rtpbin, guint session,
1581     GstRTSPStream * stream)
1582 {
1583   GstRTSPStreamPrivate *priv = stream->priv;
1584   GstElement *oldenc, *enc;
1585   GstPad *pad;
1586   gchar *name;
1587
1588   if (priv->idx != session)
1589     return NULL;
1590
1591   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1592
1593   oldenc = priv->srtpenc;
1594   enc = get_rtp_encoder (stream, session);
1595   name = g_strdup_printf ("rtcp_sink_%d", session);
1596   pad = gst_element_get_request_pad (enc, name);
1597   g_free (name);
1598   gst_object_unref (pad);
1599
1600   if (oldenc == NULL)
1601     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1602         enc);
1603
1604   return enc;
1605 }
1606
1607 static GstCaps *
1608 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1609 {
1610   GstRTSPStreamPrivate *priv = stream->priv;
1611   GstCaps *caps;
1612
1613   GST_DEBUG ("request key %08x", ssrc);
1614
1615   g_mutex_lock (&priv->lock);
1616   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1617     gst_caps_ref (caps);
1618   g_mutex_unlock (&priv->lock);
1619
1620   return caps;
1621 }
1622
1623 static GstElement *
1624 request_rtcp_decoder (GstElement * rtpbin, guint session,
1625     GstRTSPStream * stream)
1626 {
1627   GstRTSPStreamPrivate *priv = stream->priv;
1628
1629   if (priv->idx != session)
1630     return NULL;
1631
1632   if (priv->srtpdec == NULL) {
1633     gchar *name;
1634
1635     name = g_strdup_printf ("srtpdec_%u", session);
1636     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1637     g_free (name);
1638
1639     g_signal_connect (priv->srtpdec, "request-key",
1640         (GCallback) request_key, stream);
1641   }
1642   return gst_object_ref (priv->srtpdec);
1643 }
1644
1645 /**
1646  * gst_rtsp_stream_join_bin:
1647  * @stream: a #GstRTSPStream
1648  * @bin: (transfer none): a #GstBin to join
1649  * @rtpbin: (transfer none): a rtpbin element in @bin
1650  * @state: the target state of the new elements
1651  *
1652  * Join the #GstBin @bin that contains the element @rtpbin.
1653  *
1654  * @stream will link to @rtpbin, which must be inside @bin. The elements
1655  * added to @bin will be set to the state given in @state.
1656  *
1657  * Returns: %TRUE on success.
1658  */
1659 gboolean
1660 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1661     GstElement * rtpbin, GstState state)
1662 {
1663   GstRTSPStreamPrivate *priv;
1664   gint i;
1665   guint idx;
1666   gchar *name;
1667   GstPad *pad, *sinkpad, *selpad;
1668   GstPadLinkReturn ret;
1669
1670   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1671   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1672   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1673
1674   priv = stream->priv;
1675
1676   g_mutex_lock (&priv->lock);
1677   if (priv->is_joined)
1678     goto was_joined;
1679
1680   /* create a session with the same index as the stream */
1681   idx = priv->idx;
1682
1683   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1684
1685   if (!alloc_ports (stream))
1686     goto no_ports;
1687
1688   /* update the dscp qos field in the sinks */
1689   update_dscp_qos (stream);
1690
1691   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1692       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1693     /* For SRTP */
1694     g_signal_connect (rtpbin, "request-rtp-encoder",
1695         (GCallback) request_rtp_encoder, stream);
1696     g_signal_connect (rtpbin, "request-rtcp-encoder",
1697         (GCallback) request_rtcp_encoder, stream);
1698     g_signal_connect (rtpbin, "request-rtcp-decoder",
1699         (GCallback) request_rtcp_decoder, stream);
1700   }
1701
1702   /* get a pad for sending RTP */
1703   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1704   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1705   g_free (name);
1706   /* link the RTP pad to the session manager, it should not really fail unless
1707    * this is not really an RTP pad */
1708   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1709   if (ret != GST_PAD_LINK_OK)
1710     goto link_failed;
1711
1712   /* get pads from the RTP session element for sending and receiving
1713    * RTP/RTCP*/
1714   name = g_strdup_printf ("send_rtp_src_%u", idx);
1715   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1716   g_free (name);
1717   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1718   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1719   g_free (name);
1720   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1721   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1722   g_free (name);
1723   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1724   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1725   g_free (name);
1726
1727   /* get the session */
1728   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1729
1730   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1731       stream);
1732   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1733       stream);
1734   g_signal_connect (priv->session, "on-ssrc-active",
1735       (GCallback) on_ssrc_active, stream);
1736   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1737       stream);
1738   g_signal_connect (priv->session, "on-bye-timeout",
1739       (GCallback) on_bye_timeout, stream);
1740   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1741       stream);
1742
1743   for (i = 0; i < 2; i++) {
1744     GstPad *teepad, *queuepad;
1745     /* For the sender we create this bit of pipeline for both
1746      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1747      * we need to add a queue before appsink to make the pipeline
1748      * not block. For the TCP case, we want to pump data to the
1749      * client as fast as possible anyway.
1750      *
1751      * .--------.      .-----.    .---------.
1752      * | rtpbin |      | tee |    | udpsink |
1753      * |       send->sink   src->sink       |
1754      * '--------'      |     |    '---------'
1755      *                 |     |    .---------.    .---------.
1756      *                 |     |    |  queue  |    | appsink |
1757      *                 |    src->sink      src->sink       |
1758      *                 '-----'    '---------'    '---------'
1759      *
1760      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1761      * udpsink directly to the session.
1762      */
1763     /* add udpsink */
1764     gst_bin_add (bin, priv->udpsink[i]);
1765     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1766
1767     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1768       /* make tee for RTP/RTCP */
1769       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1770       gst_bin_add (bin, priv->tee[i]);
1771
1772       /* and link to rtpbin send pad */
1773       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1774       gst_pad_link (priv->send_src[i], pad);
1775       gst_object_unref (pad);
1776
1777       /* link tee to udpsink */
1778       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1779       gst_pad_link (teepad, sinkpad);
1780       gst_object_unref (teepad);
1781
1782       /* make queue */
1783       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1784       gst_bin_add (bin, priv->appqueue[i]);
1785       /* and link to tee */
1786       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1787       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1788       gst_pad_link (teepad, pad);
1789       gst_object_unref (pad);
1790       gst_object_unref (teepad);
1791
1792       /* make appsink */
1793       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1794       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1795       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1796       gst_bin_add (bin, priv->appsink[i]);
1797       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1798           &sink_cb, stream, NULL);
1799       /* and link to queue */
1800       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1801       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1802       gst_pad_link (queuepad, pad);
1803       gst_object_unref (pad);
1804       gst_object_unref (queuepad);
1805     } else {
1806       /* else only udpsink needed, link it to the session */
1807       gst_pad_link (priv->send_src[i], sinkpad);
1808     }
1809     gst_object_unref (sinkpad);
1810
1811     /* For the receiver we create this bit of pipeline for both
1812      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1813      * and it is all funneled into the rtpbin receive pad.
1814      *
1815      * .--------.     .--------.    .--------.
1816      * | udpsrc |     | funnel |    | rtpbin |
1817      * |       src->sink      src->sink      |
1818      * '--------'     |        |    '--------'
1819      * .--------.     |        |
1820      * | appsrc |     |        |
1821      * |       src->sink       |
1822      * '--------'     '--------'
1823      */
1824     /* make funnel for the RTP/RTCP receivers */
1825     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1826     gst_bin_add (bin, priv->funnel[i]);
1827
1828     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1829     gst_pad_link (pad, priv->recv_sink[i]);
1830     gst_object_unref (pad);
1831
1832     if (priv->udpsrc_v4[i]) {
1833       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1834        * values */
1835       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1836       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1837       /* add udpsrc */
1838       gst_bin_add (bin, priv->udpsrc_v4[i]);
1839
1840       /* and link to the funnel v4 */
1841       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1842       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1843       gst_pad_link (pad, selpad);
1844       gst_object_unref (pad);
1845       gst_object_unref (selpad);
1846     }
1847
1848     if (priv->udpsrc_v6[i]) {
1849       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1850       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1851       gst_bin_add (bin, priv->udpsrc_v6[i]);
1852
1853       /* and link to the funnel v6 */
1854       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1855       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1856       gst_pad_link (pad, selpad);
1857       gst_object_unref (pad);
1858       gst_object_unref (selpad);
1859     }
1860
1861     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1862       /* make and add appsrc */
1863       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1864       gst_bin_add (bin, priv->appsrc[i]);
1865       /* and link to the funnel */
1866       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1867       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1868       gst_pad_link (pad, selpad);
1869       gst_object_unref (pad);
1870       gst_object_unref (selpad);
1871     }
1872
1873     /* check if we need to set to a special state */
1874     if (state != GST_STATE_NULL) {
1875       if (priv->udpsink[i])
1876         gst_element_set_state (priv->udpsink[i], state);
1877       if (priv->appsink[i])
1878         gst_element_set_state (priv->appsink[i], state);
1879       if (priv->appqueue[i])
1880         gst_element_set_state (priv->appqueue[i], state);
1881       if (priv->tee[i])
1882         gst_element_set_state (priv->tee[i], state);
1883       if (priv->funnel[i])
1884         gst_element_set_state (priv->funnel[i], state);
1885       if (priv->appsrc[i])
1886         gst_element_set_state (priv->appsrc[i], state);
1887     }
1888   }
1889
1890   /* be notified of caps changes */
1891   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1892       (GCallback) caps_notify, stream);
1893
1894   priv->is_joined = TRUE;
1895   g_mutex_unlock (&priv->lock);
1896
1897   return TRUE;
1898
1899   /* ERRORS */
1900 was_joined:
1901   {
1902     g_mutex_unlock (&priv->lock);
1903     return TRUE;
1904   }
1905 no_ports:
1906   {
1907     g_mutex_unlock (&priv->lock);
1908     GST_WARNING ("failed to allocate ports %u", idx);
1909     return FALSE;
1910   }
1911 link_failed:
1912   {
1913     GST_WARNING ("failed to link stream %u", idx);
1914     gst_object_unref (priv->send_rtp_sink);
1915     priv->send_rtp_sink = NULL;
1916     g_mutex_unlock (&priv->lock);
1917     return FALSE;
1918   }
1919 }
1920
1921 /**
1922  * gst_rtsp_stream_leave_bin:
1923  * @stream: a #GstRTSPStream
1924  * @bin: (transfer none): a #GstBin
1925  * @rtpbin: (transfer none): a rtpbin #GstElement
1926  *
1927  * Remove the elements of @stream from @bin.
1928  *
1929  * Return: %TRUE on success.
1930  */
1931 gboolean
1932 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1933     GstElement * rtpbin)
1934 {
1935   GstRTSPStreamPrivate *priv;
1936   gint i;
1937
1938   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1939   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1940   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1941
1942   priv = stream->priv;
1943
1944   g_mutex_lock (&priv->lock);
1945   if (!priv->is_joined)
1946     goto was_not_joined;
1947
1948   /* all transports must be removed by now */
1949   g_return_val_if_fail (priv->transports == NULL, FALSE);
1950
1951   clear_tr_cache (priv);
1952
1953   GST_INFO ("stream %p leaving bin", stream);
1954
1955   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1956   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1957   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1958   gst_object_unref (priv->send_rtp_sink);
1959   priv->send_rtp_sink = NULL;
1960
1961   for (i = 0; i < 2; i++) {
1962     if (priv->udpsink[i])
1963       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1964     if (priv->appsink[i])
1965       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1966     if (priv->appqueue[i])
1967       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1968     if (priv->tee[i])
1969       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1970     if (priv->funnel[i])
1971       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1972     if (priv->appsrc[i])
1973       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1974     if (priv->udpsrc_v4[i]) {
1975       /* and set udpsrc to NULL now before removing */
1976       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1977       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1978       /* removing them should also nicely release the request
1979        * pads when they finalize */
1980       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1981     }
1982     if (priv->udpsrc_v6[i]) {
1983       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1984       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1985       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1986     }
1987     if (priv->udpsink[i])
1988       gst_bin_remove (bin, priv->udpsink[i]);
1989     if (priv->appsrc[i])
1990       gst_bin_remove (bin, priv->appsrc[i]);
1991     if (priv->appsink[i])
1992       gst_bin_remove (bin, priv->appsink[i]);
1993     if (priv->appqueue[i])
1994       gst_bin_remove (bin, priv->appqueue[i]);
1995     if (priv->tee[i])
1996       gst_bin_remove (bin, priv->tee[i]);
1997     if (priv->funnel[i])
1998       gst_bin_remove (bin, priv->funnel[i]);
1999
2000     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2001     gst_object_unref (priv->recv_sink[i]);
2002     priv->recv_sink[i] = NULL;
2003
2004     priv->udpsrc_v4[i] = NULL;
2005     priv->udpsrc_v6[i] = NULL;
2006     priv->udpsink[i] = NULL;
2007     priv->appsrc[i] = NULL;
2008     priv->appsink[i] = NULL;
2009     priv->appqueue[i] = NULL;
2010     priv->tee[i] = NULL;
2011     priv->funnel[i] = NULL;
2012   }
2013   gst_object_unref (priv->send_src[0]);
2014   priv->send_src[0] = NULL;
2015
2016   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2017   gst_object_unref (priv->send_src[1]);
2018   priv->send_src[1] = NULL;
2019
2020   g_object_unref (priv->session);
2021   priv->session = NULL;
2022   if (priv->caps)
2023     gst_caps_unref (priv->caps);
2024   priv->caps = NULL;
2025
2026   if (priv->srtpenc)
2027     gst_object_unref (priv->srtpenc);
2028
2029   priv->is_joined = FALSE;
2030   g_mutex_unlock (&priv->lock);
2031
2032   return TRUE;
2033
2034 was_not_joined:
2035   {
2036     g_mutex_unlock (&priv->lock);
2037     return TRUE;
2038   }
2039 }
2040
2041 /**
2042  * gst_rtsp_stream_get_rtpinfo:
2043  * @stream: a #GstRTSPStream
2044  * @rtptime: (allow-none): result RTP timestamp
2045  * @seq: (allow-none): result RTP seqnum
2046  * @clock_rate: (allow-none): the clock rate
2047  * @running_time: (allow-none): result running-time
2048  *
2049  * Retrieve the current rtptime, seq and running-time. This is used to
2050  * construct a RTPInfo reply header.
2051  *
2052  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2053  */
2054 gboolean
2055 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2056     guint * rtptime, guint * seq, guint * clock_rate,
2057     GstClockTime * running_time)
2058 {
2059   GstRTSPStreamPrivate *priv;
2060   GstStructure *stats;
2061   GObjectClass *payobjclass;
2062
2063   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2064
2065   priv = stream->priv;
2066
2067   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2068
2069   g_mutex_lock (&priv->lock);
2070
2071   if (g_object_class_find_property (payobjclass, "stats")) {
2072     g_object_get (priv->payloader, "stats", &stats, NULL);
2073     if (stats == NULL)
2074       goto no_stats;
2075
2076     if (seq)
2077       gst_structure_get_uint (stats, "seqnum", seq);
2078
2079     if (rtptime)
2080       gst_structure_get_uint (stats, "timestamp", rtptime);
2081
2082     if (running_time)
2083       gst_structure_get_clock_time (stats, "running-time", running_time);
2084
2085     if (clock_rate) {
2086       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2087       if (*clock_rate == 0 && running_time)
2088         *running_time = GST_CLOCK_TIME_NONE;
2089     }
2090     gst_structure_free (stats);
2091   } else {
2092     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2093         !g_object_class_find_property (payobjclass, "timestamp"))
2094       goto no_stats;
2095
2096     if (seq)
2097       g_object_get (priv->payloader, "seqnum", seq, NULL);
2098
2099     if (rtptime)
2100       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2101
2102     if (running_time)
2103       *running_time = GST_CLOCK_TIME_NONE;
2104   }
2105   g_mutex_unlock (&priv->lock);
2106
2107   return TRUE;
2108
2109   /* ERRORS */
2110 no_stats:
2111   {
2112     GST_WARNING ("Could not get payloader stats");
2113     g_mutex_unlock (&priv->lock);
2114     return FALSE;
2115   }
2116 }
2117
2118 /**
2119  * gst_rtsp_stream_get_caps:
2120  * @stream: a #GstRTSPStream
2121  *
2122  * Retrieve the current caps of @stream.
2123  *
2124  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2125  * after usage.
2126  */
2127 GstCaps *
2128 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2129 {
2130   GstRTSPStreamPrivate *priv;
2131   GstCaps *result;
2132
2133   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2134
2135   priv = stream->priv;
2136
2137   g_mutex_lock (&priv->lock);
2138   if ((result = priv->caps))
2139     gst_caps_ref (result);
2140   g_mutex_unlock (&priv->lock);
2141
2142   return result;
2143 }
2144
2145 /**
2146  * gst_rtsp_stream_recv_rtp:
2147  * @stream: a #GstRTSPStream
2148  * @buffer: (transfer full): a #GstBuffer
2149  *
2150  * Handle an RTP buffer for the stream. This method is usually called when a
2151  * message has been received from a client using the TCP transport.
2152  *
2153  * This function takes ownership of @buffer.
2154  *
2155  * Returns: a GstFlowReturn.
2156  */
2157 GstFlowReturn
2158 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2159 {
2160   GstRTSPStreamPrivate *priv;
2161   GstFlowReturn ret;
2162   GstElement *element;
2163
2164   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2165   priv = stream->priv;
2166   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2167   g_return_val_if_fail (priv->is_joined, FALSE);
2168
2169   g_mutex_lock (&priv->lock);
2170   if (priv->appsrc[0])
2171     element = gst_object_ref (priv->appsrc[0]);
2172   else
2173     element = NULL;
2174   g_mutex_unlock (&priv->lock);
2175
2176   if (element) {
2177     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2178     gst_object_unref (element);
2179   } else {
2180     ret = GST_FLOW_OK;
2181   }
2182   return ret;
2183 }
2184
2185 /**
2186  * gst_rtsp_stream_recv_rtcp:
2187  * @stream: a #GstRTSPStream
2188  * @buffer: (transfer full): a #GstBuffer
2189  *
2190  * Handle an RTCP buffer for the stream. This method is usually called when a
2191  * message has been received from a client using the TCP transport.
2192  *
2193  * This function takes ownership of @buffer.
2194  *
2195  * Returns: a GstFlowReturn.
2196  */
2197 GstFlowReturn
2198 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2199 {
2200   GstRTSPStreamPrivate *priv;
2201   GstFlowReturn ret;
2202   GstElement *element;
2203
2204   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2205   priv = stream->priv;
2206   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2207   g_return_val_if_fail (priv->is_joined, FALSE);
2208
2209   g_mutex_lock (&priv->lock);
2210   if (priv->appsrc[1])
2211     element = gst_object_ref (priv->appsrc[1]);
2212   else
2213     element = NULL;
2214   g_mutex_unlock (&priv->lock);
2215
2216   if (element) {
2217     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2218     gst_object_unref (element);
2219   } else {
2220     ret = GST_FLOW_OK;
2221     gst_buffer_unref (buffer);
2222   }
2223   return ret;
2224 }
2225
2226 /* must be called with lock */
2227 static gboolean
2228 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2229     gboolean add)
2230 {
2231   GstRTSPStreamPrivate *priv = stream->priv;
2232   const GstRTSPTransport *tr;
2233
2234   tr = gst_rtsp_stream_transport_get_transport (trans);
2235
2236   switch (tr->lower_transport) {
2237     case GST_RTSP_LOWER_TRANS_UDP:
2238     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2239     {
2240       gchar *dest;
2241       gint min, max;
2242       guint ttl = 0;
2243
2244       dest = tr->destination;
2245       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2246         min = tr->port.min;
2247         max = tr->port.max;
2248         ttl = tr->ttl;
2249       } else {
2250         min = tr->client_port.min;
2251         max = tr->client_port.max;
2252       }
2253
2254       if (add) {
2255         if (ttl > 0) {
2256           GST_INFO ("setting ttl-mc %d", ttl);
2257           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2258           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2259         }
2260         GST_INFO ("adding %s:%d-%d", dest, min, max);
2261         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2262         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2263         priv->transports = g_list_prepend (priv->transports, trans);
2264       } else {
2265         GST_INFO ("removing %s:%d-%d", dest, min, max);
2266         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2267         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2268         priv->transports = g_list_remove (priv->transports, trans);
2269       }
2270       priv->tr_changed = TRUE;
2271       break;
2272     }
2273     case GST_RTSP_LOWER_TRANS_TCP:
2274       if (add) {
2275         GST_INFO ("adding TCP %s", tr->destination);
2276         priv->transports = g_list_prepend (priv->transports, trans);
2277       } else {
2278         GST_INFO ("removing TCP %s", tr->destination);
2279         priv->transports = g_list_remove (priv->transports, trans);
2280       }
2281       priv->tr_changed = TRUE;
2282       break;
2283     default:
2284       goto unknown_transport;
2285   }
2286   return TRUE;
2287
2288   /* ERRORS */
2289 unknown_transport:
2290   {
2291     GST_INFO ("Unknown transport %d", tr->lower_transport);
2292     return FALSE;
2293   }
2294 }
2295
2296
2297 /**
2298  * gst_rtsp_stream_add_transport:
2299  * @stream: a #GstRTSPStream
2300  * @trans: (transfer none): a #GstRTSPStreamTransport
2301  *
2302  * Add the transport in @trans to @stream. The media of @stream will
2303  * then also be send to the values configured in @trans.
2304  *
2305  * @stream must be joined to a bin.
2306  *
2307  * @trans must contain a valid #GstRTSPTransport.
2308  *
2309  * Returns: %TRUE if @trans was added
2310  */
2311 gboolean
2312 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2313     GstRTSPStreamTransport * trans)
2314 {
2315   GstRTSPStreamPrivate *priv;
2316   gboolean res;
2317
2318   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2319   priv = stream->priv;
2320   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2321   g_return_val_if_fail (priv->is_joined, FALSE);
2322
2323   g_mutex_lock (&priv->lock);
2324   res = update_transport (stream, trans, TRUE);
2325   g_mutex_unlock (&priv->lock);
2326
2327   return res;
2328 }
2329
2330 /**
2331  * gst_rtsp_stream_remove_transport:
2332  * @stream: a #GstRTSPStream
2333  * @trans: (transfer none): a #GstRTSPStreamTransport
2334  *
2335  * Remove the transport in @trans from @stream. The media of @stream will
2336  * not be sent to the values configured in @trans.
2337  *
2338  * @stream must be joined to a bin.
2339  *
2340  * @trans must contain a valid #GstRTSPTransport.
2341  *
2342  * Returns: %TRUE if @trans was removed
2343  */
2344 gboolean
2345 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2346     GstRTSPStreamTransport * trans)
2347 {
2348   GstRTSPStreamPrivate *priv;
2349   gboolean res;
2350
2351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2352   priv = stream->priv;
2353   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2354   g_return_val_if_fail (priv->is_joined, FALSE);
2355
2356   g_mutex_lock (&priv->lock);
2357   res = update_transport (stream, trans, FALSE);
2358   g_mutex_unlock (&priv->lock);
2359
2360   return res;
2361 }
2362
2363 /**
2364  * gst_rtsp_stream_update_crypto:
2365  * @stream: a #GstRTSPStream
2366  * @ssrc: the SSRC
2367  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2368  *
2369  * Update the new crypto information for @ssrc in @stream. If information
2370  * for @ssrc did not exist, it will be added. If information
2371  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2372  * be removed from @stream.
2373  *
2374  * Returns: %TRUE if @crypto could be updated
2375  */
2376 gboolean
2377 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2378     guint ssrc, GstCaps * crypto)
2379 {
2380   GstRTSPStreamPrivate *priv;
2381
2382   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2383   g_return_val_if_fail (GST_IS_CAPS (crypto), FALSE);
2384
2385   priv = stream->priv;
2386
2387   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2388
2389   g_mutex_lock (&priv->lock);
2390   if (crypto)
2391     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2392         gst_caps_ref (crypto));
2393   else
2394     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2395   g_mutex_unlock (&priv->lock);
2396
2397   return TRUE;
2398 }
2399
2400 /**
2401  * gst_rtsp_stream_get_rtp_socket:
2402  * @stream: a #GstRTSPStream
2403  * @family: the socket family
2404  *
2405  * Get the RTP socket from @stream for a @family.
2406  *
2407  * @stream must be joined to a bin.
2408  *
2409  * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2410  * allocated for @family. Unref after usage
2411  */
2412 GSocket *
2413 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2414 {
2415   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2416   GSocket *socket;
2417   const gchar *name;
2418
2419   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2420   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2421       family == G_SOCKET_FAMILY_IPV6, NULL);
2422   g_return_val_if_fail (priv->udpsink[0], NULL);
2423
2424   if (family == G_SOCKET_FAMILY_IPV6)
2425     name = "socket-v6";
2426   else
2427     name = "socket";
2428
2429   g_object_get (priv->udpsink[0], name, &socket, NULL);
2430
2431   return socket;
2432 }
2433
2434 /**
2435  * gst_rtsp_stream_get_rtcp_socket:
2436  * @stream: a #GstRTSPStream
2437  * @family: the socket family
2438  *
2439  * Get the RTCP socket from @stream for a @family.
2440  *
2441  * @stream must be joined to a bin.
2442  *
2443  * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2444  * allocated for @family. Unref after usage
2445  */
2446 GSocket *
2447 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2448 {
2449   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2450   GSocket *socket;
2451   const gchar *name;
2452
2453   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2454   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2455       family == G_SOCKET_FAMILY_IPV6, NULL);
2456   g_return_val_if_fail (priv->udpsink[1], NULL);
2457
2458   if (family == G_SOCKET_FAMILY_IPV6)
2459     name = "socket-v6";
2460   else
2461     name = "socket";
2462
2463   g_object_get (priv->udpsink[1], name, &socket, NULL);
2464
2465   return socket;
2466 }
2467
2468 /**
2469  * gst_rtsp_stream_transport_filter:
2470  * @stream: a #GstRTSPStream
2471  * @func: (scope call) (allow-none): a callback
2472  * @user_data: (closure): user data passed to @func
2473  *
2474  * Call @func for each transport managed by @stream. The result value of @func
2475  * determines what happens to the transport. @func will be called with @stream
2476  * locked so no further actions on @stream can be performed from @func.
2477  *
2478  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2479  * @stream.
2480  *
2481  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2482  *
2483  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2484  * will also be added with an additional ref to the result #GList of this
2485  * function..
2486  *
2487  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2488  *
2489  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2490  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2491  * element in the #GList should be unreffed before the list is freed.
2492  */
2493 GList *
2494 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2495     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2496 {
2497   GstRTSPStreamPrivate *priv;
2498   GList *result, *walk, *next;
2499
2500   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2501
2502   priv = stream->priv;
2503
2504   result = NULL;
2505
2506   g_mutex_lock (&priv->lock);
2507   for (walk = priv->transports; walk; walk = next) {
2508     GstRTSPStreamTransport *trans = walk->data;
2509     GstRTSPFilterResult res;
2510
2511     next = g_list_next (walk);
2512
2513     if (func)
2514       res = func (stream, trans, user_data);
2515     else
2516       res = GST_RTSP_FILTER_REF;
2517
2518     switch (res) {
2519       case GST_RTSP_FILTER_REMOVE:
2520         update_transport (stream, trans, FALSE);
2521         break;
2522       case GST_RTSP_FILTER_REF:
2523         result = g_list_prepend (result, g_object_ref (trans));
2524         break;
2525       case GST_RTSP_FILTER_KEEP:
2526       default:
2527         break;
2528     }
2529   }
2530   g_mutex_unlock (&priv->lock);
2531
2532   return result;
2533 }
2534
2535 static GstPadProbeReturn
2536 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2537 {
2538   GstRTSPStreamPrivate *priv;
2539   GstRTSPStream *stream;
2540
2541   stream = user_data;
2542   priv = stream->priv;
2543
2544   GST_DEBUG_OBJECT (pad, "now blocking");
2545
2546   g_mutex_lock (&priv->lock);
2547   priv->blocking = TRUE;
2548   g_mutex_unlock (&priv->lock);
2549
2550   gst_element_post_message (priv->payloader,
2551       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2552           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2553
2554   return GST_PAD_PROBE_OK;
2555 }
2556
2557 /**
2558  * gst_rtsp_stream_set_blocked:
2559  * @stream: a #GstRTSPStream
2560  * @blocked: boolean indicating we should block or unblock
2561  *
2562  * Blocks or unblocks the dataflow on @stream.
2563  *
2564  * Returns: %TRUE on success
2565  */
2566 gboolean
2567 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2568 {
2569   GstRTSPStreamPrivate *priv;
2570
2571   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2572
2573   priv = stream->priv;
2574
2575   g_mutex_lock (&priv->lock);
2576   if (blocked) {
2577     priv->blocking = FALSE;
2578     if (priv->blocked_id == 0) {
2579       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2580           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2581           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2582           g_object_ref (stream), g_object_unref);
2583     }
2584   } else {
2585     if (priv->blocked_id != 0) {
2586       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2587       priv->blocked_id = 0;
2588       priv->blocking = FALSE;
2589     }
2590   }
2591   g_mutex_unlock (&priv->lock);
2592
2593   return TRUE;
2594 }
2595
2596 /**
2597  * gst_rtsp_stream_is_blocking:
2598  * @stream: a #GstRTSPStream
2599  *
2600  * Check if @stream is blocking on a #GstBuffer.
2601  *
2602  * Returns: %TRUE if @stream is blocking
2603  */
2604 gboolean
2605 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2606 {
2607   GstRTSPStreamPrivate *priv;
2608   gboolean result;
2609
2610   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2611
2612   priv = stream->priv;
2613
2614   g_mutex_lock (&priv->lock);
2615   result = priv->blocking;
2616   g_mutex_unlock (&priv->lock);
2617
2618   return result;
2619 }