stream: add SRTP support
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* SRTP encoder/decoder */
83   GstElement *srtpenc;
84   GstElement *srtpdec;
85
86   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
87    * sockets */
88   GstElement *udpsrc_v4[2];
89
90   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
91    * sockets */
92   GstElement *udpsrc_v6[2];
93
94   GstElement *udpsink[2];
95
96   /* for TCP transport */
97   GstElement *appsrc[2];
98   GstElement *appqueue[2];
99   GstElement *appsink[2];
100
101   GstElement *tee[2];
102   GstElement *funnel[2];
103
104   /* server ports for sending/receiving over ipv4 */
105   GstRTSPRange server_port_v4;
106   GstRTSPAddress *server_addr_v4;
107   gboolean have_ipv4;
108
109   /* server ports for sending/receiving over ipv6 */
110   GstRTSPRange server_port_v6;
111   GstRTSPAddress *server_addr_v6;
112   gboolean have_ipv6;
113
114   /* multicast addresses */
115   GstRTSPAddressPool *pool;
116   GstRTSPAddress *addr_v4;
117   GstRTSPAddress *addr_v6;
118
119   /* the caps of the stream */
120   gulong caps_sig;
121   GstCaps *caps;
122
123   /* transports we stream to */
124   guint n_active;
125   GList *transports;
126   gboolean tr_changed;
127   GList *tr_cache;
128
129   gint dscp_qos;
130
131   /* stream blocking */
132   gulong blocked_id;
133   gboolean blocking;
134 };
135
136 #define DEFAULT_CONTROL         NULL
137 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
138 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
139                                         GST_RTSP_LOWER_TRANS_TCP
140
141 enum
142 {
143   PROP_0,
144   PROP_CONTROL,
145   PROP_PROFILES,
146   PROP_PROTOCOLS,
147   PROP_LAST
148 };
149
150 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
151 #define GST_CAT_DEFAULT rtsp_stream_debug
152
153 static GQuark ssrc_stream_map_key;
154
155 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
156     GValue * value, GParamSpec * pspec);
157 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
158     const GValue * value, GParamSpec * pspec);
159
160 static void gst_rtsp_stream_finalize (GObject * obj);
161
162 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
163
164 static void
165 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
166 {
167   GObjectClass *gobject_class;
168
169   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
170
171   gobject_class = G_OBJECT_CLASS (klass);
172
173   gobject_class->get_property = gst_rtsp_stream_get_property;
174   gobject_class->set_property = gst_rtsp_stream_set_property;
175   gobject_class->finalize = gst_rtsp_stream_finalize;
176
177   g_object_class_install_property (gobject_class, PROP_CONTROL,
178       g_param_spec_string ("control", "Control",
179           "The control string for this stream", DEFAULT_CONTROL,
180           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181
182   g_object_class_install_property (gobject_class, PROP_PROFILES,
183       g_param_spec_flags ("profiles", "Profiles",
184           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
185           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186
187   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
188       g_param_spec_flags ("protocols", "Protocols",
189           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
190           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191
192   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
193
194   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
195 }
196
197 static void
198 gst_rtsp_stream_init (GstRTSPStream * stream)
199 {
200   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
201
202   GST_DEBUG ("new stream %p", stream);
203
204   stream->priv = priv;
205
206   priv->dscp_qos = -1;
207   priv->control = g_strdup (DEFAULT_CONTROL);
208   priv->profiles = DEFAULT_PROFILES;
209   priv->protocols = DEFAULT_PROTOCOLS;
210
211   g_mutex_init (&priv->lock);
212 }
213
214 static void
215 gst_rtsp_stream_finalize (GObject * obj)
216 {
217   GstRTSPStream *stream;
218   GstRTSPStreamPrivate *priv;
219
220   stream = GST_RTSP_STREAM (obj);
221   priv = stream->priv;
222
223   GST_DEBUG ("finalize stream %p", stream);
224
225   /* we really need to be unjoined now */
226   g_return_if_fail (!priv->is_joined);
227
228   if (priv->addr_v4)
229     gst_rtsp_address_free (priv->addr_v4);
230   if (priv->addr_v6)
231     gst_rtsp_address_free (priv->addr_v6);
232   if (priv->server_addr_v4)
233     gst_rtsp_address_free (priv->server_addr_v4);
234   if (priv->server_addr_v6)
235     gst_rtsp_address_free (priv->server_addr_v6);
236   if (priv->pool)
237     g_object_unref (priv->pool);
238   gst_object_unref (priv->payloader);
239   gst_object_unref (priv->srcpad);
240   g_free (priv->control);
241   g_mutex_clear (&priv->lock);
242
243   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
244 }
245
246 static void
247 gst_rtsp_stream_get_property (GObject * object, guint propid,
248     GValue * value, GParamSpec * pspec)
249 {
250   GstRTSPStream *stream = GST_RTSP_STREAM (object);
251
252   switch (propid) {
253     case PROP_CONTROL:
254       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
255       break;
256     case PROP_PROFILES:
257       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
258       break;
259     case PROP_PROTOCOLS:
260       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
261       break;
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
264   }
265 }
266
267 static void
268 gst_rtsp_stream_set_property (GObject * object, guint propid,
269     const GValue * value, GParamSpec * pspec)
270 {
271   GstRTSPStream *stream = GST_RTSP_STREAM (object);
272
273   switch (propid) {
274     case PROP_CONTROL:
275       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
276       break;
277     case PROP_PROFILES:
278       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
279       break;
280     case PROP_PROTOCOLS:
281       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
282       break;
283     default:
284       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
285   }
286 }
287
288 /**
289  * gst_rtsp_stream_new:
290  * @idx: an index
291  * @srcpad: a #GstPad
292  * @payloader: a #GstElement
293  *
294  * Create a new media stream with index @idx that handles RTP data on
295  * @srcpad and has a payloader element @payloader.
296  *
297  * Returns: (transfer full): a new #GstRTSPStream
298  */
299 GstRTSPStream *
300 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
301 {
302   GstRTSPStreamPrivate *priv;
303   GstRTSPStream *stream;
304
305   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
306   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
307   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
308
309   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
310   priv = stream->priv;
311   priv->idx = idx;
312   priv->payloader = gst_object_ref (payloader);
313   priv->srcpad = gst_object_ref (srcpad);
314
315   return stream;
316 }
317
318 /**
319  * gst_rtsp_stream_get_index:
320  * @stream: a #GstRTSPStream
321  *
322  * Get the stream index.
323  *
324  * Return: the stream index.
325  */
326 guint
327 gst_rtsp_stream_get_index (GstRTSPStream * stream)
328 {
329   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
330
331   return stream->priv->idx;
332 }
333
334 /**
335  * gst_rtsp_stream_get_pt:
336  * @stream: a #GstRTSPStream
337  *
338  * Get the stream payload type.
339  *
340  * Return: the stream payload type.
341  */
342 guint
343 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
344 {
345   GstRTSPStreamPrivate *priv;
346   guint pt;
347
348   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
349
350   priv = stream->priv;
351
352   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
353
354   return pt;
355 }
356
357 /**
358  * gst_rtsp_stream_get_srcpad:
359  * @stream: a #GstRTSPStream
360  *
361  * Get the srcpad associated with @stream.
362  *
363  * Returns: (transfer full): the srcpad. Unref after usage.
364  */
365 GstPad *
366 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
367 {
368   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
369
370   return gst_object_ref (stream->priv->srcpad);
371 }
372
373 /**
374  * gst_rtsp_stream_get_control:
375  * @stream: a #GstRTSPStream
376  *
377  * Get the control string to identify this stream.
378  *
379  * Returns: (transfer full): the control string. g_free() after usage.
380  */
381 gchar *
382 gst_rtsp_stream_get_control (GstRTSPStream * stream)
383 {
384   GstRTSPStreamPrivate *priv;
385   gchar *result;
386
387   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
388
389   priv = stream->priv;
390
391   g_mutex_lock (&priv->lock);
392   if ((result = g_strdup (priv->control)) == NULL)
393     result = g_strdup_printf ("stream=%u", priv->idx);
394   g_mutex_unlock (&priv->lock);
395
396   return result;
397 }
398
399 /**
400  * gst_rtsp_stream_set_control:
401  * @stream: a #GstRTSPStream
402  * @control: a control string
403  *
404  * Set the control string in @stream.
405  */
406 void
407 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
408 {
409   GstRTSPStreamPrivate *priv;
410
411   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
412
413   priv = stream->priv;
414
415   g_mutex_lock (&priv->lock);
416   g_free (priv->control);
417   priv->control = g_strdup (control);
418   g_mutex_unlock (&priv->lock);
419 }
420
421 /**
422  * gst_rtsp_stream_has_control:
423  * @stream: a #GstRTSPStream
424  * @control: a control string
425  *
426  * Check if @stream has the control string @control.
427  *
428  * Returns: %TRUE is @stream has @control as the control string
429  */
430 gboolean
431 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
432 {
433   GstRTSPStreamPrivate *priv;
434   gboolean res;
435
436   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
437
438   priv = stream->priv;
439
440   g_mutex_lock (&priv->lock);
441   if (priv->control)
442     res = (g_strcmp0 (priv->control, control) == 0);
443   else {
444     guint streamid;
445
446     if (sscanf (control, "stream=%u", &streamid) > 0)
447       res = (streamid == priv->idx);
448     else
449       res = FALSE;
450   }
451   g_mutex_unlock (&priv->lock);
452
453   return res;
454 }
455
456 /**
457  * gst_rtsp_stream_set_mtu:
458  * @stream: a #GstRTSPStream
459  * @mtu: a new MTU
460  *
461  * Configure the mtu in the payloader of @stream to @mtu.
462  */
463 void
464 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
465 {
466   GstRTSPStreamPrivate *priv;
467
468   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
469
470   priv = stream->priv;
471
472   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
473
474   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
475 }
476
477 /**
478  * gst_rtsp_stream_get_mtu:
479  * @stream: a #GstRTSPStream
480  *
481  * Get the configured MTU in the payloader of @stream.
482  *
483  * Returns: the MTU of the payloader.
484  */
485 guint
486 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
487 {
488   GstRTSPStreamPrivate *priv;
489   guint mtu;
490
491   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
492
493   priv = stream->priv;
494
495   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
496
497   return mtu;
498 }
499
500 /* Update the dscp qos property on the udp sinks */
501 static void
502 update_dscp_qos (GstRTSPStream * stream)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   if (priv->udpsink[0]) {
511     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
512         NULL);
513   }
514
515   if (priv->udpsink[1]) {
516     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
517         NULL);
518   }
519 }
520
521 /**
522  * gst_rtsp_stream_set_dscp_qos:
523  * @stream: a #GstRTSPStream
524  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
525  *
526  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
527  */
528 void
529 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
530 {
531   GstRTSPStreamPrivate *priv;
532
533   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
534
535   priv = stream->priv;
536
537   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
538
539   if (dscp_qos < -1 || dscp_qos > 63) {
540     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
541     return;
542   }
543
544   priv->dscp_qos = dscp_qos;
545
546   update_dscp_qos (stream);
547 }
548
549 /**
550  * gst_rtsp_stream_get_dscp_qos:
551  * @stream: a #GstRTSPStream
552  *
553  * Get the configured DSCP QoS in of the outgoing sockets.
554  *
555  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
556  */
557 gint
558 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
559 {
560   GstRTSPStreamPrivate *priv;
561
562   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
563
564   priv = stream->priv;
565
566   return priv->dscp_qos;
567 }
568
569 /**
570  * gst_rtsp_stream_is_transport_supported:
571  * @stream: a #GstRTSPStream
572  * @transport: (transfer none): a #GstRTSPTransport
573  *
574  * Check if @transport can be handled by stream
575  *
576  * Returns: %TRUE if @transport can be handled by @stream.
577  */
578 gboolean
579 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
580     GstRTSPTransport * transport)
581 {
582   GstRTSPStreamPrivate *priv;
583
584   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
585
586   priv = stream->priv;
587
588   g_mutex_lock (&priv->lock);
589   if (transport->trans != GST_RTSP_TRANS_RTP)
590     goto unsupported_transmode;
591
592   if (!(transport->profile & priv->profiles))
593     goto unsupported_profile;
594
595   if (!(transport->lower_transport & priv->protocols))
596     goto unsupported_ltrans;
597
598   g_mutex_unlock (&priv->lock);
599
600   return TRUE;
601
602   /* ERRORS */
603 unsupported_transmode:
604   {
605     GST_DEBUG ("unsupported transport mode %d", transport->trans);
606     g_mutex_unlock (&priv->lock);
607     return FALSE;
608   }
609 unsupported_profile:
610   {
611     GST_DEBUG ("unsupported profile %d", transport->profile);
612     g_mutex_unlock (&priv->lock);
613     return FALSE;
614   }
615 unsupported_ltrans:
616   {
617     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
618     g_mutex_unlock (&priv->lock);
619     return FALSE;
620   }
621 }
622
623 /**
624  * gst_rtsp_stream_set_profiles:
625  * @stream: a #GstRTSPStream
626  * @profiles: the new profiles
627  *
628  * Configure the allowed profiles for @stream.
629  */
630 void
631 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
632 {
633   GstRTSPStreamPrivate *priv;
634
635   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
636
637   priv = stream->priv;
638
639   g_mutex_lock (&priv->lock);
640   priv->profiles = profiles;
641   g_mutex_unlock (&priv->lock);
642 }
643
644 /**
645  * gst_rtsp_stream_get_profiles:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the allowed profiles of @stream.
649  *
650  * Returns: a #GstRTSPProfile
651  */
652 GstRTSPProfile
653 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656   GstRTSPProfile res;
657
658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
659
660   priv = stream->priv;
661
662   g_mutex_lock (&priv->lock);
663   res = priv->profiles;
664   g_mutex_unlock (&priv->lock);
665
666   return res;
667 }
668
669 /**
670  * gst_rtsp_stream_set_protocols:
671  * @stream: a #GstRTSPStream
672  * @protocols: the new flags
673  *
674  * Configure the allowed lower transport for @stream.
675  */
676 void
677 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
678     GstRTSPLowerTrans protocols)
679 {
680   GstRTSPStreamPrivate *priv;
681
682   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
683
684   priv = stream->priv;
685
686   g_mutex_lock (&priv->lock);
687   priv->protocols = protocols;
688   g_mutex_unlock (&priv->lock);
689 }
690
691 /**
692  * gst_rtsp_stream_get_protocols:
693  * @stream: a #GstRTSPStream
694  *
695  * Get the allowed protocols of @stream.
696  *
697  * Returns: a #GstRTSPLowerTrans
698  */
699 GstRTSPLowerTrans
700 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
701 {
702   GstRTSPStreamPrivate *priv;
703   GstRTSPLowerTrans res;
704
705   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
706       GST_RTSP_LOWER_TRANS_UNKNOWN);
707
708   priv = stream->priv;
709
710   g_mutex_lock (&priv->lock);
711   res = priv->protocols;
712   g_mutex_unlock (&priv->lock);
713
714   return res;
715 }
716
717 /**
718  * gst_rtsp_stream_set_address_pool:
719  * @stream: a #GstRTSPStream
720  * @pool: (transfer none): a #GstRTSPAddressPool
721  *
722  * configure @pool to be used as the address pool of @stream.
723  */
724 void
725 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
726     GstRTSPAddressPool * pool)
727 {
728   GstRTSPStreamPrivate *priv;
729   GstRTSPAddressPool *old;
730
731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
732
733   priv = stream->priv;
734
735   GST_LOG_OBJECT (stream, "set address pool %p", pool);
736
737   g_mutex_lock (&priv->lock);
738   if ((old = priv->pool) != pool)
739     priv->pool = pool ? g_object_ref (pool) : NULL;
740   else
741     old = NULL;
742   g_mutex_unlock (&priv->lock);
743
744   if (old)
745     g_object_unref (old);
746 }
747
748 /**
749  * gst_rtsp_stream_get_address_pool:
750  * @stream: a #GstRTSPStream
751  *
752  * Get the #GstRTSPAddressPool used as the address pool of @stream.
753  *
754  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
755  * usage.
756  */
757 GstRTSPAddressPool *
758 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
759 {
760   GstRTSPStreamPrivate *priv;
761   GstRTSPAddressPool *result;
762
763   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
764
765   priv = stream->priv;
766
767   g_mutex_lock (&priv->lock);
768   if ((result = priv->pool))
769     g_object_ref (result);
770   g_mutex_unlock (&priv->lock);
771
772   return result;
773 }
774
775 /**
776  * gst_rtsp_stream_get_multicast_address:
777  * @stream: a #GstRTSPStream
778  * @family: the #GSocketFamily
779  *
780  * Get the multicast address of @stream for @family.
781  *
782  * Returns: (transfer full): the #GstRTSPAddress of @stream or %NULL when no
783  * address could be allocated. gst_rtsp_address_free() after usage.
784  */
785 GstRTSPAddress *
786 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
787     GSocketFamily family)
788 {
789   GstRTSPStreamPrivate *priv;
790   GstRTSPAddress *result;
791   GstRTSPAddress **addrp;
792   GstRTSPAddressFlags flags;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
795
796   priv = stream->priv;
797
798   if (family == G_SOCKET_FAMILY_IPV6) {
799     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
800     addrp = &priv->addr_v6;
801   } else {
802     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
803     addrp = &priv->addr_v4;
804   }
805
806   g_mutex_lock (&priv->lock);
807   if (*addrp == NULL) {
808     if (priv->pool == NULL)
809       goto no_pool;
810
811     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
812
813     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
814     if (*addrp == NULL)
815       goto no_address;
816   }
817   result = gst_rtsp_address_copy (*addrp);
818   g_mutex_unlock (&priv->lock);
819
820   return result;
821
822   /* ERRORS */
823 no_pool:
824   {
825     GST_ERROR_OBJECT (stream, "no address pool specified");
826     g_mutex_unlock (&priv->lock);
827     return NULL;
828   }
829 no_address:
830   {
831     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
832     g_mutex_unlock (&priv->lock);
833     return NULL;
834   }
835 }
836
837 /**
838  * gst_rtsp_stream_reserve_address:
839  * @stream: a #GstRTSPStream
840  * @address: an address
841  * @port: a port
842  * @n_ports: n_ports
843  * @ttl: a TTL
844  *
845  * Reserve @address and @port as the address and port of @stream.
846  *
847  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
848  * reserved. gst_rtsp_address_free() after usage.
849  */
850 GstRTSPAddress *
851 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
852     const gchar * address, guint port, guint n_ports, guint ttl)
853 {
854   GstRTSPStreamPrivate *priv;
855   GstRTSPAddress *result;
856   GInetAddress *addr;
857   GSocketFamily family;
858   GstRTSPAddress **addrp;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861   g_return_val_if_fail (address != NULL, NULL);
862   g_return_val_if_fail (port > 0, NULL);
863   g_return_val_if_fail (n_ports > 0, NULL);
864   g_return_val_if_fail (ttl > 0, NULL);
865
866   priv = stream->priv;
867
868   addr = g_inet_address_new_from_string (address);
869   if (!addr) {
870     GST_ERROR ("failed to get inet addr from %s", address);
871     family = G_SOCKET_FAMILY_IPV4;
872   } else {
873     family = g_inet_address_get_family (addr);
874     g_object_unref (addr);
875   }
876
877   if (family == G_SOCKET_FAMILY_IPV6)
878     addrp = &priv->addr_v6;
879   else
880     addrp = &priv->addr_v4;
881
882   g_mutex_lock (&priv->lock);
883   if (*addrp == NULL) {
884     GstRTSPAddressPoolResult res;
885
886     if (priv->pool == NULL)
887       goto no_pool;
888
889     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
890         port, n_ports, ttl, addrp);
891     if (res != GST_RTSP_ADDRESS_POOL_OK)
892       goto no_address;
893   } else {
894     if (strcmp ((*addrp)->address, address) ||
895         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
896         (*addrp)->ttl != ttl)
897       goto different_address;
898   }
899   result = gst_rtsp_address_copy (*addrp);
900   g_mutex_unlock (&priv->lock);
901
902   return result;
903
904   /* ERRORS */
905 no_pool:
906   {
907     GST_ERROR_OBJECT (stream, "no address pool specified");
908     g_mutex_unlock (&priv->lock);
909     return NULL;
910   }
911 no_address:
912   {
913     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
914         address);
915     g_mutex_unlock (&priv->lock);
916     return NULL;
917   }
918 different_address:
919   {
920     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
921         " reserved", address);
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 static gboolean
928 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
929     GSocketFamily family, GstElement * udpsrc_out[2],
930     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
931     GstRTSPAddress ** server_addr_out)
932 {
933   GstStateChangeReturn ret;
934   GstElement *udpsrc0, *udpsrc1;
935   GstElement *udpsink0, *udpsink1;
936   GSocket *rtp_socket = NULL;
937   GSocket *rtcp_socket;
938   gint tmp_rtp, tmp_rtcp;
939   guint count;
940   gint rtpport, rtcpport;
941   GList *rejected_addresses = NULL;
942   GstRTSPAddress *addr = NULL;
943   GInetAddress *inetaddr = NULL;
944   GSocketAddress *rtp_sockaddr = NULL;
945   GSocketAddress *rtcp_sockaddr = NULL;
946   const gchar *multisink_socket;
947
948   if (family == G_SOCKET_FAMILY_IPV6)
949     multisink_socket = "socket-v6";
950   else
951     multisink_socket = "socket";
952
953   udpsrc0 = NULL;
954   udpsrc1 = NULL;
955   udpsink0 = NULL;
956   udpsink1 = NULL;
957   count = 0;
958
959   /* Start with random port */
960   tmp_rtp = 0;
961
962   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
963       G_SOCKET_PROTOCOL_UDP, NULL);
964   if (!rtcp_socket)
965     goto no_udp_protocol;
966
967   if (*server_addr_out)
968     gst_rtsp_address_free (*server_addr_out);
969
970   /* try to allocate 2 UDP ports, the RTP port should be an even
971    * number and the RTCP port should be the next (uneven) port */
972 again:
973
974   if (rtp_socket == NULL) {
975     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
976         G_SOCKET_PROTOCOL_UDP, NULL);
977     if (!rtp_socket)
978       goto no_udp_protocol;
979   }
980
981   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
982     GstRTSPAddressFlags flags;
983
984     if (addr)
985       rejected_addresses = g_list_prepend (rejected_addresses, addr);
986
987     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
988     if (family == G_SOCKET_FAMILY_IPV6)
989       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
990     else
991       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
992
993     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
994
995     if (addr == NULL)
996       goto no_ports;
997
998     tmp_rtp = addr->port;
999
1000     g_clear_object (&inetaddr);
1001     inetaddr = g_inet_address_new_from_string (addr->address);
1002   } else {
1003     if (tmp_rtp != 0) {
1004       tmp_rtp += 2;
1005       if (++count > 20)
1006         goto no_ports;
1007     }
1008
1009     if (inetaddr == NULL)
1010       inetaddr = g_inet_address_new_any (family);
1011   }
1012
1013   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1014   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1015     g_object_unref (rtp_sockaddr);
1016     goto again;
1017   }
1018   g_object_unref (rtp_sockaddr);
1019
1020   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1021   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1022     g_clear_object (&rtp_sockaddr);
1023     goto socket_error;
1024   }
1025
1026   tmp_rtp =
1027       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1028   g_object_unref (rtp_sockaddr);
1029
1030   /* check if port is even */
1031   if ((tmp_rtp & 1) != 0) {
1032     /* port not even, close and allocate another */
1033     tmp_rtp++;
1034     g_clear_object (&rtp_socket);
1035     goto again;
1036   }
1037
1038   /* set port */
1039   tmp_rtcp = tmp_rtp + 1;
1040
1041   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1042   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1043     g_object_unref (rtcp_sockaddr);
1044     g_clear_object (&rtp_socket);
1045     goto again;
1046   }
1047   g_object_unref (rtcp_sockaddr);
1048
1049   g_clear_object (&inetaddr);
1050
1051   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1052   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1053
1054   if (udpsrc0 == NULL || udpsrc1 == NULL)
1055     goto no_udp_protocol;
1056
1057   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1058   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1059
1060   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1061   if (ret == GST_STATE_CHANGE_FAILURE)
1062     goto element_error;
1063   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1064   if (ret == GST_STATE_CHANGE_FAILURE)
1065     goto element_error;
1066
1067   /* all fine, do port check */
1068   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1069   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1070
1071   /* this should not happen... */
1072   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1073     goto port_error;
1074
1075   if (udpsink_out[0])
1076     udpsink0 = udpsink_out[0];
1077   else
1078     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1079
1080   if (!udpsink0)
1081     goto no_udp_protocol;
1082
1083   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1085
1086   if (udpsink_out[1])
1087     udpsink1 = udpsink_out[1];
1088   else
1089     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1090
1091   if (!udpsink1)
1092     goto no_udp_protocol;
1093
1094   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1095   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1096   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1097
1098   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1099   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1100   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1101   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1102   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1103   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1104   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1105   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1106
1107   /* we keep these elements, we will further configure them when the
1108    * client told us to really use the UDP ports. */
1109   udpsrc_out[0] = udpsrc0;
1110   udpsrc_out[1] = udpsrc1;
1111   udpsink_out[0] = udpsink0;
1112   udpsink_out[1] = udpsink1;
1113   server_port_out->min = rtpport;
1114   server_port_out->max = rtcpport;
1115
1116   *server_addr_out = addr;
1117   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1118
1119   g_object_unref (rtp_socket);
1120   g_object_unref (rtcp_socket);
1121
1122   return TRUE;
1123
1124   /* ERRORS */
1125 no_udp_protocol:
1126   {
1127     goto cleanup;
1128   }
1129 no_ports:
1130   {
1131     goto cleanup;
1132   }
1133 port_error:
1134   {
1135     goto cleanup;
1136   }
1137 socket_error:
1138   {
1139     goto cleanup;
1140   }
1141 element_error:
1142   {
1143     goto cleanup;
1144   }
1145 cleanup:
1146   {
1147     if (udpsrc0) {
1148       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1149       gst_object_unref (udpsrc0);
1150     }
1151     if (udpsrc1) {
1152       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1153       gst_object_unref (udpsrc1);
1154     }
1155     if (udpsink0) {
1156       gst_element_set_state (udpsink0, GST_STATE_NULL);
1157       gst_object_unref (udpsink0);
1158     }
1159     if (inetaddr)
1160       g_object_unref (inetaddr);
1161     g_list_free_full (rejected_addresses,
1162         (GDestroyNotify) gst_rtsp_address_free);
1163     if (addr)
1164       gst_rtsp_address_free (addr);
1165     if (rtp_socket)
1166       g_object_unref (rtp_socket);
1167     if (rtcp_socket)
1168       g_object_unref (rtcp_socket);
1169     return FALSE;
1170   }
1171 }
1172
1173 /* must be called with lock */
1174 static gboolean
1175 alloc_ports (GstRTSPStream * stream)
1176 {
1177   GstRTSPStreamPrivate *priv = stream->priv;
1178
1179   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1180       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1181       &priv->server_port_v4, &priv->server_addr_v4);
1182
1183   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1184       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1185       &priv->server_port_v6, &priv->server_addr_v6);
1186
1187   return priv->have_ipv4 || priv->have_ipv6;
1188 }
1189
1190 /**
1191  * gst_rtsp_stream_get_server_port:
1192  * @stream: a #GstRTSPStream
1193  * @server_port: (out): result server port
1194  * @family: the port family to get
1195  *
1196  * Fill @server_port with the port pair used by the server. This function can
1197  * only be called when @stream has been joined.
1198  */
1199 void
1200 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1201     GstRTSPRange * server_port, GSocketFamily family)
1202 {
1203   GstRTSPStreamPrivate *priv;
1204
1205   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1206   priv = stream->priv;
1207   g_return_if_fail (priv->is_joined);
1208
1209   g_mutex_lock (&priv->lock);
1210   if (family == G_SOCKET_FAMILY_IPV4) {
1211     if (server_port)
1212       *server_port = priv->server_port_v4;
1213   } else {
1214     if (server_port)
1215       *server_port = priv->server_port_v6;
1216   }
1217   g_mutex_unlock (&priv->lock);
1218 }
1219
1220 /**
1221  * gst_rtsp_stream_get_rtpsession:
1222  * @stream: a #GstRTSPStream
1223  *
1224  * Get the RTP session of this stream.
1225  *
1226  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1227  */
1228 GObject *
1229 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1230 {
1231   GstRTSPStreamPrivate *priv;
1232   GObject *session;
1233
1234   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1235
1236   priv = stream->priv;
1237
1238   g_mutex_lock (&priv->lock);
1239   if ((session = priv->session))
1240     g_object_ref (session);
1241   g_mutex_unlock (&priv->lock);
1242
1243   return session;
1244 }
1245
1246 /**
1247  * gst_rtsp_stream_get_ssrc:
1248  * @stream: a #GstRTSPStream
1249  * @ssrc: (out): result ssrc
1250  *
1251  * Get the SSRC used by the RTP session of this stream. This function can only
1252  * be called when @stream has been joined.
1253  */
1254 void
1255 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1256 {
1257   GstRTSPStreamPrivate *priv;
1258
1259   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1260   priv = stream->priv;
1261   g_return_if_fail (priv->is_joined);
1262
1263   g_mutex_lock (&priv->lock);
1264   if (ssrc && priv->session)
1265     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1266   g_mutex_unlock (&priv->lock);
1267 }
1268
1269 /* executed from streaming thread */
1270 static void
1271 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1272 {
1273   GstRTSPStreamPrivate *priv = stream->priv;
1274   GstCaps *newcaps, *oldcaps;
1275
1276   newcaps = gst_pad_get_current_caps (pad);
1277
1278   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1279       newcaps);
1280
1281   g_mutex_lock (&priv->lock);
1282   oldcaps = priv->caps;
1283   priv->caps = newcaps;
1284   g_mutex_unlock (&priv->lock);
1285
1286   if (oldcaps)
1287     gst_caps_unref (oldcaps);
1288 }
1289
1290 static void
1291 dump_structure (const GstStructure * s)
1292 {
1293   gchar *sstr;
1294
1295   sstr = gst_structure_to_string (s);
1296   GST_INFO ("structure: %s", sstr);
1297   g_free (sstr);
1298 }
1299
1300 static GstRTSPStreamTransport *
1301 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1302 {
1303   GstRTSPStreamPrivate *priv = stream->priv;
1304   GList *walk;
1305   GstRTSPStreamTransport *result = NULL;
1306   const gchar *tmp;
1307   gchar *dest;
1308   guint port;
1309
1310   if (rtcp_from == NULL)
1311     return NULL;
1312
1313   tmp = g_strrstr (rtcp_from, ":");
1314   if (tmp == NULL)
1315     return NULL;
1316
1317   port = atoi (tmp + 1);
1318   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1319
1320   g_mutex_lock (&priv->lock);
1321   GST_INFO ("finding %s:%d in %d transports", dest, port,
1322       g_list_length (priv->transports));
1323
1324   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1325     GstRTSPStreamTransport *trans = walk->data;
1326     const GstRTSPTransport *tr;
1327     gint min, max;
1328
1329     tr = gst_rtsp_stream_transport_get_transport (trans);
1330
1331     min = tr->client_port.min;
1332     max = tr->client_port.max;
1333
1334     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1335       result = trans;
1336       break;
1337     }
1338   }
1339   if (result)
1340     g_object_ref (result);
1341   g_mutex_unlock (&priv->lock);
1342
1343   g_free (dest);
1344
1345   return result;
1346 }
1347
1348 static GstRTSPStreamTransport *
1349 check_transport (GObject * source, GstRTSPStream * stream)
1350 {
1351   GstStructure *stats;
1352   GstRTSPStreamTransport *trans;
1353
1354   /* see if we have a stream to match with the origin of the RTCP packet */
1355   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1356   if (trans == NULL) {
1357     g_object_get (source, "stats", &stats, NULL);
1358     if (stats) {
1359       const gchar *rtcp_from;
1360
1361       dump_structure (stats);
1362
1363       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1364       if ((trans = find_transport (stream, rtcp_from))) {
1365         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1366             source);
1367         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1368             g_object_unref);
1369       }
1370       gst_structure_free (stats);
1371     }
1372   }
1373   return trans;
1374 }
1375
1376
1377 static void
1378 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1379 {
1380   GstRTSPStreamTransport *trans;
1381
1382   GST_INFO ("%p: new source %p", stream, source);
1383
1384   trans = check_transport (source, stream);
1385
1386   if (trans)
1387     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1388 }
1389
1390 static void
1391 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1392 {
1393   GST_INFO ("%p: new SDES %p", stream, source);
1394 }
1395
1396 static void
1397 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1398 {
1399   GstRTSPStreamTransport *trans;
1400
1401   trans = check_transport (source, stream);
1402
1403   if (trans) {
1404     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1405     gst_rtsp_stream_transport_keep_alive (trans);
1406   }
1407 #ifdef DUMP_STATS
1408   {
1409     GstStructure *stats;
1410     g_object_get (source, "stats", &stats, NULL);
1411     if (stats) {
1412       dump_structure (stats);
1413       gst_structure_free (stats);
1414     }
1415   }
1416 #endif
1417 }
1418
1419 static void
1420 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1421 {
1422   GST_INFO ("%p: source %p bye", stream, source);
1423 }
1424
1425 static void
1426 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1427 {
1428   GstRTSPStreamTransport *trans;
1429
1430   GST_INFO ("%p: source %p bye timeout", stream, source);
1431
1432   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1433     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1434     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1435   }
1436 }
1437
1438 static void
1439 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1440 {
1441   GstRTSPStreamTransport *trans;
1442
1443   GST_INFO ("%p: source %p timeout", stream, source);
1444
1445   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1446     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1447     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1448   }
1449 }
1450
1451 static void
1452 clear_tr_cache (GstRTSPStreamPrivate * priv)
1453 {
1454   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1455   g_list_free (priv->tr_cache);
1456   priv->tr_cache = NULL;
1457 }
1458
1459 static GstFlowReturn
1460 handle_new_sample (GstAppSink * sink, gpointer user_data)
1461 {
1462   GstRTSPStreamPrivate *priv;
1463   GList *walk;
1464   GstSample *sample;
1465   GstBuffer *buffer;
1466   GstRTSPStream *stream;
1467   gboolean is_rtp;
1468
1469   sample = gst_app_sink_pull_sample (sink);
1470   if (!sample)
1471     return GST_FLOW_OK;
1472
1473   stream = (GstRTSPStream *) user_data;
1474   priv = stream->priv;
1475   buffer = gst_sample_get_buffer (sample);
1476
1477   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1478
1479   g_mutex_lock (&priv->lock);
1480   if (priv->tr_changed) {
1481     clear_tr_cache (priv);
1482     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1483       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1484       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1485     }
1486     priv->tr_changed = FALSE;
1487   }
1488   g_mutex_unlock (&priv->lock);
1489
1490   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1491     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1492
1493     if (is_rtp) {
1494       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1495     } else {
1496       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1497     }
1498   }
1499   gst_sample_unref (sample);
1500
1501   return GST_FLOW_OK;
1502 }
1503
1504 static GstAppSinkCallbacks sink_cb = {
1505   NULL,                         /* not interested in EOS */
1506   NULL,                         /* not interested in preroll samples */
1507   handle_new_sample,
1508 };
1509
1510 static GstElement *
1511 get_rtp_encoder (GstRTSPStream * stream, guint session)
1512 {
1513   GstRTSPStreamPrivate *priv = stream->priv;
1514
1515   if (priv->srtpenc == NULL) {
1516     gchar *name;
1517
1518     name = g_strdup_printf ("srtpenc_%u", session);
1519     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1520     g_free (name);
1521
1522     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1523   }
1524   return gst_object_ref (priv->srtpenc);
1525 }
1526
1527 static GstElement *
1528 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1529 {
1530   GstRTSPStreamPrivate *priv = stream->priv;
1531   GstElement *enc;
1532   GstPad *pad;
1533   gchar *name;
1534
1535   if (priv->idx != session)
1536     return NULL;
1537
1538   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1539
1540   enc = get_rtp_encoder (stream, session);
1541   name = g_strdup_printf ("rtp_sink_%d", session);
1542   pad = gst_element_get_request_pad (enc, name);
1543   g_free (name);
1544   gst_object_unref (pad);
1545
1546   return enc;
1547 }
1548
1549 static GstElement *
1550 request_rtcp_encoder (GstElement * rtpbin, guint session,
1551     GstRTSPStream * stream)
1552 {
1553   GstRTSPStreamPrivate *priv = stream->priv;
1554   GstElement *enc;
1555   GstPad *pad;
1556   gchar *name;
1557
1558   if (priv->idx != session)
1559     return NULL;
1560
1561   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1562
1563   enc = get_rtp_encoder (stream, session);
1564   name = g_strdup_printf ("rtcp_sink_%d", session);
1565   pad = gst_element_get_request_pad (enc, name);
1566   g_free (name);
1567   gst_object_unref (pad);
1568
1569   return enc;
1570 }
1571
1572 static GstElement *
1573 request_rtcp_decoder (GstElement * rtpbin, guint session,
1574     GstRTSPStream * stream)
1575 {
1576   GstRTSPStreamPrivate *priv = stream->priv;
1577
1578   if (priv->idx != session)
1579     return NULL;
1580
1581   if (priv->srtpdec == NULL) {
1582     gchar *name;
1583
1584     name = g_strdup_printf ("srtpdec_%u", session);
1585     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1586     g_free (name);
1587   }
1588   return gst_object_ref (priv->srtpdec);
1589 }
1590
1591 /**
1592  * gst_rtsp_stream_join_bin:
1593  * @stream: a #GstRTSPStream
1594  * @bin: (transfer none): a #GstBin to join
1595  * @rtpbin: (transfer none): a rtpbin element in @bin
1596  * @state: the target state of the new elements
1597  *
1598  * Join the #GstBin @bin that contains the element @rtpbin.
1599  *
1600  * @stream will link to @rtpbin, which must be inside @bin. The elements
1601  * added to @bin will be set to the state given in @state.
1602  *
1603  * Returns: %TRUE on success.
1604  */
1605 gboolean
1606 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1607     GstElement * rtpbin, GstState state)
1608 {
1609   GstRTSPStreamPrivate *priv;
1610   gint i;
1611   guint idx;
1612   gchar *name;
1613   GstPad *pad, *sinkpad, *selpad;
1614   GstPadLinkReturn ret;
1615
1616   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1617   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1618   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1619
1620   priv = stream->priv;
1621
1622   g_mutex_lock (&priv->lock);
1623   if (priv->is_joined)
1624     goto was_joined;
1625
1626   /* create a session with the same index as the stream */
1627   idx = priv->idx;
1628
1629   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1630
1631   if (!alloc_ports (stream))
1632     goto no_ports;
1633
1634   /* update the dscp qos field in the sinks */
1635   update_dscp_qos (stream);
1636
1637   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1638       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1639     /* For SRTP */
1640     g_signal_connect (rtpbin, "request-rtp-encoder",
1641         (GCallback) request_rtp_encoder, stream);
1642     g_signal_connect (rtpbin, "request-rtcp-encoder",
1643         (GCallback) request_rtcp_encoder, stream);
1644 #if 0
1645     g_signal_connect (rtpbin, "request-rtp-decoder",
1646         (GCallback) request_rtp_decoder, stream);
1647 #endif
1648     g_signal_connect (rtpbin, "request-rtcp-decoder",
1649         (GCallback) request_rtcp_decoder, stream);
1650   }
1651
1652   /* get a pad for sending RTP */
1653   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1654   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1655   g_free (name);
1656   /* link the RTP pad to the session manager, it should not really fail unless
1657    * this is not really an RTP pad */
1658   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1659   if (ret != GST_PAD_LINK_OK)
1660     goto link_failed;
1661
1662   /* get pads from the RTP session element for sending and receiving
1663    * RTP/RTCP*/
1664   name = g_strdup_printf ("send_rtp_src_%u", idx);
1665   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1666   g_free (name);
1667   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1668   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1669   g_free (name);
1670   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1671   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1672   g_free (name);
1673   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1674   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1675   g_free (name);
1676
1677   /* get the session */
1678   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1679
1680   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1681       stream);
1682   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1683       stream);
1684   g_signal_connect (priv->session, "on-ssrc-active",
1685       (GCallback) on_ssrc_active, stream);
1686   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1687       stream);
1688   g_signal_connect (priv->session, "on-bye-timeout",
1689       (GCallback) on_bye_timeout, stream);
1690   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1691       stream);
1692
1693   for (i = 0; i < 2; i++) {
1694     GstPad *teepad, *queuepad;
1695     /* For the sender we create this bit of pipeline for both
1696      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1697      * we need to add a queue before appsink to make the pipeline
1698      * not block. For the TCP case, we want to pump data to the
1699      * client as fast as possible anyway.
1700      *
1701      * .--------.      .-----.    .---------.
1702      * | rtpbin |      | tee |    | udpsink |
1703      * |       send->sink   src->sink       |
1704      * '--------'      |     |    '---------'
1705      *                 |     |    .---------.    .---------.
1706      *                 |     |    |  queue  |    | appsink |
1707      *                 |    src->sink      src->sink       |
1708      *                 '-----'    '---------'    '---------'
1709      *
1710      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1711      * udpsink directly to the session.
1712      */
1713     /* add udpsink */
1714     gst_bin_add (bin, priv->udpsink[i]);
1715     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1716
1717     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1718       /* make tee for RTP/RTCP */
1719       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1720       gst_bin_add (bin, priv->tee[i]);
1721
1722       /* and link to rtpbin send pad */
1723       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1724       gst_pad_link (priv->send_src[i], pad);
1725       gst_object_unref (pad);
1726
1727       /* link tee to udpsink */
1728       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1729       gst_pad_link (teepad, sinkpad);
1730       gst_object_unref (teepad);
1731
1732       /* make queue */
1733       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1734       gst_bin_add (bin, priv->appqueue[i]);
1735       /* and link to tee */
1736       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1737       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1738       gst_pad_link (teepad, pad);
1739       gst_object_unref (pad);
1740       gst_object_unref (teepad);
1741
1742       /* make appsink */
1743       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1744       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1745       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1746       gst_bin_add (bin, priv->appsink[i]);
1747       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1748           &sink_cb, stream, NULL);
1749       /* and link to queue */
1750       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1751       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1752       gst_pad_link (queuepad, pad);
1753       gst_object_unref (pad);
1754       gst_object_unref (queuepad);
1755     } else {
1756       /* else only udpsink needed, link it to the session */
1757       gst_pad_link (priv->send_src[i], sinkpad);
1758     }
1759     gst_object_unref (sinkpad);
1760
1761     /* For the receiver we create this bit of pipeline for both
1762      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1763      * and it is all funneled into the rtpbin receive pad.
1764      *
1765      * .--------.     .--------.    .--------.
1766      * | udpsrc |     | funnel |    | rtpbin |
1767      * |       src->sink      src->sink      |
1768      * '--------'     |        |    '--------'
1769      * .--------.     |        |
1770      * | appsrc |     |        |
1771      * |       src->sink       |
1772      * '--------'     '--------'
1773      */
1774     /* make funnel for the RTP/RTCP receivers */
1775     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1776     gst_bin_add (bin, priv->funnel[i]);
1777
1778     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1779     gst_pad_link (pad, priv->recv_sink[i]);
1780     gst_object_unref (pad);
1781
1782     if (priv->udpsrc_v4[i]) {
1783       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1784        * values */
1785       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1786       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1787       /* add udpsrc */
1788       gst_bin_add (bin, priv->udpsrc_v4[i]);
1789
1790       /* and link to the funnel v4 */
1791       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1792       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1793       gst_pad_link (pad, selpad);
1794       gst_object_unref (pad);
1795       gst_object_unref (selpad);
1796     }
1797
1798     if (priv->udpsrc_v6[i]) {
1799       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1800       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1801       gst_bin_add (bin, priv->udpsrc_v6[i]);
1802
1803       /* and link to the funnel v6 */
1804       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1805       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1806       gst_pad_link (pad, selpad);
1807       gst_object_unref (pad);
1808       gst_object_unref (selpad);
1809     }
1810
1811     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1812       /* make and add appsrc */
1813       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1814       gst_bin_add (bin, priv->appsrc[i]);
1815       /* and link to the funnel */
1816       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1817       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1818       gst_pad_link (pad, selpad);
1819       gst_object_unref (pad);
1820       gst_object_unref (selpad);
1821     }
1822
1823     /* check if we need to set to a special state */
1824     if (state != GST_STATE_NULL) {
1825       if (priv->udpsink[i])
1826         gst_element_set_state (priv->udpsink[i], state);
1827       if (priv->appsink[i])
1828         gst_element_set_state (priv->appsink[i], state);
1829       if (priv->appqueue[i])
1830         gst_element_set_state (priv->appqueue[i], state);
1831       if (priv->tee[i])
1832         gst_element_set_state (priv->tee[i], state);
1833       if (priv->funnel[i])
1834         gst_element_set_state (priv->funnel[i], state);
1835       if (priv->appsrc[i])
1836         gst_element_set_state (priv->appsrc[i], state);
1837     }
1838   }
1839
1840   /* be notified of caps changes */
1841   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
1842       (GCallback) caps_notify, stream);
1843
1844   priv->is_joined = TRUE;
1845   g_mutex_unlock (&priv->lock);
1846
1847   return TRUE;
1848
1849   /* ERRORS */
1850 was_joined:
1851   {
1852     g_mutex_unlock (&priv->lock);
1853     return TRUE;
1854   }
1855 no_ports:
1856   {
1857     g_mutex_unlock (&priv->lock);
1858     GST_WARNING ("failed to allocate ports %u", idx);
1859     return FALSE;
1860   }
1861 link_failed:
1862   {
1863     GST_WARNING ("failed to link stream %u", idx);
1864     gst_object_unref (priv->send_rtp_sink);
1865     priv->send_rtp_sink = NULL;
1866     g_mutex_unlock (&priv->lock);
1867     return FALSE;
1868   }
1869 }
1870
1871 /**
1872  * gst_rtsp_stream_leave_bin:
1873  * @stream: a #GstRTSPStream
1874  * @bin: (transfer none): a #GstBin
1875  * @rtpbin: (transfer none): a rtpbin #GstElement
1876  *
1877  * Remove the elements of @stream from @bin.
1878  *
1879  * Return: %TRUE on success.
1880  */
1881 gboolean
1882 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1883     GstElement * rtpbin)
1884 {
1885   GstRTSPStreamPrivate *priv;
1886   gint i;
1887
1888   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1889   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1890   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1891
1892   priv = stream->priv;
1893
1894   g_mutex_lock (&priv->lock);
1895   if (!priv->is_joined)
1896     goto was_not_joined;
1897
1898   /* all transports must be removed by now */
1899   g_return_val_if_fail (priv->transports == NULL, FALSE);
1900
1901   clear_tr_cache (priv);
1902
1903   GST_INFO ("stream %p leaving bin", stream);
1904
1905   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1906   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
1907   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1908   gst_object_unref (priv->send_rtp_sink);
1909   priv->send_rtp_sink = NULL;
1910
1911   for (i = 0; i < 2; i++) {
1912     if (priv->udpsink[i])
1913       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1914     if (priv->appsink[i])
1915       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1916     if (priv->appqueue[i])
1917       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1918     if (priv->tee[i])
1919       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1920     if (priv->funnel[i])
1921       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1922     if (priv->appsrc[i])
1923       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1924     if (priv->udpsrc_v4[i]) {
1925       /* and set udpsrc to NULL now before removing */
1926       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1927       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1928       /* removing them should also nicely release the request
1929        * pads when they finalize */
1930       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1931     }
1932     if (priv->udpsrc_v6[i]) {
1933       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1934       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1935       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1936     }
1937     if (priv->udpsink[i])
1938       gst_bin_remove (bin, priv->udpsink[i]);
1939     if (priv->appsrc[i])
1940       gst_bin_remove (bin, priv->appsrc[i]);
1941     if (priv->appsink[i])
1942       gst_bin_remove (bin, priv->appsink[i]);
1943     if (priv->appqueue[i])
1944       gst_bin_remove (bin, priv->appqueue[i]);
1945     if (priv->tee[i])
1946       gst_bin_remove (bin, priv->tee[i]);
1947     if (priv->funnel[i])
1948       gst_bin_remove (bin, priv->funnel[i]);
1949
1950     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1951     gst_object_unref (priv->recv_sink[i]);
1952     priv->recv_sink[i] = NULL;
1953
1954     priv->udpsrc_v4[i] = NULL;
1955     priv->udpsrc_v6[i] = NULL;
1956     priv->udpsink[i] = NULL;
1957     priv->appsrc[i] = NULL;
1958     priv->appsink[i] = NULL;
1959     priv->appqueue[i] = NULL;
1960     priv->tee[i] = NULL;
1961     priv->funnel[i] = NULL;
1962   }
1963   gst_object_unref (priv->send_src[0]);
1964   priv->send_src[0] = NULL;
1965
1966   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1967   gst_object_unref (priv->send_src[1]);
1968   priv->send_src[1] = NULL;
1969
1970   g_object_unref (priv->session);
1971   priv->session = NULL;
1972   if (priv->caps)
1973     gst_caps_unref (priv->caps);
1974   priv->caps = NULL;
1975
1976   if (priv->srtpenc)
1977     gst_object_unref (priv->srtpenc);
1978
1979   priv->is_joined = FALSE;
1980   g_mutex_unlock (&priv->lock);
1981
1982   return TRUE;
1983
1984 was_not_joined:
1985   {
1986     g_mutex_unlock (&priv->lock);
1987     return TRUE;
1988   }
1989 }
1990
1991 /**
1992  * gst_rtsp_stream_get_rtpinfo:
1993  * @stream: a #GstRTSPStream
1994  * @rtptime: (allow-none): result RTP timestamp
1995  * @seq: (allow-none): result RTP seqnum
1996  * @clock_rate: (allow-none): the clock rate
1997  * @running_time: (allow-none): result running-time
1998  *
1999  * Retrieve the current rtptime, seq and running-time. This is used to
2000  * construct a RTPInfo reply header.
2001  *
2002  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2003  */
2004 gboolean
2005 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2006     guint * rtptime, guint * seq, guint * clock_rate,
2007     GstClockTime * running_time)
2008 {
2009   GstRTSPStreamPrivate *priv;
2010   GstStructure *stats;
2011   GObjectClass *payobjclass;
2012
2013   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2014
2015   priv = stream->priv;
2016
2017   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2018
2019   g_mutex_lock (&priv->lock);
2020
2021   if (g_object_class_find_property (payobjclass, "stats")) {
2022     g_object_get (priv->payloader, "stats", &stats, NULL);
2023     if (stats == NULL)
2024       goto no_stats;
2025
2026     if (seq)
2027       gst_structure_get_uint (stats, "seqnum", seq);
2028
2029     if (rtptime)
2030       gst_structure_get_uint (stats, "timestamp", rtptime);
2031
2032     if (running_time)
2033       gst_structure_get_clock_time (stats, "running-time", running_time);
2034
2035     if (clock_rate) {
2036       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2037       if (*clock_rate == 0 && running_time)
2038         *running_time = GST_CLOCK_TIME_NONE;
2039     }
2040     gst_structure_free (stats);
2041   } else {
2042     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2043         !g_object_class_find_property (payobjclass, "timestamp"))
2044       goto no_stats;
2045
2046     if (seq)
2047       g_object_get (priv->payloader, "seqnum", seq, NULL);
2048
2049     if (rtptime)
2050       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2051
2052     if (running_time)
2053       *running_time = GST_CLOCK_TIME_NONE;
2054   }
2055   g_mutex_unlock (&priv->lock);
2056
2057   return TRUE;
2058
2059   /* ERRORS */
2060 no_stats:
2061   {
2062     GST_WARNING ("Could not get payloader stats");
2063     g_mutex_unlock (&priv->lock);
2064     return FALSE;
2065   }
2066 }
2067
2068 /**
2069  * gst_rtsp_stream_get_caps:
2070  * @stream: a #GstRTSPStream
2071  *
2072  * Retrieve the current caps of @stream.
2073  *
2074  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2075  * after usage.
2076  */
2077 GstCaps *
2078 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2079 {
2080   GstRTSPStreamPrivate *priv;
2081   GstCaps *result;
2082
2083   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2084
2085   priv = stream->priv;
2086
2087   g_mutex_lock (&priv->lock);
2088   if ((result = priv->caps))
2089     gst_caps_ref (result);
2090   g_mutex_unlock (&priv->lock);
2091
2092   return result;
2093 }
2094
2095 /**
2096  * gst_rtsp_stream_recv_rtp:
2097  * @stream: a #GstRTSPStream
2098  * @buffer: (transfer full): a #GstBuffer
2099  *
2100  * Handle an RTP buffer for the stream. This method is usually called when a
2101  * message has been received from a client using the TCP transport.
2102  *
2103  * This function takes ownership of @buffer.
2104  *
2105  * Returns: a GstFlowReturn.
2106  */
2107 GstFlowReturn
2108 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2109 {
2110   GstRTSPStreamPrivate *priv;
2111   GstFlowReturn ret;
2112   GstElement *element;
2113
2114   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2115   priv = stream->priv;
2116   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2117   g_return_val_if_fail (priv->is_joined, FALSE);
2118
2119   g_mutex_lock (&priv->lock);
2120   if (priv->appsrc[0])
2121     element = gst_object_ref (priv->appsrc[0]);
2122   else
2123     element = NULL;
2124   g_mutex_unlock (&priv->lock);
2125
2126   if (element) {
2127     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2128     gst_object_unref (element);
2129   } else {
2130     ret = GST_FLOW_OK;
2131   }
2132   return ret;
2133 }
2134
2135 /**
2136  * gst_rtsp_stream_recv_rtcp:
2137  * @stream: a #GstRTSPStream
2138  * @buffer: (transfer full): a #GstBuffer
2139  *
2140  * Handle an RTCP buffer for the stream. This method is usually called when a
2141  * message has been received from a client using the TCP transport.
2142  *
2143  * This function takes ownership of @buffer.
2144  *
2145  * Returns: a GstFlowReturn.
2146  */
2147 GstFlowReturn
2148 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2149 {
2150   GstRTSPStreamPrivate *priv;
2151   GstFlowReturn ret;
2152   GstElement *element;
2153
2154   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2155   priv = stream->priv;
2156   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2157   g_return_val_if_fail (priv->is_joined, FALSE);
2158
2159   g_mutex_lock (&priv->lock);
2160   if (priv->appsrc[1])
2161     element = gst_object_ref (priv->appsrc[1]);
2162   else
2163     element = NULL;
2164   g_mutex_unlock (&priv->lock);
2165
2166   if (element) {
2167     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2168     gst_object_unref (element);
2169   } else {
2170     ret = GST_FLOW_OK;
2171     gst_buffer_unref (buffer);
2172   }
2173   return ret;
2174 }
2175
2176 /* must be called with lock */
2177 static gboolean
2178 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2179     gboolean add)
2180 {
2181   GstRTSPStreamPrivate *priv = stream->priv;
2182   const GstRTSPTransport *tr;
2183
2184   tr = gst_rtsp_stream_transport_get_transport (trans);
2185
2186   switch (tr->lower_transport) {
2187     case GST_RTSP_LOWER_TRANS_UDP:
2188     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2189     {
2190       gchar *dest;
2191       gint min, max;
2192       guint ttl = 0;
2193
2194       dest = tr->destination;
2195       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2196         min = tr->port.min;
2197         max = tr->port.max;
2198         ttl = tr->ttl;
2199       } else {
2200         min = tr->client_port.min;
2201         max = tr->client_port.max;
2202       }
2203
2204       if (add) {
2205         if (ttl > 0) {
2206           GST_INFO ("setting ttl-mc %d", ttl);
2207           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2208           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2209         }
2210         GST_INFO ("adding %s:%d-%d", dest, min, max);
2211         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2212         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2213         priv->transports = g_list_prepend (priv->transports, trans);
2214       } else {
2215         GST_INFO ("removing %s:%d-%d", dest, min, max);
2216         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2217         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2218         priv->transports = g_list_remove (priv->transports, trans);
2219       }
2220       priv->tr_changed = TRUE;
2221       break;
2222     }
2223     case GST_RTSP_LOWER_TRANS_TCP:
2224       if (add) {
2225         GST_INFO ("adding TCP %s", tr->destination);
2226         priv->transports = g_list_prepend (priv->transports, trans);
2227       } else {
2228         GST_INFO ("removing TCP %s", tr->destination);
2229         priv->transports = g_list_remove (priv->transports, trans);
2230       }
2231       priv->tr_changed = TRUE;
2232       break;
2233     default:
2234       goto unknown_transport;
2235   }
2236   return TRUE;
2237
2238   /* ERRORS */
2239 unknown_transport:
2240   {
2241     GST_INFO ("Unknown transport %d", tr->lower_transport);
2242     return FALSE;
2243   }
2244 }
2245
2246
2247 /**
2248  * gst_rtsp_stream_add_transport:
2249  * @stream: a #GstRTSPStream
2250  * @trans: (transfer none): a #GstRTSPStreamTransport
2251  *
2252  * Add the transport in @trans to @stream. The media of @stream will
2253  * then also be send to the values configured in @trans.
2254  *
2255  * @stream must be joined to a bin.
2256  *
2257  * @trans must contain a valid #GstRTSPTransport.
2258  *
2259  * Returns: %TRUE if @trans was added
2260  */
2261 gboolean
2262 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2263     GstRTSPStreamTransport * trans)
2264 {
2265   GstRTSPStreamPrivate *priv;
2266   gboolean res;
2267
2268   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2269   priv = stream->priv;
2270   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2271   g_return_val_if_fail (priv->is_joined, FALSE);
2272
2273   g_mutex_lock (&priv->lock);
2274   res = update_transport (stream, trans, TRUE);
2275   g_mutex_unlock (&priv->lock);
2276
2277   return res;
2278 }
2279
2280 /**
2281  * gst_rtsp_stream_remove_transport:
2282  * @stream: a #GstRTSPStream
2283  * @trans: (transfer none): a #GstRTSPStreamTransport
2284  *
2285  * Remove the transport in @trans from @stream. The media of @stream will
2286  * not be sent to the values configured in @trans.
2287  *
2288  * @stream must be joined to a bin.
2289  *
2290  * @trans must contain a valid #GstRTSPTransport.
2291  *
2292  * Returns: %TRUE if @trans was removed
2293  */
2294 gboolean
2295 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2296     GstRTSPStreamTransport * trans)
2297 {
2298   GstRTSPStreamPrivate *priv;
2299   gboolean res;
2300
2301   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2302   priv = stream->priv;
2303   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2304   g_return_val_if_fail (priv->is_joined, FALSE);
2305
2306   g_mutex_lock (&priv->lock);
2307   res = update_transport (stream, trans, FALSE);
2308   g_mutex_unlock (&priv->lock);
2309
2310   return res;
2311 }
2312
2313 /**
2314  * gst_rtsp_stream_get_rtp_socket:
2315  * @stream: a #GstRTSPStream
2316  * @family: the socket family
2317  *
2318  * Get the RTP socket from @stream for a @family.
2319  *
2320  * @stream must be joined to a bin.
2321  *
2322  * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2323  * allocated for @family. Unref after usage
2324  */
2325 GSocket *
2326 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2327 {
2328   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2329   GSocket *socket;
2330   const gchar *name;
2331
2332   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2333   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2334       family == G_SOCKET_FAMILY_IPV6, NULL);
2335   g_return_val_if_fail (priv->udpsink[0], NULL);
2336
2337   if (family == G_SOCKET_FAMILY_IPV6)
2338     name = "socket-v6";
2339   else
2340     name = "socket";
2341
2342   g_object_get (priv->udpsink[0], name, &socket, NULL);
2343
2344   return socket;
2345 }
2346
2347 /**
2348  * gst_rtsp_stream_get_rtcp_socket:
2349  * @stream: a #GstRTSPStream
2350  * @family: the socket family
2351  *
2352  * Get the RTCP socket from @stream for a @family.
2353  *
2354  * @stream must be joined to a bin.
2355  *
2356  * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2357  * allocated for @family. Unref after usage
2358  */
2359 GSocket *
2360 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2361 {
2362   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2363   GSocket *socket;
2364   const gchar *name;
2365
2366   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2367   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2368       family == G_SOCKET_FAMILY_IPV6, NULL);
2369   g_return_val_if_fail (priv->udpsink[1], NULL);
2370
2371   if (family == G_SOCKET_FAMILY_IPV6)
2372     name = "socket-v6";
2373   else
2374     name = "socket";
2375
2376   g_object_get (priv->udpsink[1], name, &socket, NULL);
2377
2378   return socket;
2379 }
2380
2381 /**
2382  * gst_rtsp_stream_transport_filter:
2383  * @stream: a #GstRTSPStream
2384  * @func: (scope call) (allow-none): a callback
2385  * @user_data: (closure): user data passed to @func
2386  *
2387  * Call @func for each transport managed by @stream. The result value of @func
2388  * determines what happens to the transport. @func will be called with @stream
2389  * locked so no further actions on @stream can be performed from @func.
2390  *
2391  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2392  * @stream.
2393  *
2394  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2395  *
2396  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2397  * will also be added with an additional ref to the result #GList of this
2398  * function..
2399  *
2400  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2401  *
2402  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2403  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2404  * element in the #GList should be unreffed before the list is freed.
2405  */
2406 GList *
2407 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2408     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2409 {
2410   GstRTSPStreamPrivate *priv;
2411   GList *result, *walk, *next;
2412
2413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2414
2415   priv = stream->priv;
2416
2417   result = NULL;
2418
2419   g_mutex_lock (&priv->lock);
2420   for (walk = priv->transports; walk; walk = next) {
2421     GstRTSPStreamTransport *trans = walk->data;
2422     GstRTSPFilterResult res;
2423
2424     next = g_list_next (walk);
2425
2426     if (func)
2427       res = func (stream, trans, user_data);
2428     else
2429       res = GST_RTSP_FILTER_REF;
2430
2431     switch (res) {
2432       case GST_RTSP_FILTER_REMOVE:
2433         update_transport (stream, trans, FALSE);
2434         break;
2435       case GST_RTSP_FILTER_REF:
2436         result = g_list_prepend (result, g_object_ref (trans));
2437         break;
2438       case GST_RTSP_FILTER_KEEP:
2439       default:
2440         break;
2441     }
2442   }
2443   g_mutex_unlock (&priv->lock);
2444
2445   return result;
2446 }
2447
2448 static GstPadProbeReturn
2449 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2450 {
2451   GstRTSPStreamPrivate *priv;
2452   GstRTSPStream *stream;
2453
2454   stream = user_data;
2455   priv = stream->priv;
2456
2457   GST_DEBUG_OBJECT (pad, "now blocking");
2458
2459   g_mutex_lock (&priv->lock);
2460   priv->blocking = TRUE;
2461   g_mutex_unlock (&priv->lock);
2462
2463   gst_element_post_message (priv->payloader,
2464       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2465           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2466
2467   return GST_PAD_PROBE_OK;
2468 }
2469
2470 /**
2471  * gst_rtsp_stream_set_blocked:
2472  * @stream: a #GstRTSPStream
2473  * @blocked: boolean indicating we should block or unblock
2474  *
2475  * Blocks or unblocks the dataflow on @stream.
2476  *
2477  * Returns: %TRUE on success
2478  */
2479 gboolean
2480 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2481 {
2482   GstRTSPStreamPrivate *priv;
2483
2484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2485
2486   priv = stream->priv;
2487
2488   g_mutex_lock (&priv->lock);
2489   if (blocked) {
2490     priv->blocking = FALSE;
2491     if (priv->blocked_id == 0) {
2492       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2493           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2494           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2495           g_object_ref (stream), g_object_unref);
2496     }
2497   } else {
2498     if (priv->blocked_id != 0) {
2499       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2500       priv->blocked_id = 0;
2501       priv->blocking = FALSE;
2502     }
2503   }
2504   g_mutex_unlock (&priv->lock);
2505
2506   return TRUE;
2507 }
2508
2509 /**
2510  * gst_rtsp_stream_is_blocking:
2511  * @stream: a #GstRTSPStream
2512  *
2513  * Check if @stream is blocking on a #GstBuffer.
2514  *
2515  * Returns: %TRUE if @stream is blocking
2516  */
2517 gboolean
2518 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2519 {
2520   GstRTSPStreamPrivate *priv;
2521   gboolean result;
2522
2523   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2524
2525   priv = stream->priv;
2526
2527   g_mutex_lock (&priv->lock);
2528   result = priv->blocking;
2529   g_mutex_unlock (&priv->lock);
2530
2531   return result;
2532 }