stream: add fallback for missing stats property
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
83    * sockets */
84   GstElement *udpsrc_v4[2];
85
86   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
87    * sockets */
88   GstElement *udpsrc_v6[2];
89
90   GstElement *udpsink[2];
91
92   /* for TCP transport */
93   GstElement *appsrc[2];
94   GstElement *appqueue[2];
95   GstElement *appsink[2];
96
97   GstElement *tee[2];
98   GstElement *funnel[2];
99
100   /* server ports for sending/receiving over ipv4 */
101   GstRTSPRange server_port_v4;
102   GstRTSPAddress *server_addr_v4;
103   gboolean have_ipv4;
104
105   /* server ports for sending/receiving over ipv6 */
106   GstRTSPRange server_port_v6;
107   GstRTSPAddress *server_addr_v6;
108   gboolean have_ipv6;
109
110   /* multicast addresses */
111   GstRTSPAddressPool *pool;
112   GstRTSPAddress *addr_v4;
113   GstRTSPAddress *addr_v6;
114
115   /* the caps of the stream */
116   gulong caps_sig;
117   GstCaps *caps;
118
119   /* transports we stream to */
120   guint n_active;
121   GList *transports;
122
123   gint dscp_qos;
124
125   /* stream blocking */
126   gulong blocked_id;
127   gboolean blocking;
128 };
129
130 #define DEFAULT_CONTROL         NULL
131 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
132 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
133                                         GST_RTSP_LOWER_TRANS_TCP
134
135 enum
136 {
137   PROP_0,
138   PROP_CONTROL,
139   PROP_PROFILES,
140   PROP_PROTOCOLS,
141   PROP_LAST
142 };
143
144 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
145 #define GST_CAT_DEFAULT rtsp_stream_debug
146
147 static GQuark ssrc_stream_map_key;
148
149 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
150     GValue * value, GParamSpec * pspec);
151 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
152     const GValue * value, GParamSpec * pspec);
153
154 static void gst_rtsp_stream_finalize (GObject * obj);
155
156 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
157
158 static void
159 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
160 {
161   GObjectClass *gobject_class;
162
163   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
164
165   gobject_class = G_OBJECT_CLASS (klass);
166
167   gobject_class->get_property = gst_rtsp_stream_get_property;
168   gobject_class->set_property = gst_rtsp_stream_set_property;
169   gobject_class->finalize = gst_rtsp_stream_finalize;
170
171   g_object_class_install_property (gobject_class, PROP_CONTROL,
172       g_param_spec_string ("control", "Control",
173           "The control string for this stream", DEFAULT_CONTROL,
174           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175
176 #ifdef GST_TYPE_RTSP_PROFILE
177   g_object_class_install_property (gobject_class, PROP_PROFILES,
178       g_param_spec_flags ("profiles", "Profiles",
179           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
180           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181 #endif
182
183   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
184       g_param_spec_flags ("protocols", "Protocols",
185           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
186           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
187
188   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
189
190   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
191 }
192
193 static void
194 gst_rtsp_stream_init (GstRTSPStream * stream)
195 {
196   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
197
198   GST_DEBUG ("new stream %p", stream);
199
200   stream->priv = priv;
201
202   priv->dscp_qos = -1;
203   priv->control = g_strdup (DEFAULT_CONTROL);
204   priv->profiles = DEFAULT_PROFILES;
205   priv->protocols = DEFAULT_PROTOCOLS;
206
207   g_mutex_init (&priv->lock);
208 }
209
210 static void
211 gst_rtsp_stream_finalize (GObject * obj)
212 {
213   GstRTSPStream *stream;
214   GstRTSPStreamPrivate *priv;
215
216   stream = GST_RTSP_STREAM (obj);
217   priv = stream->priv;
218
219   GST_DEBUG ("finalize stream %p", stream);
220
221   /* we really need to be unjoined now */
222   g_return_if_fail (!priv->is_joined);
223
224   if (priv->addr_v4)
225     gst_rtsp_address_free (priv->addr_v4);
226   if (priv->addr_v6)
227     gst_rtsp_address_free (priv->addr_v6);
228   if (priv->server_addr_v4)
229     gst_rtsp_address_free (priv->server_addr_v4);
230   if (priv->server_addr_v6)
231     gst_rtsp_address_free (priv->server_addr_v6);
232   if (priv->pool)
233     g_object_unref (priv->pool);
234   gst_object_unref (priv->payloader);
235   gst_object_unref (priv->srcpad);
236   g_free (priv->control);
237   g_mutex_clear (&priv->lock);
238
239   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
240 }
241
242 static void
243 gst_rtsp_stream_get_property (GObject * object, guint propid,
244     GValue * value, GParamSpec * pspec)
245 {
246   GstRTSPStream *stream = GST_RTSP_STREAM (object);
247
248   switch (propid) {
249     case PROP_CONTROL:
250       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
251       break;
252     case PROP_PROFILES:
253       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
254       break;
255     case PROP_PROTOCOLS:
256       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
257       break;
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
260   }
261 }
262
263 static void
264 gst_rtsp_stream_set_property (GObject * object, guint propid,
265     const GValue * value, GParamSpec * pspec)
266 {
267   GstRTSPStream *stream = GST_RTSP_STREAM (object);
268
269   switch (propid) {
270     case PROP_CONTROL:
271       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
272       break;
273     case PROP_PROFILES:
274       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
275       break;
276     case PROP_PROTOCOLS:
277       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
278       break;
279     default:
280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
281   }
282 }
283
284 /**
285  * gst_rtsp_stream_new:
286  * @idx: an index
287  * @srcpad: a #GstPad
288  * @payloader: a #GstElement
289  *
290  * Create a new media stream with index @idx that handles RTP data on
291  * @srcpad and has a payloader element @payloader.
292  *
293  * Returns: a new #GstRTSPStream
294  */
295 GstRTSPStream *
296 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
297 {
298   GstRTSPStreamPrivate *priv;
299   GstRTSPStream *stream;
300
301   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
302   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
303   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
304
305   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
306   priv = stream->priv;
307   priv->idx = idx;
308   priv->payloader = gst_object_ref (payloader);
309   priv->srcpad = gst_object_ref (srcpad);
310
311   return stream;
312 }
313
314 /**
315  * gst_rtsp_stream_get_index:
316  * @stream: a #GstRTSPStream
317  *
318  * Get the stream index.
319  *
320  * Return: the stream index.
321  */
322 guint
323 gst_rtsp_stream_get_index (GstRTSPStream * stream)
324 {
325   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
326
327   return stream->priv->idx;
328 }
329
330 /**
331  * gst_rtsp_stream_get_pt:
332  * @stream: a #GstRTSPStream
333  *
334  * Get the stream payload type.
335  *
336  * Return: the stream payload type.
337  */
338 guint
339 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
340 {
341   GstRTSPStreamPrivate *priv;
342   guint pt;
343
344   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
345
346   priv = stream->priv;
347
348   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
349
350   return pt;
351 }
352
353 /**
354  * gst_rtsp_stream_get_srcpad:
355  * @stream: a #GstRTSPStream
356  *
357  * Get the srcpad associated with @stream.
358  *
359  * Returns: (transfer full): the srcpad. Unref after usage.
360  */
361 GstPad *
362 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
363 {
364   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
365
366   return gst_object_ref (stream->priv->srcpad);
367 }
368
369 /**
370  * gst_rtsp_stream_get_control:
371  * @stream: a #GstRTSPStream
372  *
373  * Get the control string to identify this stream.
374  *
375  * Returns: (transfer full): the control string. free after usage.
376  */
377 gchar *
378 gst_rtsp_stream_get_control (GstRTSPStream * stream)
379 {
380   GstRTSPStreamPrivate *priv;
381   gchar *result;
382
383   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
384
385   priv = stream->priv;
386
387   g_mutex_lock (&priv->lock);
388   if ((result = g_strdup (priv->control)) == NULL)
389     result = g_strdup_printf ("stream=%u", priv->idx);
390   g_mutex_unlock (&priv->lock);
391
392   return result;
393 }
394
395 /**
396  * gst_rtsp_stream_set_control:
397  * @stream: a #GstRTSPStream
398  * @control: a control string
399  *
400  * Set the control string in @stream.
401  */
402 void
403 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
404 {
405   GstRTSPStreamPrivate *priv;
406
407   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
408
409   priv = stream->priv;
410
411   g_mutex_lock (&priv->lock);
412   g_free (priv->control);
413   priv->control = g_strdup (control);
414   g_mutex_unlock (&priv->lock);
415 }
416
417 /**
418  * gst_rtsp_stream_has_control:
419  * @stream: a #GstRTSPStream
420  * @control: a control string
421  *
422  * Check if @stream has the control string @control.
423  *
424  * Returns: %TRUE is @stream has @control as the control string
425  */
426 gboolean
427 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
428 {
429   GstRTSPStreamPrivate *priv;
430   gboolean res;
431
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
433
434   priv = stream->priv;
435
436   g_mutex_lock (&priv->lock);
437   if (priv->control)
438     res = (g_strcmp0 (priv->control, control) == 0);
439   else {
440     guint streamid;
441
442     if (sscanf (control, "stream=%u", &streamid) > 0)
443       res = (streamid == priv->idx);
444     else
445       res = FALSE;
446   }
447   g_mutex_unlock (&priv->lock);
448
449   return res;
450 }
451
452 /**
453  * gst_rtsp_stream_set_mtu:
454  * @stream: a #GstRTSPStream
455  * @mtu: a new MTU
456  *
457  * Configure the mtu in the payloader of @stream to @mtu.
458  */
459 void
460 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
461 {
462   GstRTSPStreamPrivate *priv;
463
464   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
465
466   priv = stream->priv;
467
468   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
469
470   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
471 }
472
473 /**
474  * gst_rtsp_stream_get_mtu:
475  * @stream: a #GstRTSPStream
476  *
477  * Get the configured MTU in the payloader of @stream.
478  *
479  * Returns: the MTU of the payloader.
480  */
481 guint
482 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
483 {
484   GstRTSPStreamPrivate *priv;
485   guint mtu;
486
487   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
488
489   priv = stream->priv;
490
491   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
492
493   return mtu;
494 }
495
496 /* Update the dscp qos property on the udp sinks */
497 static void
498 update_dscp_qos (GstRTSPStream * stream)
499 {
500   GstRTSPStreamPrivate *priv;
501
502   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
503
504   priv = stream->priv;
505
506   if (priv->udpsink[0]) {
507     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
508         NULL);
509   }
510
511   if (priv->udpsink[1]) {
512     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
513         NULL);
514   }
515 }
516
517 /**
518  * gst_rtsp_stream_set_dscp_qos:
519  * @stream: a #GstRTSPStream
520  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
521  *
522  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
523  */
524 void
525 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
526 {
527   GstRTSPStreamPrivate *priv;
528
529   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
530
531   priv = stream->priv;
532
533   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
534
535   if (dscp_qos < -1 || dscp_qos > 63) {
536     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
537     return;
538   }
539
540   priv->dscp_qos = dscp_qos;
541
542   update_dscp_qos (stream);
543 }
544
545 /**
546  * gst_rtsp_stream_get_dscp_qos:
547  * @stream: a #GstRTSPStream
548  *
549  * Get the configured DSCP QoS in of the outgoing sockets.
550  *
551  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
552  */
553 gint
554 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
555 {
556   GstRTSPStreamPrivate *priv;
557
558   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
559
560   priv = stream->priv;
561
562   return priv->dscp_qos;
563 }
564
565 /**
566  * gst_rtsp_stream_is_transport_supported:
567  * @stream: a #GstRTSPStream
568  * @transport: a #GstRTSPTransport
569  *
570  * Check if @transport can be handled by stream
571  *
572  * Returns: %TRUE if @transport can be handled by @stream.
573  */
574 gboolean
575 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
576     GstRTSPTransport * transport)
577 {
578   GstRTSPStreamPrivate *priv;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
581
582   priv = stream->priv;
583
584   g_mutex_lock (&priv->lock);
585   if (transport->trans != GST_RTSP_TRANS_RTP)
586     goto unsupported_transmode;
587
588   if (!(transport->profile & priv->profiles))
589     goto unsupported_profile;
590
591   if (!(transport->lower_transport & priv->protocols))
592     goto unsupported_ltrans;
593
594   g_mutex_unlock (&priv->lock);
595
596   return TRUE;
597
598   /* ERRORS */
599 unsupported_transmode:
600   {
601     GST_DEBUG ("unsupported transport mode %d", transport->trans);
602     return FALSE;
603   }
604 unsupported_profile:
605   {
606     GST_DEBUG ("unsupported profile %d", transport->profile);
607     return FALSE;
608   }
609 unsupported_ltrans:
610   {
611     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
612     return FALSE;
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_profiles:
618  * @stream: a #GstRTSPStream
619  * @profiles: the new profiles
620  *
621  * Configure the allowed profiles for @stream.
622  */
623 void
624 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   g_mutex_lock (&priv->lock);
633   priv->profiles = profiles;
634   g_mutex_unlock (&priv->lock);
635 }
636
637 /**
638  * gst_rtsp_stream_get_profiles:
639  * @stream: a #GstRTSPStream
640  *
641  * Get the allowed profiles of @stream.
642  *
643  * Returns: a #GstRTSPProfile
644  */
645 GstRTSPProfile
646 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
647 {
648   GstRTSPStreamPrivate *priv;
649   GstRTSPProfile res;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
652
653   priv = stream->priv;
654
655   g_mutex_lock (&priv->lock);
656   res = priv->profiles;
657   g_mutex_unlock (&priv->lock);
658
659   return res;
660 }
661
662 /**
663  * gst_rtsp_stream_set_protocols:
664  * @stream: a #GstRTSPStream
665  * @protocols: the new flags
666  *
667  * Configure the allowed lower transport for @stream.
668  */
669 void
670 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
671     GstRTSPLowerTrans protocols)
672 {
673   GstRTSPStreamPrivate *priv;
674
675   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
676
677   priv = stream->priv;
678
679   g_mutex_lock (&priv->lock);
680   priv->protocols = protocols;
681   g_mutex_unlock (&priv->lock);
682 }
683
684 /**
685  * gst_rtsp_stream_get_protocols:
686  * @stream: a #GstRTSPStream
687  *
688  * Get the allowed protocols of @stream.
689  *
690  * Returns: a #GstRTSPLowerTrans
691  */
692 GstRTSPLowerTrans
693 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
694 {
695   GstRTSPStreamPrivate *priv;
696   GstRTSPLowerTrans res;
697
698   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
699       GST_RTSP_LOWER_TRANS_UNKNOWN);
700
701   priv = stream->priv;
702
703   g_mutex_lock (&priv->lock);
704   res = priv->protocols;
705   g_mutex_unlock (&priv->lock);
706
707   return res;
708 }
709
710 /**
711  * gst_rtsp_stream_set_address_pool:
712  * @stream: a #GstRTSPStream
713  * @pool: a #GstRTSPAddressPool
714  *
715  * configure @pool to be used as the address pool of @stream.
716  */
717 void
718 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
719     GstRTSPAddressPool * pool)
720 {
721   GstRTSPStreamPrivate *priv;
722   GstRTSPAddressPool *old;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   GST_LOG_OBJECT (stream, "set address pool %p", pool);
729
730   g_mutex_lock (&priv->lock);
731   if ((old = priv->pool) != pool)
732     priv->pool = pool ? g_object_ref (pool) : NULL;
733   else
734     old = NULL;
735   g_mutex_unlock (&priv->lock);
736
737   if (old)
738     g_object_unref (old);
739 }
740
741 /**
742  * gst_rtsp_stream_get_address_pool:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the #GstRTSPAddressPool used as the address pool of @stream.
746  *
747  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
748  * usage.
749  */
750 GstRTSPAddressPool *
751 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
752 {
753   GstRTSPStreamPrivate *priv;
754   GstRTSPAddressPool *result;
755
756   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
757
758   priv = stream->priv;
759
760   g_mutex_lock (&priv->lock);
761   if ((result = priv->pool))
762     g_object_ref (result);
763   g_mutex_unlock (&priv->lock);
764
765   return result;
766 }
767
768 /**
769  * gst_rtsp_stream_get_multicast_address:
770  * @stream: a #GstRTSPStream
771  * @family: the #GSocketFamily
772  *
773  * Get the multicast address of @stream for @family.
774  *
775  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
776  * allocated. gst_rtsp_address_free() after usage.
777  */
778 GstRTSPAddress *
779 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
780     GSocketFamily family)
781 {
782   GstRTSPStreamPrivate *priv;
783   GstRTSPAddress *result;
784   GstRTSPAddress **addrp;
785   GstRTSPAddressFlags flags;
786
787   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
788
789   priv = stream->priv;
790
791   if (family == G_SOCKET_FAMILY_IPV6) {
792     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
793     addrp = &priv->addr_v4;
794   } else {
795     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
796     addrp = &priv->addr_v6;
797   }
798
799   g_mutex_lock (&priv->lock);
800   if (*addrp == NULL) {
801     if (priv->pool == NULL)
802       goto no_pool;
803
804     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
805
806     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
807     if (*addrp == NULL)
808       goto no_address;
809   }
810   result = gst_rtsp_address_copy (*addrp);
811   g_mutex_unlock (&priv->lock);
812
813   return result;
814
815   /* ERRORS */
816 no_pool:
817   {
818     GST_ERROR_OBJECT (stream, "no address pool specified");
819     g_mutex_unlock (&priv->lock);
820     return NULL;
821   }
822 no_address:
823   {
824     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
825     g_mutex_unlock (&priv->lock);
826     return NULL;
827   }
828 }
829
830 /**
831  * gst_rtsp_stream_reserve_address:
832  * @stream: a #GstRTSPStream
833  * @address: an address
834  * @port: a port
835  * @n_ports: n_ports
836  * @ttl: a TTL
837  *
838  * Reserve @address and @port as the address and port of @stream.
839  *
840  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
841  * reserved. gst_rtsp_address_free() after usage.
842  */
843 GstRTSPAddress *
844 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
845     const gchar * address, guint port, guint n_ports, guint ttl)
846 {
847   GstRTSPStreamPrivate *priv;
848   GstRTSPAddress *result;
849   GInetAddress *addr;
850   GSocketFamily family;
851   GstRTSPAddress **addrp;
852
853   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
854   g_return_val_if_fail (address != NULL, NULL);
855   g_return_val_if_fail (port > 0, NULL);
856   g_return_val_if_fail (n_ports > 0, NULL);
857   g_return_val_if_fail (ttl > 0, NULL);
858
859   priv = stream->priv;
860
861   addr = g_inet_address_new_from_string (address);
862   if (!addr) {
863     GST_ERROR ("failed to get inet addr from %s", address);
864     family = G_SOCKET_FAMILY_IPV4;
865   } else {
866     family = g_inet_address_get_family (addr);
867     g_object_unref (addr);
868   }
869
870   if (family == G_SOCKET_FAMILY_IPV6)
871     addrp = &priv->addr_v4;
872   else
873     addrp = &priv->addr_v6;
874
875   g_mutex_lock (&priv->lock);
876   if (*addrp == NULL) {
877     GstRTSPAddressPoolResult res;
878
879     if (priv->pool == NULL)
880       goto no_pool;
881
882     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
883         port, n_ports, ttl, addrp);
884     if (res != GST_RTSP_ADDRESS_POOL_OK)
885       goto no_address;
886   } else {
887     if (strcmp ((*addrp)->address, address) ||
888         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
889         (*addrp)->ttl != ttl)
890       goto different_address;
891   }
892   result = gst_rtsp_address_copy (*addrp);
893   g_mutex_unlock (&priv->lock);
894
895   return result;
896
897   /* ERRORS */
898 no_pool:
899   {
900     GST_ERROR_OBJECT (stream, "no address pool specified");
901     g_mutex_unlock (&priv->lock);
902     return NULL;
903   }
904 no_address:
905   {
906     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
907         address);
908     g_mutex_unlock (&priv->lock);
909     return NULL;
910   }
911 different_address:
912   {
913     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
914         " reserved", address);
915     g_mutex_unlock (&priv->lock);
916     return NULL;
917   }
918 }
919
920 static gboolean
921 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
922     GSocketFamily family, GstElement * udpsrc_out[2],
923     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
924     GstRTSPAddress ** server_addr_out)
925 {
926   GstStateChangeReturn ret;
927   GstElement *udpsrc0, *udpsrc1;
928   GstElement *udpsink0, *udpsink1;
929   GSocket *rtp_socket = NULL;
930   GSocket *rtcp_socket;
931   gint tmp_rtp, tmp_rtcp;
932   guint count;
933   gint rtpport, rtcpport;
934   GList *rejected_addresses = NULL;
935   GstRTSPAddress *addr = NULL;
936   GInetAddress *inetaddr = NULL;
937   GSocketAddress *rtp_sockaddr = NULL;
938   GSocketAddress *rtcp_sockaddr = NULL;
939   const gchar *multisink_socket;
940
941   if (family == G_SOCKET_FAMILY_IPV6)
942     multisink_socket = "socket-v6";
943   else
944     multisink_socket = "socket";
945
946   udpsrc0 = NULL;
947   udpsrc1 = NULL;
948   udpsink0 = NULL;
949   udpsink1 = NULL;
950   count = 0;
951
952   /* Start with random port */
953   tmp_rtp = 0;
954
955   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
956       G_SOCKET_PROTOCOL_UDP, NULL);
957   if (!rtcp_socket)
958     goto no_udp_protocol;
959
960   if (*server_addr_out)
961     gst_rtsp_address_free (*server_addr_out);
962
963   /* try to allocate 2 UDP ports, the RTP port should be an even
964    * number and the RTCP port should be the next (uneven) port */
965 again:
966
967   if (rtp_socket == NULL) {
968     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
969         G_SOCKET_PROTOCOL_UDP, NULL);
970     if (!rtp_socket)
971       goto no_udp_protocol;
972   }
973
974   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
975     GstRTSPAddressFlags flags;
976
977     if (addr)
978       rejected_addresses = g_list_prepend (rejected_addresses, addr);
979
980     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
981     if (family == G_SOCKET_FAMILY_IPV6)
982       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
983     else
984       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
985
986     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
987
988     if (addr == NULL)
989       goto no_ports;
990
991     tmp_rtp = addr->port;
992
993     g_clear_object (&inetaddr);
994     inetaddr = g_inet_address_new_from_string (addr->address);
995   } else {
996     if (tmp_rtp != 0) {
997       tmp_rtp += 2;
998       if (++count > 20)
999         goto no_ports;
1000     }
1001
1002     if (inetaddr == NULL)
1003       inetaddr = g_inet_address_new_any (family);
1004   }
1005
1006   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1007   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1008     g_object_unref (rtp_sockaddr);
1009     goto again;
1010   }
1011   g_object_unref (rtp_sockaddr);
1012
1013   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1014   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1015     g_clear_object (&rtp_sockaddr);
1016     goto socket_error;
1017   }
1018
1019   tmp_rtp =
1020       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1021   g_object_unref (rtp_sockaddr);
1022
1023   /* check if port is even */
1024   if ((tmp_rtp & 1) != 0) {
1025     /* port not even, close and allocate another */
1026     tmp_rtp++;
1027     g_clear_object (&rtp_socket);
1028     goto again;
1029   }
1030
1031   /* set port */
1032   tmp_rtcp = tmp_rtp + 1;
1033
1034   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1035   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1036     g_object_unref (rtcp_sockaddr);
1037     g_clear_object (&rtp_socket);
1038     goto again;
1039   }
1040   g_object_unref (rtcp_sockaddr);
1041
1042   g_clear_object (&inetaddr);
1043
1044   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1045   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1046
1047   if (udpsrc0 == NULL || udpsrc1 == NULL)
1048     goto no_udp_protocol;
1049
1050   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1051   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1052
1053   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1054   if (ret == GST_STATE_CHANGE_FAILURE)
1055     goto element_error;
1056   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1057   if (ret == GST_STATE_CHANGE_FAILURE)
1058     goto element_error;
1059
1060   /* all fine, do port check */
1061   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1062   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1063
1064   /* this should not happen... */
1065   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1066     goto port_error;
1067
1068   if (udpsink_out[0])
1069     udpsink0 = udpsink_out[0];
1070   else
1071     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1072
1073   if (!udpsink0)
1074     goto no_udp_protocol;
1075
1076   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1077   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1078
1079   if (udpsink_out[1])
1080     udpsink1 = udpsink_out[1];
1081   else
1082     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1083
1084   if (!udpsink1)
1085     goto no_udp_protocol;
1086
1087   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1088   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1089   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1090
1091   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1092   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1093   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1094   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1095   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1096   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1097   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1098   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1099
1100   /* we keep these elements, we will further configure them when the
1101    * client told us to really use the UDP ports. */
1102   udpsrc_out[0] = udpsrc0;
1103   udpsrc_out[1] = udpsrc1;
1104   udpsink_out[0] = udpsink0;
1105   udpsink_out[1] = udpsink1;
1106   server_port_out->min = rtpport;
1107   server_port_out->max = rtcpport;
1108
1109   *server_addr_out = addr;
1110   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1111
1112   g_object_unref (rtp_socket);
1113   g_object_unref (rtcp_socket);
1114
1115   return TRUE;
1116
1117   /* ERRORS */
1118 no_udp_protocol:
1119   {
1120     goto cleanup;
1121   }
1122 no_ports:
1123   {
1124     goto cleanup;
1125   }
1126 port_error:
1127   {
1128     goto cleanup;
1129   }
1130 socket_error:
1131   {
1132     goto cleanup;
1133   }
1134 element_error:
1135   {
1136     goto cleanup;
1137   }
1138 cleanup:
1139   {
1140     if (udpsrc0) {
1141       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1142       gst_object_unref (udpsrc0);
1143     }
1144     if (udpsrc1) {
1145       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1146       gst_object_unref (udpsrc1);
1147     }
1148     if (udpsink0) {
1149       gst_element_set_state (udpsink0, GST_STATE_NULL);
1150       gst_object_unref (udpsink0);
1151     }
1152     if (inetaddr)
1153       g_object_unref (inetaddr);
1154     g_list_free_full (rejected_addresses,
1155         (GDestroyNotify) gst_rtsp_address_free);
1156     if (addr)
1157       gst_rtsp_address_free (addr);
1158     if (rtp_socket)
1159       g_object_unref (rtp_socket);
1160     if (rtcp_socket)
1161       g_object_unref (rtcp_socket);
1162     return FALSE;
1163   }
1164 }
1165
1166 /* must be called with lock */
1167 static gboolean
1168 alloc_ports (GstRTSPStream * stream)
1169 {
1170   GstRTSPStreamPrivate *priv = stream->priv;
1171
1172   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1173       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1174       &priv->server_port_v4, &priv->server_addr_v4);
1175
1176   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1177       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1178       &priv->server_port_v6, &priv->server_addr_v6);
1179
1180   return priv->have_ipv4 || priv->have_ipv6;
1181 }
1182
1183 /**
1184  * gst_rtsp_stream_get_server_port:
1185  * @stream: a #GstRTSPStream
1186  * @server_port: (out): result server port
1187  * @family: the port family to get
1188  *
1189  * Fill @server_port with the port pair used by the server. This function can
1190  * only be called when @stream has been joined.
1191  */
1192 void
1193 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1194     GstRTSPRange * server_port, GSocketFamily family)
1195 {
1196   GstRTSPStreamPrivate *priv;
1197
1198   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1199   priv = stream->priv;
1200   g_return_if_fail (priv->is_joined);
1201
1202   g_mutex_lock (&priv->lock);
1203   if (family == G_SOCKET_FAMILY_IPV4) {
1204     if (server_port)
1205       *server_port = priv->server_port_v4;
1206   } else {
1207     if (server_port)
1208       *server_port = priv->server_port_v6;
1209   }
1210   g_mutex_unlock (&priv->lock);
1211 }
1212
1213 /**
1214  * gst_rtsp_stream_get_rtpsession:
1215  * @stream: a #GstRTSPStream
1216  *
1217  * Get the RTP session of this stream.
1218  *
1219  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1220  */
1221 GObject *
1222 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1223 {
1224   GstRTSPStreamPrivate *priv;
1225   GObject *session;
1226
1227   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1228
1229   priv = stream->priv;
1230
1231   g_mutex_lock (&priv->lock);
1232   if ((session = priv->session))
1233     g_object_ref (session);
1234   g_mutex_unlock (&priv->lock);
1235
1236   return session;
1237 }
1238
1239 /**
1240  * gst_rtsp_stream_get_ssrc:
1241  * @stream: a #GstRTSPStream
1242  * @ssrc: (out): result ssrc
1243  *
1244  * Get the SSRC used by the RTP session of this stream. This function can only
1245  * be called when @stream has been joined.
1246  */
1247 void
1248 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1249 {
1250   GstRTSPStreamPrivate *priv;
1251
1252   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1253   priv = stream->priv;
1254   g_return_if_fail (priv->is_joined);
1255
1256   g_mutex_lock (&priv->lock);
1257   if (ssrc && priv->session)
1258     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1259   g_mutex_unlock (&priv->lock);
1260 }
1261
1262 /* executed from streaming thread */
1263 static void
1264 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1265 {
1266   GstRTSPStreamPrivate *priv = stream->priv;
1267   GstCaps *newcaps, *oldcaps;
1268
1269   newcaps = gst_pad_get_current_caps (pad);
1270
1271   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1272       newcaps);
1273
1274   g_mutex_lock (&priv->lock);
1275   oldcaps = priv->caps;
1276   priv->caps = newcaps;
1277   g_mutex_unlock (&priv->lock);
1278
1279   if (oldcaps)
1280     gst_caps_unref (oldcaps);
1281 }
1282
1283 static void
1284 dump_structure (const GstStructure * s)
1285 {
1286   gchar *sstr;
1287
1288   sstr = gst_structure_to_string (s);
1289   GST_INFO ("structure: %s", sstr);
1290   g_free (sstr);
1291 }
1292
1293 static GstRTSPStreamTransport *
1294 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1295 {
1296   GstRTSPStreamPrivate *priv = stream->priv;
1297   GList *walk;
1298   GstRTSPStreamTransport *result = NULL;
1299   const gchar *tmp;
1300   gchar *dest;
1301   guint port;
1302
1303   if (rtcp_from == NULL)
1304     return NULL;
1305
1306   tmp = g_strrstr (rtcp_from, ":");
1307   if (tmp == NULL)
1308     return NULL;
1309
1310   port = atoi (tmp + 1);
1311   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1312
1313   g_mutex_lock (&priv->lock);
1314   GST_INFO ("finding %s:%d in %d transports", dest, port,
1315       g_list_length (priv->transports));
1316
1317   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1318     GstRTSPStreamTransport *trans = walk->data;
1319     const GstRTSPTransport *tr;
1320     gint min, max;
1321
1322     tr = gst_rtsp_stream_transport_get_transport (trans);
1323
1324     min = tr->client_port.min;
1325     max = tr->client_port.max;
1326
1327     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1328       result = trans;
1329       break;
1330     }
1331   }
1332   if (result)
1333     g_object_ref (result);
1334   g_mutex_unlock (&priv->lock);
1335
1336   g_free (dest);
1337
1338   return result;
1339 }
1340
1341 static GstRTSPStreamTransport *
1342 check_transport (GObject * source, GstRTSPStream * stream)
1343 {
1344   GstStructure *stats;
1345   GstRTSPStreamTransport *trans;
1346
1347   /* see if we have a stream to match with the origin of the RTCP packet */
1348   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1349   if (trans == NULL) {
1350     g_object_get (source, "stats", &stats, NULL);
1351     if (stats) {
1352       const gchar *rtcp_from;
1353
1354       dump_structure (stats);
1355
1356       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1357       if ((trans = find_transport (stream, rtcp_from))) {
1358         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1359             source);
1360         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1361             g_object_unref);
1362       }
1363       gst_structure_free (stats);
1364     }
1365   }
1366   return trans;
1367 }
1368
1369
1370 static void
1371 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1372 {
1373   GstRTSPStreamTransport *trans;
1374
1375   GST_INFO ("%p: new source %p", stream, source);
1376
1377   trans = check_transport (source, stream);
1378
1379   if (trans)
1380     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1381 }
1382
1383 static void
1384 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1385 {
1386   GST_INFO ("%p: new SDES %p", stream, source);
1387 }
1388
1389 static void
1390 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1391 {
1392   GstRTSPStreamTransport *trans;
1393
1394   trans = check_transport (source, stream);
1395
1396   if (trans) {
1397     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1398     gst_rtsp_stream_transport_keep_alive (trans);
1399   }
1400 #ifdef DUMP_STATS
1401   {
1402     GstStructure *stats;
1403     g_object_get (source, "stats", &stats, NULL);
1404     if (stats) {
1405       dump_structure (stats);
1406       gst_structure_free (stats);
1407     }
1408   }
1409 #endif
1410 }
1411
1412 static void
1413 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1414 {
1415   GST_INFO ("%p: source %p bye", stream, source);
1416 }
1417
1418 static void
1419 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1420 {
1421   GstRTSPStreamTransport *trans;
1422
1423   GST_INFO ("%p: source %p bye timeout", stream, source);
1424
1425   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1426     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1427     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1428   }
1429 }
1430
1431 static void
1432 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1433 {
1434   GstRTSPStreamTransport *trans;
1435
1436   GST_INFO ("%p: source %p timeout", stream, source);
1437
1438   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1439     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1440     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1441   }
1442 }
1443
1444 static GstFlowReturn
1445 handle_new_sample (GstAppSink * sink, gpointer user_data)
1446 {
1447   GstRTSPStreamPrivate *priv;
1448   GList *walk;
1449   GstSample *sample;
1450   GstBuffer *buffer;
1451   GstRTSPStream *stream;
1452
1453   sample = gst_app_sink_pull_sample (sink);
1454   if (!sample)
1455     return GST_FLOW_OK;
1456
1457   stream = (GstRTSPStream *) user_data;
1458   priv = stream->priv;
1459   buffer = gst_sample_get_buffer (sample);
1460
1461   g_mutex_lock (&priv->lock);
1462   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1463     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1464
1465     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1466       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1467     } else {
1468       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1469     }
1470   }
1471   g_mutex_unlock (&priv->lock);
1472
1473   gst_sample_unref (sample);
1474
1475   return GST_FLOW_OK;
1476 }
1477
1478 static GstAppSinkCallbacks sink_cb = {
1479   NULL,                         /* not interested in EOS */
1480   NULL,                         /* not interested in preroll samples */
1481   handle_new_sample,
1482 };
1483
1484 /**
1485  * gst_rtsp_stream_join_bin:
1486  * @stream: a #GstRTSPStream
1487  * @bin: a #GstBin to join
1488  * @rtpbin: a rtpbin element in @bin
1489  * @state: the target state of the new elements
1490  *
1491  * Join the #GstBin @bin that contains the element @rtpbin.
1492  *
1493  * @stream will link to @rtpbin, which must be inside @bin. The elements
1494  * added to @bin will be set to the state given in @state.
1495  *
1496  * Returns: %TRUE on success.
1497  */
1498 gboolean
1499 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1500     GstElement * rtpbin, GstState state)
1501 {
1502   GstRTSPStreamPrivate *priv;
1503   gint i;
1504   guint idx;
1505   gchar *name;
1506   GstPad *pad, *sinkpad, *selpad;
1507   GstPadLinkReturn ret;
1508
1509   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1510   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1511   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1512
1513   priv = stream->priv;
1514
1515   g_mutex_lock (&priv->lock);
1516   if (priv->is_joined)
1517     goto was_joined;
1518
1519   /* create a session with the same index as the stream */
1520   idx = priv->idx;
1521
1522   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1523
1524   if (!alloc_ports (stream))
1525     goto no_ports;
1526
1527   /* update the dscp qos field in the sinks */
1528   update_dscp_qos (stream);
1529
1530   /* get a pad for sending RTP */
1531   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1532   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1533   g_free (name);
1534   /* link the RTP pad to the session manager, it should not really fail unless
1535    * this is not really an RTP pad */
1536   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1537   if (ret != GST_PAD_LINK_OK)
1538     goto link_failed;
1539
1540   /* get pads from the RTP session element for sending and receiving
1541    * RTP/RTCP*/
1542   name = g_strdup_printf ("send_rtp_src_%u", idx);
1543   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1544   g_free (name);
1545   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1546   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1547   g_free (name);
1548   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1549   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1550   g_free (name);
1551   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1552   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1553   g_free (name);
1554
1555   /* get the session */
1556   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1557
1558   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1559       stream);
1560   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1561       stream);
1562   g_signal_connect (priv->session, "on-ssrc-active",
1563       (GCallback) on_ssrc_active, stream);
1564   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1565       stream);
1566   g_signal_connect (priv->session, "on-bye-timeout",
1567       (GCallback) on_bye_timeout, stream);
1568   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1569       stream);
1570
1571   for (i = 0; i < 2; i++) {
1572     GstPad *teepad, *queuepad;
1573     /* For the sender we create this bit of pipeline for both
1574      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1575      * we need to add a queue before appsink to make the pipeline
1576      * not block. For the TCP case, we want to pump data to the
1577      * client as fast as possible anyway.
1578      *
1579      * .--------.      .-----.    .---------.
1580      * | rtpbin |      | tee |    | udpsink |
1581      * |       send->sink   src->sink       |
1582      * '--------'      |     |    '---------'
1583      *                 |     |    .---------.    .---------.
1584      *                 |     |    |  queue  |    | appsink |
1585      *                 |    src->sink      src->sink       |
1586      *                 '-----'    '---------'    '---------'
1587      *
1588      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1589      * udpsink directly to the session.
1590      */
1591     /* add udpsink */
1592     gst_bin_add (bin, priv->udpsink[i]);
1593     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1594
1595     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1596       /* make tee for RTP/RTCP */
1597       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1598       gst_bin_add (bin, priv->tee[i]);
1599
1600       /* and link to rtpbin send pad */
1601       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1602       gst_pad_link (priv->send_src[i], pad);
1603       gst_object_unref (pad);
1604
1605       /* link tee to udpsink */
1606       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1607       gst_pad_link (teepad, sinkpad);
1608       gst_object_unref (teepad);
1609
1610       /* make queue */
1611       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1612       gst_bin_add (bin, priv->appqueue[i]);
1613       /* and link to tee */
1614       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1615       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1616       gst_pad_link (teepad, pad);
1617       gst_object_unref (pad);
1618       gst_object_unref (teepad);
1619
1620       /* make appsink */
1621       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1622       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1623       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1624       gst_bin_add (bin, priv->appsink[i]);
1625       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1626           &sink_cb, stream, NULL);
1627       /* and link to queue */
1628       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1629       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1630       gst_pad_link (queuepad, pad);
1631       gst_object_unref (pad);
1632       gst_object_unref (queuepad);
1633     } else {
1634       /* else only udpsink needed, link it to the session */
1635       gst_pad_link (priv->send_src[i], sinkpad);
1636     }
1637     gst_object_unref (sinkpad);
1638
1639     /* For the receiver we create this bit of pipeline for both
1640      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1641      * and it is all funneled into the rtpbin receive pad.
1642      *
1643      * .--------.     .--------.    .--------.
1644      * | udpsrc |     | funnel |    | rtpbin |
1645      * |       src->sink      src->sink      |
1646      * '--------'     |        |    '--------'
1647      * .--------.     |        |
1648      * | appsrc |     |        |
1649      * |       src->sink       |
1650      * '--------'     '--------'
1651      */
1652     /* make funnel for the RTP/RTCP receivers */
1653     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1654     gst_bin_add (bin, priv->funnel[i]);
1655
1656     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1657     gst_pad_link (pad, priv->recv_sink[i]);
1658     gst_object_unref (pad);
1659
1660     if (priv->udpsrc_v4[i]) {
1661       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1662        * values */
1663       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1664       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1665       /* add udpsrc */
1666       gst_bin_add (bin, priv->udpsrc_v4[i]);
1667
1668       /* and link to the funnel v4 */
1669       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1670       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1671       gst_pad_link (pad, selpad);
1672       gst_object_unref (pad);
1673       gst_object_unref (selpad);
1674     }
1675
1676     if (priv->udpsrc_v6[i]) {
1677       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1678       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1679       gst_bin_add (bin, priv->udpsrc_v6[i]);
1680
1681       /* and link to the funnel v6 */
1682       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1683       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1684       gst_pad_link (pad, selpad);
1685       gst_object_unref (pad);
1686       gst_object_unref (selpad);
1687     }
1688
1689     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1690       /* make and add appsrc */
1691       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1692       gst_bin_add (bin, priv->appsrc[i]);
1693       /* and link to the funnel */
1694       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1695       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1696       gst_pad_link (pad, selpad);
1697       gst_object_unref (pad);
1698       gst_object_unref (selpad);
1699     }
1700
1701     /* check if we need to set to a special state */
1702     if (state != GST_STATE_NULL) {
1703       if (priv->udpsink[i])
1704         gst_element_set_state (priv->udpsink[i], state);
1705       if (priv->appsink[i])
1706         gst_element_set_state (priv->appsink[i], state);
1707       if (priv->appqueue[i])
1708         gst_element_set_state (priv->appqueue[i], state);
1709       if (priv->tee[i])
1710         gst_element_set_state (priv->tee[i], state);
1711       if (priv->funnel[i])
1712         gst_element_set_state (priv->funnel[i], state);
1713       if (priv->appsrc[i])
1714         gst_element_set_state (priv->appsrc[i], state);
1715     }
1716   }
1717
1718   /* be notified of caps changes */
1719   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1720       (GCallback) caps_notify, stream);
1721
1722   priv->is_joined = TRUE;
1723   g_mutex_unlock (&priv->lock);
1724
1725   return TRUE;
1726
1727   /* ERRORS */
1728 was_joined:
1729   {
1730     g_mutex_unlock (&priv->lock);
1731     return TRUE;
1732   }
1733 no_ports:
1734   {
1735     g_mutex_unlock (&priv->lock);
1736     GST_WARNING ("failed to allocate ports %u", idx);
1737     return FALSE;
1738   }
1739 link_failed:
1740   {
1741     GST_WARNING ("failed to link stream %u", idx);
1742     gst_object_unref (priv->send_rtp_sink);
1743     priv->send_rtp_sink = NULL;
1744     g_mutex_unlock (&priv->lock);
1745     return FALSE;
1746   }
1747 }
1748
1749 /**
1750  * gst_rtsp_stream_leave_bin:
1751  * @stream: a #GstRTSPStream
1752  * @bin: a #GstBin
1753  * @rtpbin: a rtpbin #GstElement
1754  *
1755  * Remove the elements of @stream from @bin.
1756  *
1757  * Return: %TRUE on success.
1758  */
1759 gboolean
1760 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1761     GstElement * rtpbin)
1762 {
1763   GstRTSPStreamPrivate *priv;
1764   gint i;
1765
1766   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1767   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1768   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1769
1770   priv = stream->priv;
1771
1772   g_mutex_lock (&priv->lock);
1773   if (!priv->is_joined)
1774     goto was_not_joined;
1775
1776   /* all transports must be removed by now */
1777   g_return_val_if_fail (priv->transports == NULL, FALSE);
1778
1779   GST_INFO ("stream %p leaving bin", stream);
1780
1781   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1782   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1783   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1784   gst_object_unref (priv->send_rtp_sink);
1785   priv->send_rtp_sink = NULL;
1786
1787   for (i = 0; i < 2; i++) {
1788     if (priv->udpsink[i])
1789       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1790     if (priv->appsink[i])
1791       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1792     if (priv->appqueue[i])
1793       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1794     if (priv->tee[i])
1795       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1796     if (priv->funnel[i])
1797       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1798     if (priv->appsrc[i])
1799       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1800     if (priv->udpsrc_v4[i]) {
1801       /* and set udpsrc to NULL now before removing */
1802       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1803       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1804       /* removing them should also nicely release the request
1805        * pads when they finalize */
1806       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1807     }
1808     if (priv->udpsrc_v6[i]) {
1809       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1810       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1811       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1812     }
1813     if (priv->udpsink[i])
1814       gst_bin_remove (bin, priv->udpsink[i]);
1815     if (priv->appsrc[i])
1816       gst_bin_remove (bin, priv->appsrc[i]);
1817     if (priv->appsink[i])
1818       gst_bin_remove (bin, priv->appsink[i]);
1819     if (priv->appqueue[i])
1820       gst_bin_remove (bin, priv->appqueue[i]);
1821     if (priv->tee[i])
1822       gst_bin_remove (bin, priv->tee[i]);
1823     if (priv->funnel[i])
1824       gst_bin_remove (bin, priv->funnel[i]);
1825
1826     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1827     gst_object_unref (priv->recv_sink[i]);
1828     priv->recv_sink[i] = NULL;
1829
1830     priv->udpsrc_v4[i] = NULL;
1831     priv->udpsrc_v6[i] = NULL;
1832     priv->udpsink[i] = NULL;
1833     priv->appsrc[i] = NULL;
1834     priv->appsink[i] = NULL;
1835     priv->appqueue[i] = NULL;
1836     priv->tee[i] = NULL;
1837     priv->funnel[i] = NULL;
1838   }
1839   gst_object_unref (priv->send_src[0]);
1840   priv->send_src[0] = NULL;
1841
1842   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1843   gst_object_unref (priv->send_src[1]);
1844   priv->send_src[1] = NULL;
1845
1846   g_object_unref (priv->session);
1847   priv->session = NULL;
1848   if (priv->caps)
1849     gst_caps_unref (priv->caps);
1850   priv->caps = NULL;
1851
1852   priv->is_joined = FALSE;
1853   g_mutex_unlock (&priv->lock);
1854
1855   return TRUE;
1856
1857 was_not_joined:
1858   {
1859     return TRUE;
1860   }
1861 }
1862
1863 /**
1864  * gst_rtsp_stream_get_rtpinfo:
1865  * @stream: a #GstRTSPStream
1866  * @rtptime: (allow-none): result RTP timestamp
1867  * @seq: (allow-none): result RTP seqnum
1868  * @clock_rate: the clock rate
1869  * @running_time: (allow-none): result running-time
1870  *
1871  * Retrieve the current rtptime, seq and running-time. This is used to
1872  * construct a RTPInfo reply header.
1873  *
1874  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1875  */
1876 gboolean
1877 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1878     guint * rtptime, guint * seq, guint * clock_rate,
1879     GstClockTime * running_time)
1880 {
1881   GstRTSPStreamPrivate *priv;
1882   GstStructure *stats;
1883   GObjectClass *payobjclass;
1884
1885   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1886
1887   priv = stream->priv;
1888
1889   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1890
1891   g_mutex_lock (&priv->lock);
1892
1893   if (g_object_class_find_property (payobjclass, "stats")) {
1894     g_object_get (priv->payloader, "stats", &stats, NULL);
1895     if (stats == NULL)
1896       goto no_stats;
1897
1898     if (seq)
1899       gst_structure_get_uint (stats, "seqnum", seq);
1900
1901     if (rtptime)
1902       gst_structure_get_uint (stats, "timestamp", rtptime);
1903
1904     if (running_time)
1905       gst_structure_get_clock_time (stats, "running-time", running_time);
1906
1907     if (clock_rate) {
1908       gst_structure_get_uint (stats, "clock-rate", clock_rate);
1909       if (*clock_rate == 0 && running_time)
1910         *running_time = GST_CLOCK_TIME_NONE;
1911     }
1912     gst_structure_free (stats);
1913   } else {
1914     if (!g_object_class_find_property (payobjclass, "seqnum") ||
1915         !g_object_class_find_property (payobjclass, "timestamp"))
1916       goto no_stats;
1917
1918     g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1919
1920     if (running_time)
1921       *running_time = GST_CLOCK_TIME_NONE;
1922   }
1923   g_mutex_unlock (&priv->lock);
1924
1925   return TRUE;
1926
1927   /* ERRORS */
1928 no_stats:
1929   {
1930     GST_WARNING ("Could not get payloader stats");
1931     g_mutex_unlock (&priv->lock);
1932     return FALSE;
1933   }
1934 }
1935
1936 /**
1937  * gst_rtsp_stream_get_caps:
1938  * @stream: a #GstRTSPStream
1939  *
1940  * Retrieve the current caps of @stream.
1941  *
1942  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1943  *    after usage.
1944  */
1945 GstCaps *
1946 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1947 {
1948   GstRTSPStreamPrivate *priv;
1949   GstCaps *result;
1950
1951   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1952
1953   priv = stream->priv;
1954
1955   g_mutex_lock (&priv->lock);
1956   if ((result = priv->caps))
1957     gst_caps_ref (result);
1958   g_mutex_unlock (&priv->lock);
1959
1960   return result;
1961 }
1962
1963 /**
1964  * gst_rtsp_stream_recv_rtp:
1965  * @stream: a #GstRTSPStream
1966  * @buffer: (transfer full): a #GstBuffer
1967  *
1968  * Handle an RTP buffer for the stream. This method is usually called when a
1969  * message has been received from a client using the TCP transport.
1970  *
1971  * This function takes ownership of @buffer.
1972  *
1973  * Returns: a GstFlowReturn.
1974  */
1975 GstFlowReturn
1976 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1977 {
1978   GstRTSPStreamPrivate *priv;
1979   GstFlowReturn ret;
1980   GstElement *element;
1981
1982   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1983   priv = stream->priv;
1984   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1985   g_return_val_if_fail (priv->is_joined, FALSE);
1986
1987   g_mutex_lock (&priv->lock);
1988   if (priv->appsrc[0])
1989     element = gst_object_ref (priv->appsrc[0]);
1990   else
1991     element = NULL;
1992   g_mutex_unlock (&priv->lock);
1993
1994   if (element) {
1995     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1996     gst_object_unref (element);
1997   } else {
1998     ret = GST_FLOW_OK;
1999   }
2000   return ret;
2001 }
2002
2003 /**
2004  * gst_rtsp_stream_recv_rtcp:
2005  * @stream: a #GstRTSPStream
2006  * @buffer: (transfer full): a #GstBuffer
2007  *
2008  * Handle an RTCP buffer for the stream. This method is usually called when a
2009  * message has been received from a client using the TCP transport.
2010  *
2011  * This function takes ownership of @buffer.
2012  *
2013  * Returns: a GstFlowReturn.
2014  */
2015 GstFlowReturn
2016 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2017 {
2018   GstRTSPStreamPrivate *priv;
2019   GstFlowReturn ret;
2020   GstElement *element;
2021
2022   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2023   priv = stream->priv;
2024   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2025   g_return_val_if_fail (priv->is_joined, FALSE);
2026
2027   g_mutex_lock (&priv->lock);
2028   if (priv->appsrc[1])
2029     element = gst_object_ref (priv->appsrc[1]);
2030   else
2031     element = NULL;
2032   g_mutex_unlock (&priv->lock);
2033
2034   if (element) {
2035     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2036     gst_object_unref (element);
2037   } else {
2038     ret = GST_FLOW_OK;
2039   }
2040   return ret;
2041 }
2042
2043 /* must be called with lock */
2044 static gboolean
2045 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2046     gboolean add)
2047 {
2048   GstRTSPStreamPrivate *priv = stream->priv;
2049   const GstRTSPTransport *tr;
2050
2051   tr = gst_rtsp_stream_transport_get_transport (trans);
2052
2053   switch (tr->lower_transport) {
2054     case GST_RTSP_LOWER_TRANS_UDP:
2055     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2056     {
2057       gchar *dest;
2058       gint min, max;
2059       guint ttl = 0;
2060
2061       dest = tr->destination;
2062       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2063         min = tr->port.min;
2064         max = tr->port.max;
2065         ttl = tr->ttl;
2066       } else {
2067         min = tr->client_port.min;
2068         max = tr->client_port.max;
2069       }
2070
2071       if (add) {
2072         GST_INFO ("adding %s:%d-%d", dest, min, max);
2073         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2074         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2075         if (ttl > 0) {
2076           GST_INFO ("setting ttl-mc %d", ttl);
2077           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2078           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2079         }
2080         priv->transports = g_list_prepend (priv->transports, trans);
2081       } else {
2082         GST_INFO ("removing %s:%d-%d", dest, min, max);
2083         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2084         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2085         priv->transports = g_list_remove (priv->transports, trans);
2086       }
2087       break;
2088     }
2089     case GST_RTSP_LOWER_TRANS_TCP:
2090       if (add) {
2091         GST_INFO ("adding TCP %s", tr->destination);
2092         priv->transports = g_list_prepend (priv->transports, trans);
2093       } else {
2094         GST_INFO ("removing TCP %s", tr->destination);
2095         priv->transports = g_list_remove (priv->transports, trans);
2096       }
2097       break;
2098     default:
2099       goto unknown_transport;
2100   }
2101   return TRUE;
2102
2103   /* ERRORS */
2104 unknown_transport:
2105   {
2106     GST_INFO ("Unknown transport %d", tr->lower_transport);
2107     return FALSE;
2108   }
2109 }
2110
2111
2112 /**
2113  * gst_rtsp_stream_add_transport:
2114  * @stream: a #GstRTSPStream
2115  * @trans: a #GstRTSPStreamTransport
2116  *
2117  * Add the transport in @trans to @stream. The media of @stream will
2118  * then also be send to the values configured in @trans.
2119  *
2120  * @stream must be joined to a bin.
2121  *
2122  * @trans must contain a valid #GstRTSPTransport.
2123  *
2124  * Returns: %TRUE if @trans was added
2125  */
2126 gboolean
2127 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2128     GstRTSPStreamTransport * trans)
2129 {
2130   GstRTSPStreamPrivate *priv;
2131   gboolean res;
2132
2133   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2134   priv = stream->priv;
2135   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2136   g_return_val_if_fail (priv->is_joined, FALSE);
2137
2138   g_mutex_lock (&priv->lock);
2139   res = update_transport (stream, trans, TRUE);
2140   g_mutex_unlock (&priv->lock);
2141
2142   return res;
2143 }
2144
2145 /**
2146  * gst_rtsp_stream_remove_transport:
2147  * @stream: a #GstRTSPStream
2148  * @trans: a #GstRTSPStreamTransport
2149  *
2150  * Remove the transport in @trans from @stream. The media of @stream will
2151  * not be sent to the values configured in @trans.
2152  *
2153  * @stream must be joined to a bin.
2154  *
2155  * @trans must contain a valid #GstRTSPTransport.
2156  *
2157  * Returns: %TRUE if @trans was removed
2158  */
2159 gboolean
2160 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2161     GstRTSPStreamTransport * trans)
2162 {
2163   GstRTSPStreamPrivate *priv;
2164   gboolean res;
2165
2166   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2167   priv = stream->priv;
2168   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2169   g_return_val_if_fail (priv->is_joined, FALSE);
2170
2171   g_mutex_lock (&priv->lock);
2172   res = update_transport (stream, trans, FALSE);
2173   g_mutex_unlock (&priv->lock);
2174
2175   return res;
2176 }
2177
2178 /**
2179  * gst_rtsp_stream_get_rtp_socket:
2180  * @stream: a #GstRTSPStream
2181  * @family: the socket family
2182  *
2183  * Get the RTP socket from @stream for a @family.
2184  *
2185  * @stream must be joined to a bin.
2186  *
2187  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2188  *     Unref after usage
2189  */
2190 GSocket *
2191 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2192 {
2193   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2194   GSocket *socket;
2195   gchar *name;
2196
2197   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2198   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2199       family == G_SOCKET_FAMILY_IPV6, NULL);
2200   g_return_val_if_fail (priv->udpsink[0], NULL);
2201
2202   if (family == G_SOCKET_FAMILY_IPV6)
2203     name = "socket-v6";
2204   else
2205     name = "socket";
2206
2207   g_object_get (priv->udpsink[0], name, &socket, NULL);
2208
2209   return socket;
2210 }
2211
2212 /**
2213  * gst_rtsp_stream_get_rtcp_socket:
2214  * @stream: a #GstRTSPStream
2215  * @family: the socket family
2216  *
2217  * Get the RTCP socket from @stream for a @family.
2218  *
2219  * @stream must be joined to a bin.
2220  *
2221  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2222  *     @family. Unref after usage
2223  */
2224 GSocket *
2225 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2226 {
2227   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2228   GSocket *socket;
2229   gchar *name;
2230
2231   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2232   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2233       family == G_SOCKET_FAMILY_IPV6, NULL);
2234   g_return_val_if_fail (priv->udpsink[1], NULL);
2235
2236   if (family == G_SOCKET_FAMILY_IPV6)
2237     name = "socket-v6";
2238   else
2239     name = "socket";
2240
2241   g_object_get (priv->udpsink[1], name, &socket, NULL);
2242
2243   return socket;
2244 }
2245
2246 /**
2247  * gst_rtsp_stream_transport_filter:
2248  * @stream: a #GstRTSPStream
2249  * @func: (scope call) (allow-none): a callback
2250  * @user_data: user data passed to @func
2251  *
2252  * Call @func for each transport managed by @stream. The result value of @func
2253  * determines what happens to the transport. @func will be called with @stream
2254  * locked so no further actions on @stream can be performed from @func.
2255  *
2256  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2257  * @stream.
2258  *
2259  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2260  *
2261  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2262  * will also be added with an additional ref to the result #GList of this
2263  * function..
2264  *
2265  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2266  *
2267  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2268  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2269  * element in the #GList should be unreffed before the list is freed.
2270  */
2271 GList *
2272 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2273     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2274 {
2275   GstRTSPStreamPrivate *priv;
2276   GList *result, *walk, *next;
2277
2278   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2279
2280   priv = stream->priv;
2281
2282   result = NULL;
2283
2284   g_mutex_lock (&priv->lock);
2285   for (walk = priv->transports; walk; walk = next) {
2286     GstRTSPStreamTransport *trans = walk->data;
2287     GstRTSPFilterResult res;
2288
2289     next = g_list_next (walk);
2290
2291     if (func)
2292       res = func (stream, trans, user_data);
2293     else
2294       res = GST_RTSP_FILTER_REF;
2295
2296     switch (res) {
2297       case GST_RTSP_FILTER_REMOVE:
2298         update_transport (stream, trans, FALSE);
2299         break;
2300       case GST_RTSP_FILTER_REF:
2301         result = g_list_prepend (result, g_object_ref (trans));
2302         break;
2303       case GST_RTSP_FILTER_KEEP:
2304       default:
2305         break;
2306     }
2307   }
2308   g_mutex_unlock (&priv->lock);
2309
2310   return result;
2311 }
2312
2313 static GstPadProbeReturn
2314 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2315 {
2316   GstRTSPStreamPrivate *priv;
2317   GstRTSPStream *stream;
2318
2319   stream = user_data;
2320   priv = stream->priv;
2321
2322   GST_DEBUG_OBJECT (pad, "now blocking");
2323
2324   g_mutex_lock (&priv->lock);
2325   priv->blocking = TRUE;
2326   g_mutex_unlock (&priv->lock);
2327
2328   gst_element_post_message (priv->payloader,
2329       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2330           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2331
2332   return GST_PAD_PROBE_OK;
2333 }
2334
2335 /**
2336  * gst_rtsp_stream_set_blocked:
2337  * @stream: a #GstRTSPStream
2338  * @blocked: boolean indicating we should block or unblock
2339  *
2340  * Blocks or unblocks the dataflow on @stream.
2341  *
2342  * Returns: %TRUE on success
2343  */
2344 gboolean
2345 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2346 {
2347   GstRTSPStreamPrivate *priv;
2348
2349   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2350
2351   priv = stream->priv;
2352
2353   g_mutex_lock (&priv->lock);
2354   if (blocked) {
2355     priv->blocking = FALSE;
2356     if (priv->blocked_id == 0) {
2357       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2358           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2359           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2360           g_object_ref (stream), g_object_unref);
2361     }
2362   } else {
2363     if (priv->blocked_id != 0) {
2364       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2365       priv->blocked_id = 0;
2366       priv->blocking = FALSE;
2367     }
2368   }
2369   g_mutex_unlock (&priv->lock);
2370
2371   return TRUE;
2372 }
2373
2374 /**
2375  * gst_rtsp_stream_is_blocking:
2376  * @stream: a #GstRTSPStream
2377  *
2378  * Check if @stream is blocking on a #GstBuffer.
2379  *
2380  * Returns: %TRUE if @stream is blocking
2381  */
2382 gboolean
2383 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2384 {
2385   GstRTSPStreamPrivate *priv;
2386   gboolean result;
2387
2388   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2389
2390   priv = stream->priv;
2391
2392   g_mutex_lock (&priv->lock);
2393   result = priv->blocking;
2394   g_mutex_unlock (&priv->lock);
2395
2396   return result;
2397 }