stream: release lock while pushing out packets
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
83    * sockets */
84   GstElement *udpsrc_v4[2];
85
86   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
87    * sockets */
88   GstElement *udpsrc_v6[2];
89
90   GstElement *udpsink[2];
91
92   /* for TCP transport */
93   GstElement *appsrc[2];
94   GstElement *appqueue[2];
95   GstElement *appsink[2];
96
97   GstElement *tee[2];
98   GstElement *funnel[2];
99
100   /* server ports for sending/receiving over ipv4 */
101   GstRTSPRange server_port_v4;
102   GstRTSPAddress *server_addr_v4;
103   gboolean have_ipv4;
104
105   /* server ports for sending/receiving over ipv6 */
106   GstRTSPRange server_port_v6;
107   GstRTSPAddress *server_addr_v6;
108   gboolean have_ipv6;
109
110   /* multicast addresses */
111   GstRTSPAddressPool *pool;
112   GstRTSPAddress *addr_v4;
113   GstRTSPAddress *addr_v6;
114
115   /* the caps of the stream */
116   gulong caps_sig;
117   GstCaps *caps;
118
119   /* transports we stream to */
120   guint n_active;
121   GList *transports;
122   gboolean tr_changed;
123   GList *tr_cache;
124
125   gint dscp_qos;
126
127   /* stream blocking */
128   gulong blocked_id;
129   gboolean blocking;
130 };
131
132 #define DEFAULT_CONTROL         NULL
133 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
134 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
135                                         GST_RTSP_LOWER_TRANS_TCP
136
137 enum
138 {
139   PROP_0,
140   PROP_CONTROL,
141   PROP_PROFILES,
142   PROP_PROTOCOLS,
143   PROP_LAST
144 };
145
146 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
147 #define GST_CAT_DEFAULT rtsp_stream_debug
148
149 static GQuark ssrc_stream_map_key;
150
151 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
152     GValue * value, GParamSpec * pspec);
153 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
154     const GValue * value, GParamSpec * pspec);
155
156 static void gst_rtsp_stream_finalize (GObject * obj);
157
158 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
159
160 static void
161 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
162 {
163   GObjectClass *gobject_class;
164
165   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
166
167   gobject_class = G_OBJECT_CLASS (klass);
168
169   gobject_class->get_property = gst_rtsp_stream_get_property;
170   gobject_class->set_property = gst_rtsp_stream_set_property;
171   gobject_class->finalize = gst_rtsp_stream_finalize;
172
173   g_object_class_install_property (gobject_class, PROP_CONTROL,
174       g_param_spec_string ("control", "Control",
175           "The control string for this stream", DEFAULT_CONTROL,
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   g_object_class_install_property (gobject_class, PROP_PROFILES,
179       g_param_spec_flags ("profiles", "Profiles",
180           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
181           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182
183   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
184       g_param_spec_flags ("protocols", "Protocols",
185           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
186           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
187
188   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
189
190   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
191 }
192
193 static void
194 gst_rtsp_stream_init (GstRTSPStream * stream)
195 {
196   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
197
198   GST_DEBUG ("new stream %p", stream);
199
200   stream->priv = priv;
201
202   priv->dscp_qos = -1;
203   priv->control = g_strdup (DEFAULT_CONTROL);
204   priv->profiles = DEFAULT_PROFILES;
205   priv->protocols = DEFAULT_PROTOCOLS;
206
207   g_mutex_init (&priv->lock);
208 }
209
210 static void
211 gst_rtsp_stream_finalize (GObject * obj)
212 {
213   GstRTSPStream *stream;
214   GstRTSPStreamPrivate *priv;
215
216   stream = GST_RTSP_STREAM (obj);
217   priv = stream->priv;
218
219   GST_DEBUG ("finalize stream %p", stream);
220
221   /* we really need to be unjoined now */
222   g_return_if_fail (!priv->is_joined);
223
224   if (priv->addr_v4)
225     gst_rtsp_address_free (priv->addr_v4);
226   if (priv->addr_v6)
227     gst_rtsp_address_free (priv->addr_v6);
228   if (priv->server_addr_v4)
229     gst_rtsp_address_free (priv->server_addr_v4);
230   if (priv->server_addr_v6)
231     gst_rtsp_address_free (priv->server_addr_v6);
232   if (priv->pool)
233     g_object_unref (priv->pool);
234   gst_object_unref (priv->payloader);
235   gst_object_unref (priv->srcpad);
236   g_free (priv->control);
237   g_mutex_clear (&priv->lock);
238
239   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
240 }
241
242 static void
243 gst_rtsp_stream_get_property (GObject * object, guint propid,
244     GValue * value, GParamSpec * pspec)
245 {
246   GstRTSPStream *stream = GST_RTSP_STREAM (object);
247
248   switch (propid) {
249     case PROP_CONTROL:
250       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
251       break;
252     case PROP_PROFILES:
253       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
254       break;
255     case PROP_PROTOCOLS:
256       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
257       break;
258     default:
259       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
260   }
261 }
262
263 static void
264 gst_rtsp_stream_set_property (GObject * object, guint propid,
265     const GValue * value, GParamSpec * pspec)
266 {
267   GstRTSPStream *stream = GST_RTSP_STREAM (object);
268
269   switch (propid) {
270     case PROP_CONTROL:
271       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
272       break;
273     case PROP_PROFILES:
274       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
275       break;
276     case PROP_PROTOCOLS:
277       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
278       break;
279     default:
280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
281   }
282 }
283
284 /**
285  * gst_rtsp_stream_new:
286  * @idx: an index
287  * @srcpad: a #GstPad
288  * @payloader: a #GstElement
289  *
290  * Create a new media stream with index @idx that handles RTP data on
291  * @srcpad and has a payloader element @payloader.
292  *
293  * Returns: a new #GstRTSPStream
294  */
295 GstRTSPStream *
296 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
297 {
298   GstRTSPStreamPrivate *priv;
299   GstRTSPStream *stream;
300
301   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
302   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
303   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
304
305   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
306   priv = stream->priv;
307   priv->idx = idx;
308   priv->payloader = gst_object_ref (payloader);
309   priv->srcpad = gst_object_ref (srcpad);
310
311   return stream;
312 }
313
314 /**
315  * gst_rtsp_stream_get_index:
316  * @stream: a #GstRTSPStream
317  *
318  * Get the stream index.
319  *
320  * Return: the stream index.
321  */
322 guint
323 gst_rtsp_stream_get_index (GstRTSPStream * stream)
324 {
325   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
326
327   return stream->priv->idx;
328 }
329
330 /**
331  * gst_rtsp_stream_get_pt:
332  * @stream: a #GstRTSPStream
333  *
334  * Get the stream payload type.
335  *
336  * Return: the stream payload type.
337  */
338 guint
339 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
340 {
341   GstRTSPStreamPrivate *priv;
342   guint pt;
343
344   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
345
346   priv = stream->priv;
347
348   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
349
350   return pt;
351 }
352
353 /**
354  * gst_rtsp_stream_get_srcpad:
355  * @stream: a #GstRTSPStream
356  *
357  * Get the srcpad associated with @stream.
358  *
359  * Returns: (transfer full): the srcpad. Unref after usage.
360  */
361 GstPad *
362 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
363 {
364   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
365
366   return gst_object_ref (stream->priv->srcpad);
367 }
368
369 /**
370  * gst_rtsp_stream_get_control:
371  * @stream: a #GstRTSPStream
372  *
373  * Get the control string to identify this stream.
374  *
375  * Returns: (transfer full): the control string. free after usage.
376  */
377 gchar *
378 gst_rtsp_stream_get_control (GstRTSPStream * stream)
379 {
380   GstRTSPStreamPrivate *priv;
381   gchar *result;
382
383   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
384
385   priv = stream->priv;
386
387   g_mutex_lock (&priv->lock);
388   if ((result = g_strdup (priv->control)) == NULL)
389     result = g_strdup_printf ("stream=%u", priv->idx);
390   g_mutex_unlock (&priv->lock);
391
392   return result;
393 }
394
395 /**
396  * gst_rtsp_stream_set_control:
397  * @stream: a #GstRTSPStream
398  * @control: a control string
399  *
400  * Set the control string in @stream.
401  */
402 void
403 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
404 {
405   GstRTSPStreamPrivate *priv;
406
407   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
408
409   priv = stream->priv;
410
411   g_mutex_lock (&priv->lock);
412   g_free (priv->control);
413   priv->control = g_strdup (control);
414   g_mutex_unlock (&priv->lock);
415 }
416
417 /**
418  * gst_rtsp_stream_has_control:
419  * @stream: a #GstRTSPStream
420  * @control: a control string
421  *
422  * Check if @stream has the control string @control.
423  *
424  * Returns: %TRUE is @stream has @control as the control string
425  */
426 gboolean
427 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
428 {
429   GstRTSPStreamPrivate *priv;
430   gboolean res;
431
432   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
433
434   priv = stream->priv;
435
436   g_mutex_lock (&priv->lock);
437   if (priv->control)
438     res = (g_strcmp0 (priv->control, control) == 0);
439   else {
440     guint streamid;
441
442     if (sscanf (control, "stream=%u", &streamid) > 0)
443       res = (streamid == priv->idx);
444     else
445       res = FALSE;
446   }
447   g_mutex_unlock (&priv->lock);
448
449   return res;
450 }
451
452 /**
453  * gst_rtsp_stream_set_mtu:
454  * @stream: a #GstRTSPStream
455  * @mtu: a new MTU
456  *
457  * Configure the mtu in the payloader of @stream to @mtu.
458  */
459 void
460 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
461 {
462   GstRTSPStreamPrivate *priv;
463
464   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
465
466   priv = stream->priv;
467
468   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
469
470   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
471 }
472
473 /**
474  * gst_rtsp_stream_get_mtu:
475  * @stream: a #GstRTSPStream
476  *
477  * Get the configured MTU in the payloader of @stream.
478  *
479  * Returns: the MTU of the payloader.
480  */
481 guint
482 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
483 {
484   GstRTSPStreamPrivate *priv;
485   guint mtu;
486
487   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
488
489   priv = stream->priv;
490
491   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
492
493   return mtu;
494 }
495
496 /* Update the dscp qos property on the udp sinks */
497 static void
498 update_dscp_qos (GstRTSPStream * stream)
499 {
500   GstRTSPStreamPrivate *priv;
501
502   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
503
504   priv = stream->priv;
505
506   if (priv->udpsink[0]) {
507     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
508         NULL);
509   }
510
511   if (priv->udpsink[1]) {
512     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
513         NULL);
514   }
515 }
516
517 /**
518  * gst_rtsp_stream_set_dscp_qos:
519  * @stream: a #GstRTSPStream
520  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
521  *
522  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
523  */
524 void
525 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
526 {
527   GstRTSPStreamPrivate *priv;
528
529   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
530
531   priv = stream->priv;
532
533   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
534
535   if (dscp_qos < -1 || dscp_qos > 63) {
536     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
537     return;
538   }
539
540   priv->dscp_qos = dscp_qos;
541
542   update_dscp_qos (stream);
543 }
544
545 /**
546  * gst_rtsp_stream_get_dscp_qos:
547  * @stream: a #GstRTSPStream
548  *
549  * Get the configured DSCP QoS in of the outgoing sockets.
550  *
551  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
552  */
553 gint
554 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
555 {
556   GstRTSPStreamPrivate *priv;
557
558   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
559
560   priv = stream->priv;
561
562   return priv->dscp_qos;
563 }
564
565 /**
566  * gst_rtsp_stream_is_transport_supported:
567  * @stream: a #GstRTSPStream
568  * @transport: a #GstRTSPTransport
569  *
570  * Check if @transport can be handled by stream
571  *
572  * Returns: %TRUE if @transport can be handled by @stream.
573  */
574 gboolean
575 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
576     GstRTSPTransport * transport)
577 {
578   GstRTSPStreamPrivate *priv;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
581
582   priv = stream->priv;
583
584   g_mutex_lock (&priv->lock);
585   if (transport->trans != GST_RTSP_TRANS_RTP)
586     goto unsupported_transmode;
587
588   if (!(transport->profile & priv->profiles))
589     goto unsupported_profile;
590
591   if (!(transport->lower_transport & priv->protocols))
592     goto unsupported_ltrans;
593
594   g_mutex_unlock (&priv->lock);
595
596   return TRUE;
597
598   /* ERRORS */
599 unsupported_transmode:
600   {
601     GST_DEBUG ("unsupported transport mode %d", transport->trans);
602     g_mutex_unlock (&priv->lock);
603     return FALSE;
604   }
605 unsupported_profile:
606   {
607     GST_DEBUG ("unsupported profile %d", transport->profile);
608     g_mutex_unlock (&priv->lock);
609     return FALSE;
610   }
611 unsupported_ltrans:
612   {
613     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
614     g_mutex_unlock (&priv->lock);
615     return FALSE;
616   }
617 }
618
619 /**
620  * gst_rtsp_stream_set_profiles:
621  * @stream: a #GstRTSPStream
622  * @profiles: the new profiles
623  *
624  * Configure the allowed profiles for @stream.
625  */
626 void
627 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
628 {
629   GstRTSPStreamPrivate *priv;
630
631   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
632
633   priv = stream->priv;
634
635   g_mutex_lock (&priv->lock);
636   priv->profiles = profiles;
637   g_mutex_unlock (&priv->lock);
638 }
639
640 /**
641  * gst_rtsp_stream_get_profiles:
642  * @stream: a #GstRTSPStream
643  *
644  * Get the allowed profiles of @stream.
645  *
646  * Returns: a #GstRTSPProfile
647  */
648 GstRTSPProfile
649 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
650 {
651   GstRTSPStreamPrivate *priv;
652   GstRTSPProfile res;
653
654   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
655
656   priv = stream->priv;
657
658   g_mutex_lock (&priv->lock);
659   res = priv->profiles;
660   g_mutex_unlock (&priv->lock);
661
662   return res;
663 }
664
665 /**
666  * gst_rtsp_stream_set_protocols:
667  * @stream: a #GstRTSPStream
668  * @protocols: the new flags
669  *
670  * Configure the allowed lower transport for @stream.
671  */
672 void
673 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
674     GstRTSPLowerTrans protocols)
675 {
676   GstRTSPStreamPrivate *priv;
677
678   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
679
680   priv = stream->priv;
681
682   g_mutex_lock (&priv->lock);
683   priv->protocols = protocols;
684   g_mutex_unlock (&priv->lock);
685 }
686
687 /**
688  * gst_rtsp_stream_get_protocols:
689  * @stream: a #GstRTSPStream
690  *
691  * Get the allowed protocols of @stream.
692  *
693  * Returns: a #GstRTSPLowerTrans
694  */
695 GstRTSPLowerTrans
696 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
697 {
698   GstRTSPStreamPrivate *priv;
699   GstRTSPLowerTrans res;
700
701   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
702       GST_RTSP_LOWER_TRANS_UNKNOWN);
703
704   priv = stream->priv;
705
706   g_mutex_lock (&priv->lock);
707   res = priv->protocols;
708   g_mutex_unlock (&priv->lock);
709
710   return res;
711 }
712
713 /**
714  * gst_rtsp_stream_set_address_pool:
715  * @stream: a #GstRTSPStream
716  * @pool: a #GstRTSPAddressPool
717  *
718  * configure @pool to be used as the address pool of @stream.
719  */
720 void
721 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
722     GstRTSPAddressPool * pool)
723 {
724   GstRTSPStreamPrivate *priv;
725   GstRTSPAddressPool *old;
726
727   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
728
729   priv = stream->priv;
730
731   GST_LOG_OBJECT (stream, "set address pool %p", pool);
732
733   g_mutex_lock (&priv->lock);
734   if ((old = priv->pool) != pool)
735     priv->pool = pool ? g_object_ref (pool) : NULL;
736   else
737     old = NULL;
738   g_mutex_unlock (&priv->lock);
739
740   if (old)
741     g_object_unref (old);
742 }
743
744 /**
745  * gst_rtsp_stream_get_address_pool:
746  * @stream: a #GstRTSPStream
747  *
748  * Get the #GstRTSPAddressPool used as the address pool of @stream.
749  *
750  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
751  * usage.
752  */
753 GstRTSPAddressPool *
754 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
755 {
756   GstRTSPStreamPrivate *priv;
757   GstRTSPAddressPool *result;
758
759   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
760
761   priv = stream->priv;
762
763   g_mutex_lock (&priv->lock);
764   if ((result = priv->pool))
765     g_object_ref (result);
766   g_mutex_unlock (&priv->lock);
767
768   return result;
769 }
770
771 /**
772  * gst_rtsp_stream_get_multicast_address:
773  * @stream: a #GstRTSPStream
774  * @family: the #GSocketFamily
775  *
776  * Get the multicast address of @stream for @family.
777  *
778  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
779  * allocated. gst_rtsp_address_free() after usage.
780  */
781 GstRTSPAddress *
782 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
783     GSocketFamily family)
784 {
785   GstRTSPStreamPrivate *priv;
786   GstRTSPAddress *result;
787   GstRTSPAddress **addrp;
788   GstRTSPAddressFlags flags;
789
790   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
791
792   priv = stream->priv;
793
794   if (family == G_SOCKET_FAMILY_IPV6) {
795     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
796     addrp = &priv->addr_v4;
797   } else {
798     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
799     addrp = &priv->addr_v6;
800   }
801
802   g_mutex_lock (&priv->lock);
803   if (*addrp == NULL) {
804     if (priv->pool == NULL)
805       goto no_pool;
806
807     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
808
809     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
810     if (*addrp == NULL)
811       goto no_address;
812   }
813   result = gst_rtsp_address_copy (*addrp);
814   g_mutex_unlock (&priv->lock);
815
816   return result;
817
818   /* ERRORS */
819 no_pool:
820   {
821     GST_ERROR_OBJECT (stream, "no address pool specified");
822     g_mutex_unlock (&priv->lock);
823     return NULL;
824   }
825 no_address:
826   {
827     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
828     g_mutex_unlock (&priv->lock);
829     return NULL;
830   }
831 }
832
833 /**
834  * gst_rtsp_stream_reserve_address:
835  * @stream: a #GstRTSPStream
836  * @address: an address
837  * @port: a port
838  * @n_ports: n_ports
839  * @ttl: a TTL
840  *
841  * Reserve @address and @port as the address and port of @stream.
842  *
843  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
844  * reserved. gst_rtsp_address_free() after usage.
845  */
846 GstRTSPAddress *
847 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
848     const gchar * address, guint port, guint n_ports, guint ttl)
849 {
850   GstRTSPStreamPrivate *priv;
851   GstRTSPAddress *result;
852   GInetAddress *addr;
853   GSocketFamily family;
854   GstRTSPAddress **addrp;
855
856   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
857   g_return_val_if_fail (address != NULL, NULL);
858   g_return_val_if_fail (port > 0, NULL);
859   g_return_val_if_fail (n_ports > 0, NULL);
860   g_return_val_if_fail (ttl > 0, NULL);
861
862   priv = stream->priv;
863
864   addr = g_inet_address_new_from_string (address);
865   if (!addr) {
866     GST_ERROR ("failed to get inet addr from %s", address);
867     family = G_SOCKET_FAMILY_IPV4;
868   } else {
869     family = g_inet_address_get_family (addr);
870     g_object_unref (addr);
871   }
872
873   if (family == G_SOCKET_FAMILY_IPV6)
874     addrp = &priv->addr_v4;
875   else
876     addrp = &priv->addr_v6;
877
878   g_mutex_lock (&priv->lock);
879   if (*addrp == NULL) {
880     GstRTSPAddressPoolResult res;
881
882     if (priv->pool == NULL)
883       goto no_pool;
884
885     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
886         port, n_ports, ttl, addrp);
887     if (res != GST_RTSP_ADDRESS_POOL_OK)
888       goto no_address;
889   } else {
890     if (strcmp ((*addrp)->address, address) ||
891         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
892         (*addrp)->ttl != ttl)
893       goto different_address;
894   }
895   result = gst_rtsp_address_copy (*addrp);
896   g_mutex_unlock (&priv->lock);
897
898   return result;
899
900   /* ERRORS */
901 no_pool:
902   {
903     GST_ERROR_OBJECT (stream, "no address pool specified");
904     g_mutex_unlock (&priv->lock);
905     return NULL;
906   }
907 no_address:
908   {
909     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
910         address);
911     g_mutex_unlock (&priv->lock);
912     return NULL;
913   }
914 different_address:
915   {
916     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
917         " reserved", address);
918     g_mutex_unlock (&priv->lock);
919     return NULL;
920   }
921 }
922
923 static gboolean
924 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
925     GSocketFamily family, GstElement * udpsrc_out[2],
926     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
927     GstRTSPAddress ** server_addr_out)
928 {
929   GstStateChangeReturn ret;
930   GstElement *udpsrc0, *udpsrc1;
931   GstElement *udpsink0, *udpsink1;
932   GSocket *rtp_socket = NULL;
933   GSocket *rtcp_socket;
934   gint tmp_rtp, tmp_rtcp;
935   guint count;
936   gint rtpport, rtcpport;
937   GList *rejected_addresses = NULL;
938   GstRTSPAddress *addr = NULL;
939   GInetAddress *inetaddr = NULL;
940   GSocketAddress *rtp_sockaddr = NULL;
941   GSocketAddress *rtcp_sockaddr = NULL;
942   const gchar *multisink_socket;
943
944   if (family == G_SOCKET_FAMILY_IPV6)
945     multisink_socket = "socket-v6";
946   else
947     multisink_socket = "socket";
948
949   udpsrc0 = NULL;
950   udpsrc1 = NULL;
951   udpsink0 = NULL;
952   udpsink1 = NULL;
953   count = 0;
954
955   /* Start with random port */
956   tmp_rtp = 0;
957
958   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
959       G_SOCKET_PROTOCOL_UDP, NULL);
960   if (!rtcp_socket)
961     goto no_udp_protocol;
962
963   if (*server_addr_out)
964     gst_rtsp_address_free (*server_addr_out);
965
966   /* try to allocate 2 UDP ports, the RTP port should be an even
967    * number and the RTCP port should be the next (uneven) port */
968 again:
969
970   if (rtp_socket == NULL) {
971     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
972         G_SOCKET_PROTOCOL_UDP, NULL);
973     if (!rtp_socket)
974       goto no_udp_protocol;
975   }
976
977   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
978     GstRTSPAddressFlags flags;
979
980     if (addr)
981       rejected_addresses = g_list_prepend (rejected_addresses, addr);
982
983     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
984     if (family == G_SOCKET_FAMILY_IPV6)
985       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
986     else
987       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
988
989     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
990
991     if (addr == NULL)
992       goto no_ports;
993
994     tmp_rtp = addr->port;
995
996     g_clear_object (&inetaddr);
997     inetaddr = g_inet_address_new_from_string (addr->address);
998   } else {
999     if (tmp_rtp != 0) {
1000       tmp_rtp += 2;
1001       if (++count > 20)
1002         goto no_ports;
1003     }
1004
1005     if (inetaddr == NULL)
1006       inetaddr = g_inet_address_new_any (family);
1007   }
1008
1009   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1010   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1011     g_object_unref (rtp_sockaddr);
1012     goto again;
1013   }
1014   g_object_unref (rtp_sockaddr);
1015
1016   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1017   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1018     g_clear_object (&rtp_sockaddr);
1019     goto socket_error;
1020   }
1021
1022   tmp_rtp =
1023       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1024   g_object_unref (rtp_sockaddr);
1025
1026   /* check if port is even */
1027   if ((tmp_rtp & 1) != 0) {
1028     /* port not even, close and allocate another */
1029     tmp_rtp++;
1030     g_clear_object (&rtp_socket);
1031     goto again;
1032   }
1033
1034   /* set port */
1035   tmp_rtcp = tmp_rtp + 1;
1036
1037   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1038   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1039     g_object_unref (rtcp_sockaddr);
1040     g_clear_object (&rtp_socket);
1041     goto again;
1042   }
1043   g_object_unref (rtcp_sockaddr);
1044
1045   g_clear_object (&inetaddr);
1046
1047   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1048   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1049
1050   if (udpsrc0 == NULL || udpsrc1 == NULL)
1051     goto no_udp_protocol;
1052
1053   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1054   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1055
1056   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1057   if (ret == GST_STATE_CHANGE_FAILURE)
1058     goto element_error;
1059   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1060   if (ret == GST_STATE_CHANGE_FAILURE)
1061     goto element_error;
1062
1063   /* all fine, do port check */
1064   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1065   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1066
1067   /* this should not happen... */
1068   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1069     goto port_error;
1070
1071   if (udpsink_out[0])
1072     udpsink0 = udpsink_out[0];
1073   else
1074     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1075
1076   if (!udpsink0)
1077     goto no_udp_protocol;
1078
1079   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1080   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1081
1082   if (udpsink_out[1])
1083     udpsink1 = udpsink_out[1];
1084   else
1085     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1086
1087   if (!udpsink1)
1088     goto no_udp_protocol;
1089
1090   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1091   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1092   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1093
1094   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1095   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1096   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1097   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1098   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1099   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1100   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1101   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1102
1103   /* we keep these elements, we will further configure them when the
1104    * client told us to really use the UDP ports. */
1105   udpsrc_out[0] = udpsrc0;
1106   udpsrc_out[1] = udpsrc1;
1107   udpsink_out[0] = udpsink0;
1108   udpsink_out[1] = udpsink1;
1109   server_port_out->min = rtpport;
1110   server_port_out->max = rtcpport;
1111
1112   *server_addr_out = addr;
1113   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1114
1115   g_object_unref (rtp_socket);
1116   g_object_unref (rtcp_socket);
1117
1118   return TRUE;
1119
1120   /* ERRORS */
1121 no_udp_protocol:
1122   {
1123     goto cleanup;
1124   }
1125 no_ports:
1126   {
1127     goto cleanup;
1128   }
1129 port_error:
1130   {
1131     goto cleanup;
1132   }
1133 socket_error:
1134   {
1135     goto cleanup;
1136   }
1137 element_error:
1138   {
1139     goto cleanup;
1140   }
1141 cleanup:
1142   {
1143     if (udpsrc0) {
1144       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1145       gst_object_unref (udpsrc0);
1146     }
1147     if (udpsrc1) {
1148       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1149       gst_object_unref (udpsrc1);
1150     }
1151     if (udpsink0) {
1152       gst_element_set_state (udpsink0, GST_STATE_NULL);
1153       gst_object_unref (udpsink0);
1154     }
1155     if (inetaddr)
1156       g_object_unref (inetaddr);
1157     g_list_free_full (rejected_addresses,
1158         (GDestroyNotify) gst_rtsp_address_free);
1159     if (addr)
1160       gst_rtsp_address_free (addr);
1161     if (rtp_socket)
1162       g_object_unref (rtp_socket);
1163     if (rtcp_socket)
1164       g_object_unref (rtcp_socket);
1165     return FALSE;
1166   }
1167 }
1168
1169 /* must be called with lock */
1170 static gboolean
1171 alloc_ports (GstRTSPStream * stream)
1172 {
1173   GstRTSPStreamPrivate *priv = stream->priv;
1174
1175   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1176       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1177       &priv->server_port_v4, &priv->server_addr_v4);
1178
1179   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1180       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1181       &priv->server_port_v6, &priv->server_addr_v6);
1182
1183   return priv->have_ipv4 || priv->have_ipv6;
1184 }
1185
1186 /**
1187  * gst_rtsp_stream_get_server_port:
1188  * @stream: a #GstRTSPStream
1189  * @server_port: (out): result server port
1190  * @family: the port family to get
1191  *
1192  * Fill @server_port with the port pair used by the server. This function can
1193  * only be called when @stream has been joined.
1194  */
1195 void
1196 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1197     GstRTSPRange * server_port, GSocketFamily family)
1198 {
1199   GstRTSPStreamPrivate *priv;
1200
1201   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1202   priv = stream->priv;
1203   g_return_if_fail (priv->is_joined);
1204
1205   g_mutex_lock (&priv->lock);
1206   if (family == G_SOCKET_FAMILY_IPV4) {
1207     if (server_port)
1208       *server_port = priv->server_port_v4;
1209   } else {
1210     if (server_port)
1211       *server_port = priv->server_port_v6;
1212   }
1213   g_mutex_unlock (&priv->lock);
1214 }
1215
1216 /**
1217  * gst_rtsp_stream_get_rtpsession:
1218  * @stream: a #GstRTSPStream
1219  *
1220  * Get the RTP session of this stream.
1221  *
1222  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1223  */
1224 GObject *
1225 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1226 {
1227   GstRTSPStreamPrivate *priv;
1228   GObject *session;
1229
1230   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1231
1232   priv = stream->priv;
1233
1234   g_mutex_lock (&priv->lock);
1235   if ((session = priv->session))
1236     g_object_ref (session);
1237   g_mutex_unlock (&priv->lock);
1238
1239   return session;
1240 }
1241
1242 /**
1243  * gst_rtsp_stream_get_ssrc:
1244  * @stream: a #GstRTSPStream
1245  * @ssrc: (out): result ssrc
1246  *
1247  * Get the SSRC used by the RTP session of this stream. This function can only
1248  * be called when @stream has been joined.
1249  */
1250 void
1251 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1252 {
1253   GstRTSPStreamPrivate *priv;
1254
1255   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1256   priv = stream->priv;
1257   g_return_if_fail (priv->is_joined);
1258
1259   g_mutex_lock (&priv->lock);
1260   if (ssrc && priv->session)
1261     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1262   g_mutex_unlock (&priv->lock);
1263 }
1264
1265 /* executed from streaming thread */
1266 static void
1267 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1268 {
1269   GstRTSPStreamPrivate *priv = stream->priv;
1270   GstCaps *newcaps, *oldcaps;
1271
1272   newcaps = gst_pad_get_current_caps (pad);
1273
1274   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1275       newcaps);
1276
1277   g_mutex_lock (&priv->lock);
1278   oldcaps = priv->caps;
1279   priv->caps = newcaps;
1280   g_mutex_unlock (&priv->lock);
1281
1282   if (oldcaps)
1283     gst_caps_unref (oldcaps);
1284 }
1285
1286 static void
1287 dump_structure (const GstStructure * s)
1288 {
1289   gchar *sstr;
1290
1291   sstr = gst_structure_to_string (s);
1292   GST_INFO ("structure: %s", sstr);
1293   g_free (sstr);
1294 }
1295
1296 static GstRTSPStreamTransport *
1297 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1298 {
1299   GstRTSPStreamPrivate *priv = stream->priv;
1300   GList *walk;
1301   GstRTSPStreamTransport *result = NULL;
1302   const gchar *tmp;
1303   gchar *dest;
1304   guint port;
1305
1306   if (rtcp_from == NULL)
1307     return NULL;
1308
1309   tmp = g_strrstr (rtcp_from, ":");
1310   if (tmp == NULL)
1311     return NULL;
1312
1313   port = atoi (tmp + 1);
1314   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1315
1316   g_mutex_lock (&priv->lock);
1317   GST_INFO ("finding %s:%d in %d transports", dest, port,
1318       g_list_length (priv->transports));
1319
1320   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1321     GstRTSPStreamTransport *trans = walk->data;
1322     const GstRTSPTransport *tr;
1323     gint min, max;
1324
1325     tr = gst_rtsp_stream_transport_get_transport (trans);
1326
1327     min = tr->client_port.min;
1328     max = tr->client_port.max;
1329
1330     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1331       result = trans;
1332       break;
1333     }
1334   }
1335   if (result)
1336     g_object_ref (result);
1337   g_mutex_unlock (&priv->lock);
1338
1339   g_free (dest);
1340
1341   return result;
1342 }
1343
1344 static GstRTSPStreamTransport *
1345 check_transport (GObject * source, GstRTSPStream * stream)
1346 {
1347   GstStructure *stats;
1348   GstRTSPStreamTransport *trans;
1349
1350   /* see if we have a stream to match with the origin of the RTCP packet */
1351   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1352   if (trans == NULL) {
1353     g_object_get (source, "stats", &stats, NULL);
1354     if (stats) {
1355       const gchar *rtcp_from;
1356
1357       dump_structure (stats);
1358
1359       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1360       if ((trans = find_transport (stream, rtcp_from))) {
1361         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1362             source);
1363         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1364             g_object_unref);
1365       }
1366       gst_structure_free (stats);
1367     }
1368   }
1369   return trans;
1370 }
1371
1372
1373 static void
1374 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1375 {
1376   GstRTSPStreamTransport *trans;
1377
1378   GST_INFO ("%p: new source %p", stream, source);
1379
1380   trans = check_transport (source, stream);
1381
1382   if (trans)
1383     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1384 }
1385
1386 static void
1387 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1388 {
1389   GST_INFO ("%p: new SDES %p", stream, source);
1390 }
1391
1392 static void
1393 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1394 {
1395   GstRTSPStreamTransport *trans;
1396
1397   trans = check_transport (source, stream);
1398
1399   if (trans) {
1400     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1401     gst_rtsp_stream_transport_keep_alive (trans);
1402   }
1403 #ifdef DUMP_STATS
1404   {
1405     GstStructure *stats;
1406     g_object_get (source, "stats", &stats, NULL);
1407     if (stats) {
1408       dump_structure (stats);
1409       gst_structure_free (stats);
1410     }
1411   }
1412 #endif
1413 }
1414
1415 static void
1416 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1417 {
1418   GST_INFO ("%p: source %p bye", stream, source);
1419 }
1420
1421 static void
1422 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1423 {
1424   GstRTSPStreamTransport *trans;
1425
1426   GST_INFO ("%p: source %p bye timeout", stream, source);
1427
1428   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1429     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1430     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1431   }
1432 }
1433
1434 static void
1435 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1436 {
1437   GstRTSPStreamTransport *trans;
1438
1439   GST_INFO ("%p: source %p timeout", stream, source);
1440
1441   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1442     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1443     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1444   }
1445 }
1446
1447 static void
1448 clear_tr_cache (GstRTSPStreamPrivate * priv)
1449 {
1450   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1451   g_list_free (priv->tr_cache);
1452   priv->tr_cache = NULL;
1453 }
1454
1455 static GstFlowReturn
1456 handle_new_sample (GstAppSink * sink, gpointer user_data)
1457 {
1458   GstRTSPStreamPrivate *priv;
1459   GList *walk;
1460   GstSample *sample;
1461   GstBuffer *buffer;
1462   GstRTSPStream *stream;
1463   gboolean is_rtp;
1464
1465   sample = gst_app_sink_pull_sample (sink);
1466   if (!sample)
1467     return GST_FLOW_OK;
1468
1469   stream = (GstRTSPStream *) user_data;
1470   priv = stream->priv;
1471   buffer = gst_sample_get_buffer (sample);
1472
1473   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1474
1475   g_mutex_lock (&priv->lock);
1476   if (priv->tr_changed) {
1477     clear_tr_cache (priv);
1478     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1479       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1480       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1481     }
1482     priv->tr_changed = FALSE;
1483   }
1484   g_mutex_unlock (&priv->lock);
1485
1486   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1487     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1488
1489     if (is_rtp) {
1490       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1491     } else {
1492       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1493     }
1494   }
1495   gst_sample_unref (sample);
1496
1497   return GST_FLOW_OK;
1498 }
1499
1500 static GstAppSinkCallbacks sink_cb = {
1501   NULL,                         /* not interested in EOS */
1502   NULL,                         /* not interested in preroll samples */
1503   handle_new_sample,
1504 };
1505
1506 /**
1507  * gst_rtsp_stream_join_bin:
1508  * @stream: a #GstRTSPStream
1509  * @bin: a #GstBin to join
1510  * @rtpbin: a rtpbin element in @bin
1511  * @state: the target state of the new elements
1512  *
1513  * Join the #GstBin @bin that contains the element @rtpbin.
1514  *
1515  * @stream will link to @rtpbin, which must be inside @bin. The elements
1516  * added to @bin will be set to the state given in @state.
1517  *
1518  * Returns: %TRUE on success.
1519  */
1520 gboolean
1521 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1522     GstElement * rtpbin, GstState state)
1523 {
1524   GstRTSPStreamPrivate *priv;
1525   gint i;
1526   guint idx;
1527   gchar *name;
1528   GstPad *pad, *sinkpad, *selpad;
1529   GstPadLinkReturn ret;
1530
1531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1532   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1533   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1534
1535   priv = stream->priv;
1536
1537   g_mutex_lock (&priv->lock);
1538   if (priv->is_joined)
1539     goto was_joined;
1540
1541   /* create a session with the same index as the stream */
1542   idx = priv->idx;
1543
1544   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1545
1546   if (!alloc_ports (stream))
1547     goto no_ports;
1548
1549   /* update the dscp qos field in the sinks */
1550   update_dscp_qos (stream);
1551
1552   /* get a pad for sending RTP */
1553   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1554   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1555   g_free (name);
1556   /* link the RTP pad to the session manager, it should not really fail unless
1557    * this is not really an RTP pad */
1558   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1559   if (ret != GST_PAD_LINK_OK)
1560     goto link_failed;
1561
1562   /* get pads from the RTP session element for sending and receiving
1563    * RTP/RTCP*/
1564   name = g_strdup_printf ("send_rtp_src_%u", idx);
1565   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1566   g_free (name);
1567   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1568   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1569   g_free (name);
1570   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1571   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1572   g_free (name);
1573   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1574   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1575   g_free (name);
1576
1577   /* get the session */
1578   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1579
1580   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1581       stream);
1582   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1583       stream);
1584   g_signal_connect (priv->session, "on-ssrc-active",
1585       (GCallback) on_ssrc_active, stream);
1586   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1587       stream);
1588   g_signal_connect (priv->session, "on-bye-timeout",
1589       (GCallback) on_bye_timeout, stream);
1590   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1591       stream);
1592
1593   for (i = 0; i < 2; i++) {
1594     GstPad *teepad, *queuepad;
1595     /* For the sender we create this bit of pipeline for both
1596      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1597      * we need to add a queue before appsink to make the pipeline
1598      * not block. For the TCP case, we want to pump data to the
1599      * client as fast as possible anyway.
1600      *
1601      * .--------.      .-----.    .---------.
1602      * | rtpbin |      | tee |    | udpsink |
1603      * |       send->sink   src->sink       |
1604      * '--------'      |     |    '---------'
1605      *                 |     |    .---------.    .---------.
1606      *                 |     |    |  queue  |    | appsink |
1607      *                 |    src->sink      src->sink       |
1608      *                 '-----'    '---------'    '---------'
1609      *
1610      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1611      * udpsink directly to the session.
1612      */
1613     /* add udpsink */
1614     gst_bin_add (bin, priv->udpsink[i]);
1615     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1616
1617     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1618       /* make tee for RTP/RTCP */
1619       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1620       gst_bin_add (bin, priv->tee[i]);
1621
1622       /* and link to rtpbin send pad */
1623       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1624       gst_pad_link (priv->send_src[i], pad);
1625       gst_object_unref (pad);
1626
1627       /* link tee to udpsink */
1628       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1629       gst_pad_link (teepad, sinkpad);
1630       gst_object_unref (teepad);
1631
1632       /* make queue */
1633       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1634       gst_bin_add (bin, priv->appqueue[i]);
1635       /* and link to tee */
1636       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1637       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1638       gst_pad_link (teepad, pad);
1639       gst_object_unref (pad);
1640       gst_object_unref (teepad);
1641
1642       /* make appsink */
1643       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1644       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1645       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1646       gst_bin_add (bin, priv->appsink[i]);
1647       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1648           &sink_cb, stream, NULL);
1649       /* and link to queue */
1650       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1651       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1652       gst_pad_link (queuepad, pad);
1653       gst_object_unref (pad);
1654       gst_object_unref (queuepad);
1655     } else {
1656       /* else only udpsink needed, link it to the session */
1657       gst_pad_link (priv->send_src[i], sinkpad);
1658     }
1659     gst_object_unref (sinkpad);
1660
1661     /* For the receiver we create this bit of pipeline for both
1662      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1663      * and it is all funneled into the rtpbin receive pad.
1664      *
1665      * .--------.     .--------.    .--------.
1666      * | udpsrc |     | funnel |    | rtpbin |
1667      * |       src->sink      src->sink      |
1668      * '--------'     |        |    '--------'
1669      * .--------.     |        |
1670      * | appsrc |     |        |
1671      * |       src->sink       |
1672      * '--------'     '--------'
1673      */
1674     /* make funnel for the RTP/RTCP receivers */
1675     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1676     gst_bin_add (bin, priv->funnel[i]);
1677
1678     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1679     gst_pad_link (pad, priv->recv_sink[i]);
1680     gst_object_unref (pad);
1681
1682     if (priv->udpsrc_v4[i]) {
1683       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1684        * values */
1685       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1686       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1687       /* add udpsrc */
1688       gst_bin_add (bin, priv->udpsrc_v4[i]);
1689
1690       /* and link to the funnel v4 */
1691       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1692       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1693       gst_pad_link (pad, selpad);
1694       gst_object_unref (pad);
1695       gst_object_unref (selpad);
1696     }
1697
1698     if (priv->udpsrc_v6[i]) {
1699       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1700       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1701       gst_bin_add (bin, priv->udpsrc_v6[i]);
1702
1703       /* and link to the funnel v6 */
1704       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1705       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1706       gst_pad_link (pad, selpad);
1707       gst_object_unref (pad);
1708       gst_object_unref (selpad);
1709     }
1710
1711     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1712       /* make and add appsrc */
1713       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1714       gst_bin_add (bin, priv->appsrc[i]);
1715       /* and link to the funnel */
1716       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1717       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1718       gst_pad_link (pad, selpad);
1719       gst_object_unref (pad);
1720       gst_object_unref (selpad);
1721     }
1722
1723     /* check if we need to set to a special state */
1724     if (state != GST_STATE_NULL) {
1725       if (priv->udpsink[i])
1726         gst_element_set_state (priv->udpsink[i], state);
1727       if (priv->appsink[i])
1728         gst_element_set_state (priv->appsink[i], state);
1729       if (priv->appqueue[i])
1730         gst_element_set_state (priv->appqueue[i], state);
1731       if (priv->tee[i])
1732         gst_element_set_state (priv->tee[i], state);
1733       if (priv->funnel[i])
1734         gst_element_set_state (priv->funnel[i], state);
1735       if (priv->appsrc[i])
1736         gst_element_set_state (priv->appsrc[i], state);
1737     }
1738   }
1739
1740   /* be notified of caps changes */
1741   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1742       (GCallback) caps_notify, stream);
1743
1744   priv->is_joined = TRUE;
1745   g_mutex_unlock (&priv->lock);
1746
1747   return TRUE;
1748
1749   /* ERRORS */
1750 was_joined:
1751   {
1752     g_mutex_unlock (&priv->lock);
1753     return TRUE;
1754   }
1755 no_ports:
1756   {
1757     g_mutex_unlock (&priv->lock);
1758     GST_WARNING ("failed to allocate ports %u", idx);
1759     return FALSE;
1760   }
1761 link_failed:
1762   {
1763     GST_WARNING ("failed to link stream %u", idx);
1764     gst_object_unref (priv->send_rtp_sink);
1765     priv->send_rtp_sink = NULL;
1766     g_mutex_unlock (&priv->lock);
1767     return FALSE;
1768   }
1769 }
1770
1771 /**
1772  * gst_rtsp_stream_leave_bin:
1773  * @stream: a #GstRTSPStream
1774  * @bin: a #GstBin
1775  * @rtpbin: a rtpbin #GstElement
1776  *
1777  * Remove the elements of @stream from @bin.
1778  *
1779  * Return: %TRUE on success.
1780  */
1781 gboolean
1782 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1783     GstElement * rtpbin)
1784 {
1785   GstRTSPStreamPrivate *priv;
1786   gint i;
1787
1788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1789   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1790   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1791
1792   priv = stream->priv;
1793
1794   g_mutex_lock (&priv->lock);
1795   if (!priv->is_joined)
1796     goto was_not_joined;
1797
1798   /* all transports must be removed by now */
1799   g_return_val_if_fail (priv->transports == NULL, FALSE);
1800
1801   clear_tr_cache (priv);
1802
1803   GST_INFO ("stream %p leaving bin", stream);
1804
1805   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1806   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1807   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1808   gst_object_unref (priv->send_rtp_sink);
1809   priv->send_rtp_sink = NULL;
1810
1811   for (i = 0; i < 2; i++) {
1812     if (priv->udpsink[i])
1813       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1814     if (priv->appsink[i])
1815       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1816     if (priv->appqueue[i])
1817       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1818     if (priv->tee[i])
1819       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1820     if (priv->funnel[i])
1821       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1822     if (priv->appsrc[i])
1823       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1824     if (priv->udpsrc_v4[i]) {
1825       /* and set udpsrc to NULL now before removing */
1826       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1827       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1828       /* removing them should also nicely release the request
1829        * pads when they finalize */
1830       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1831     }
1832     if (priv->udpsrc_v6[i]) {
1833       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1834       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1835       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1836     }
1837     if (priv->udpsink[i])
1838       gst_bin_remove (bin, priv->udpsink[i]);
1839     if (priv->appsrc[i])
1840       gst_bin_remove (bin, priv->appsrc[i]);
1841     if (priv->appsink[i])
1842       gst_bin_remove (bin, priv->appsink[i]);
1843     if (priv->appqueue[i])
1844       gst_bin_remove (bin, priv->appqueue[i]);
1845     if (priv->tee[i])
1846       gst_bin_remove (bin, priv->tee[i]);
1847     if (priv->funnel[i])
1848       gst_bin_remove (bin, priv->funnel[i]);
1849
1850     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1851     gst_object_unref (priv->recv_sink[i]);
1852     priv->recv_sink[i] = NULL;
1853
1854     priv->udpsrc_v4[i] = NULL;
1855     priv->udpsrc_v6[i] = NULL;
1856     priv->udpsink[i] = NULL;
1857     priv->appsrc[i] = NULL;
1858     priv->appsink[i] = NULL;
1859     priv->appqueue[i] = NULL;
1860     priv->tee[i] = NULL;
1861     priv->funnel[i] = NULL;
1862   }
1863   gst_object_unref (priv->send_src[0]);
1864   priv->send_src[0] = NULL;
1865
1866   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1867   gst_object_unref (priv->send_src[1]);
1868   priv->send_src[1] = NULL;
1869
1870   g_object_unref (priv->session);
1871   priv->session = NULL;
1872   if (priv->caps)
1873     gst_caps_unref (priv->caps);
1874   priv->caps = NULL;
1875
1876   priv->is_joined = FALSE;
1877   g_mutex_unlock (&priv->lock);
1878
1879   return TRUE;
1880
1881 was_not_joined:
1882   {
1883     g_mutex_unlock (&priv->lock);
1884     return TRUE;
1885   }
1886 }
1887
1888 /**
1889  * gst_rtsp_stream_get_rtpinfo:
1890  * @stream: a #GstRTSPStream
1891  * @rtptime: (allow-none): result RTP timestamp
1892  * @seq: (allow-none): result RTP seqnum
1893  * @clock_rate: the clock rate
1894  * @running_time: (allow-none): result running-time
1895  *
1896  * Retrieve the current rtptime, seq and running-time. This is used to
1897  * construct a RTPInfo reply header.
1898  *
1899  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1900  */
1901 gboolean
1902 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1903     guint * rtptime, guint * seq, guint * clock_rate,
1904     GstClockTime * running_time)
1905 {
1906   GstRTSPStreamPrivate *priv;
1907   GstStructure *stats;
1908   GObjectClass *payobjclass;
1909
1910   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1911
1912   priv = stream->priv;
1913
1914   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1915
1916   g_mutex_lock (&priv->lock);
1917
1918   if (g_object_class_find_property (payobjclass, "stats")) {
1919     g_object_get (priv->payloader, "stats", &stats, NULL);
1920     if (stats == NULL)
1921       goto no_stats;
1922
1923     if (seq)
1924       gst_structure_get_uint (stats, "seqnum", seq);
1925
1926     if (rtptime)
1927       gst_structure_get_uint (stats, "timestamp", rtptime);
1928
1929     if (running_time)
1930       gst_structure_get_clock_time (stats, "running-time", running_time);
1931
1932     if (clock_rate) {
1933       gst_structure_get_uint (stats, "clock-rate", clock_rate);
1934       if (*clock_rate == 0 && running_time)
1935         *running_time = GST_CLOCK_TIME_NONE;
1936     }
1937     gst_structure_free (stats);
1938   } else {
1939     if (!g_object_class_find_property (payobjclass, "seqnum") ||
1940         !g_object_class_find_property (payobjclass, "timestamp"))
1941       goto no_stats;
1942
1943     if (seq)
1944       g_object_get (priv->payloader, "seqnum", seq, NULL);
1945
1946     if (rtptime)
1947       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1948
1949     if (running_time)
1950       *running_time = GST_CLOCK_TIME_NONE;
1951   }
1952   g_mutex_unlock (&priv->lock);
1953
1954   return TRUE;
1955
1956   /* ERRORS */
1957 no_stats:
1958   {
1959     GST_WARNING ("Could not get payloader stats");
1960     g_mutex_unlock (&priv->lock);
1961     return FALSE;
1962   }
1963 }
1964
1965 /**
1966  * gst_rtsp_stream_get_caps:
1967  * @stream: a #GstRTSPStream
1968  *
1969  * Retrieve the current caps of @stream.
1970  *
1971  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1972  *    after usage.
1973  */
1974 GstCaps *
1975 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1976 {
1977   GstRTSPStreamPrivate *priv;
1978   GstCaps *result;
1979
1980   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1981
1982   priv = stream->priv;
1983
1984   g_mutex_lock (&priv->lock);
1985   if ((result = priv->caps))
1986     gst_caps_ref (result);
1987   g_mutex_unlock (&priv->lock);
1988
1989   return result;
1990 }
1991
1992 /**
1993  * gst_rtsp_stream_recv_rtp:
1994  * @stream: a #GstRTSPStream
1995  * @buffer: (transfer full): a #GstBuffer
1996  *
1997  * Handle an RTP buffer for the stream. This method is usually called when a
1998  * message has been received from a client using the TCP transport.
1999  *
2000  * This function takes ownership of @buffer.
2001  *
2002  * Returns: a GstFlowReturn.
2003  */
2004 GstFlowReturn
2005 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2006 {
2007   GstRTSPStreamPrivate *priv;
2008   GstFlowReturn ret;
2009   GstElement *element;
2010
2011   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2012   priv = stream->priv;
2013   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2014   g_return_val_if_fail (priv->is_joined, FALSE);
2015
2016   g_mutex_lock (&priv->lock);
2017   if (priv->appsrc[0])
2018     element = gst_object_ref (priv->appsrc[0]);
2019   else
2020     element = NULL;
2021   g_mutex_unlock (&priv->lock);
2022
2023   if (element) {
2024     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2025     gst_object_unref (element);
2026   } else {
2027     ret = GST_FLOW_OK;
2028   }
2029   return ret;
2030 }
2031
2032 /**
2033  * gst_rtsp_stream_recv_rtcp:
2034  * @stream: a #GstRTSPStream
2035  * @buffer: (transfer full): a #GstBuffer
2036  *
2037  * Handle an RTCP buffer for the stream. This method is usually called when a
2038  * message has been received from a client using the TCP transport.
2039  *
2040  * This function takes ownership of @buffer.
2041  *
2042  * Returns: a GstFlowReturn.
2043  */
2044 GstFlowReturn
2045 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2046 {
2047   GstRTSPStreamPrivate *priv;
2048   GstFlowReturn ret;
2049   GstElement *element;
2050
2051   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2052   priv = stream->priv;
2053   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2054   g_return_val_if_fail (priv->is_joined, FALSE);
2055
2056   g_mutex_lock (&priv->lock);
2057   if (priv->appsrc[1])
2058     element = gst_object_ref (priv->appsrc[1]);
2059   else
2060     element = NULL;
2061   g_mutex_unlock (&priv->lock);
2062
2063   if (element) {
2064     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2065     gst_object_unref (element);
2066   } else {
2067     ret = GST_FLOW_OK;
2068   }
2069   return ret;
2070 }
2071
2072 /* must be called with lock */
2073 static gboolean
2074 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2075     gboolean add)
2076 {
2077   GstRTSPStreamPrivate *priv = stream->priv;
2078   const GstRTSPTransport *tr;
2079
2080   tr = gst_rtsp_stream_transport_get_transport (trans);
2081
2082   switch (tr->lower_transport) {
2083     case GST_RTSP_LOWER_TRANS_UDP:
2084     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2085     {
2086       gchar *dest;
2087       gint min, max;
2088       guint ttl = 0;
2089
2090       dest = tr->destination;
2091       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2092         min = tr->port.min;
2093         max = tr->port.max;
2094         ttl = tr->ttl;
2095       } else {
2096         min = tr->client_port.min;
2097         max = tr->client_port.max;
2098       }
2099
2100       if (add) {
2101         if (ttl > 0) {
2102           GST_INFO ("setting ttl-mc %d", ttl);
2103           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2104           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2105         }
2106         GST_INFO ("adding %s:%d-%d", dest, min, max);
2107         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2108         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2109         priv->transports = g_list_prepend (priv->transports, trans);
2110       } else {
2111         GST_INFO ("removing %s:%d-%d", dest, min, max);
2112         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2113         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2114         priv->transports = g_list_remove (priv->transports, trans);
2115       }
2116       priv->tr_changed = TRUE;
2117       break;
2118     }
2119     case GST_RTSP_LOWER_TRANS_TCP:
2120       if (add) {
2121         GST_INFO ("adding TCP %s", tr->destination);
2122         priv->transports = g_list_prepend (priv->transports, trans);
2123       } else {
2124         GST_INFO ("removing TCP %s", tr->destination);
2125         priv->transports = g_list_remove (priv->transports, trans);
2126       }
2127       priv->tr_changed = TRUE;
2128       break;
2129     default:
2130       goto unknown_transport;
2131   }
2132   return TRUE;
2133
2134   /* ERRORS */
2135 unknown_transport:
2136   {
2137     GST_INFO ("Unknown transport %d", tr->lower_transport);
2138     return FALSE;
2139   }
2140 }
2141
2142
2143 /**
2144  * gst_rtsp_stream_add_transport:
2145  * @stream: a #GstRTSPStream
2146  * @trans: a #GstRTSPStreamTransport
2147  *
2148  * Add the transport in @trans to @stream. The media of @stream will
2149  * then also be send to the values configured in @trans.
2150  *
2151  * @stream must be joined to a bin.
2152  *
2153  * @trans must contain a valid #GstRTSPTransport.
2154  *
2155  * Returns: %TRUE if @trans was added
2156  */
2157 gboolean
2158 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2159     GstRTSPStreamTransport * trans)
2160 {
2161   GstRTSPStreamPrivate *priv;
2162   gboolean res;
2163
2164   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2165   priv = stream->priv;
2166   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2167   g_return_val_if_fail (priv->is_joined, FALSE);
2168
2169   g_mutex_lock (&priv->lock);
2170   res = update_transport (stream, trans, TRUE);
2171   g_mutex_unlock (&priv->lock);
2172
2173   return res;
2174 }
2175
2176 /**
2177  * gst_rtsp_stream_remove_transport:
2178  * @stream: a #GstRTSPStream
2179  * @trans: a #GstRTSPStreamTransport
2180  *
2181  * Remove the transport in @trans from @stream. The media of @stream will
2182  * not be sent to the values configured in @trans.
2183  *
2184  * @stream must be joined to a bin.
2185  *
2186  * @trans must contain a valid #GstRTSPTransport.
2187  *
2188  * Returns: %TRUE if @trans was removed
2189  */
2190 gboolean
2191 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2192     GstRTSPStreamTransport * trans)
2193 {
2194   GstRTSPStreamPrivate *priv;
2195   gboolean res;
2196
2197   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2198   priv = stream->priv;
2199   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2200   g_return_val_if_fail (priv->is_joined, FALSE);
2201
2202   g_mutex_lock (&priv->lock);
2203   res = update_transport (stream, trans, FALSE);
2204   g_mutex_unlock (&priv->lock);
2205
2206   return res;
2207 }
2208
2209 /**
2210  * gst_rtsp_stream_get_rtp_socket:
2211  * @stream: a #GstRTSPStream
2212  * @family: the socket family
2213  *
2214  * Get the RTP socket from @stream for a @family.
2215  *
2216  * @stream must be joined to a bin.
2217  *
2218  * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2219  *     allocated for @family. Unref after usage
2220  */
2221 GSocket *
2222 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2223 {
2224   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2225   GSocket *socket;
2226   const gchar *name;
2227
2228   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2229   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2230       family == G_SOCKET_FAMILY_IPV6, NULL);
2231   g_return_val_if_fail (priv->udpsink[0], NULL);
2232
2233   if (family == G_SOCKET_FAMILY_IPV6)
2234     name = "socket-v6";
2235   else
2236     name = "socket";
2237
2238   g_object_get (priv->udpsink[0], name, &socket, NULL);
2239
2240   return socket;
2241 }
2242
2243 /**
2244  * gst_rtsp_stream_get_rtcp_socket:
2245  * @stream: a #GstRTSPStream
2246  * @family: the socket family
2247  *
2248  * Get the RTCP socket from @stream for a @family.
2249  *
2250  * @stream must be joined to a bin.
2251  *
2252  * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2253  *     allocated for @family. Unref after usage
2254  */
2255 GSocket *
2256 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2257 {
2258   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2259   GSocket *socket;
2260   const gchar *name;
2261
2262   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2263   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2264       family == G_SOCKET_FAMILY_IPV6, NULL);
2265   g_return_val_if_fail (priv->udpsink[1], NULL);
2266
2267   if (family == G_SOCKET_FAMILY_IPV6)
2268     name = "socket-v6";
2269   else
2270     name = "socket";
2271
2272   g_object_get (priv->udpsink[1], name, &socket, NULL);
2273
2274   return socket;
2275 }
2276
2277 /**
2278  * gst_rtsp_stream_transport_filter:
2279  * @stream: a #GstRTSPStream
2280  * @func: (scope call) (allow-none): a callback
2281  * @user_data: user data passed to @func
2282  *
2283  * Call @func for each transport managed by @stream. The result value of @func
2284  * determines what happens to the transport. @func will be called with @stream
2285  * locked so no further actions on @stream can be performed from @func.
2286  *
2287  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2288  * @stream.
2289  *
2290  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2291  *
2292  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2293  * will also be added with an additional ref to the result #GList of this
2294  * function..
2295  *
2296  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2297  *
2298  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2299  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2300  * element in the #GList should be unreffed before the list is freed.
2301  */
2302 GList *
2303 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2304     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2305 {
2306   GstRTSPStreamPrivate *priv;
2307   GList *result, *walk, *next;
2308
2309   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2310
2311   priv = stream->priv;
2312
2313   result = NULL;
2314
2315   g_mutex_lock (&priv->lock);
2316   for (walk = priv->transports; walk; walk = next) {
2317     GstRTSPStreamTransport *trans = walk->data;
2318     GstRTSPFilterResult res;
2319
2320     next = g_list_next (walk);
2321
2322     if (func)
2323       res = func (stream, trans, user_data);
2324     else
2325       res = GST_RTSP_FILTER_REF;
2326
2327     switch (res) {
2328       case GST_RTSP_FILTER_REMOVE:
2329         update_transport (stream, trans, FALSE);
2330         break;
2331       case GST_RTSP_FILTER_REF:
2332         result = g_list_prepend (result, g_object_ref (trans));
2333         break;
2334       case GST_RTSP_FILTER_KEEP:
2335       default:
2336         break;
2337     }
2338   }
2339   g_mutex_unlock (&priv->lock);
2340
2341   return result;
2342 }
2343
2344 static GstPadProbeReturn
2345 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2346 {
2347   GstRTSPStreamPrivate *priv;
2348   GstRTSPStream *stream;
2349
2350   stream = user_data;
2351   priv = stream->priv;
2352
2353   GST_DEBUG_OBJECT (pad, "now blocking");
2354
2355   g_mutex_lock (&priv->lock);
2356   priv->blocking = TRUE;
2357   g_mutex_unlock (&priv->lock);
2358
2359   gst_element_post_message (priv->payloader,
2360       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2361           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2362
2363   return GST_PAD_PROBE_OK;
2364 }
2365
2366 /**
2367  * gst_rtsp_stream_set_blocked:
2368  * @stream: a #GstRTSPStream
2369  * @blocked: boolean indicating we should block or unblock
2370  *
2371  * Blocks or unblocks the dataflow on @stream.
2372  *
2373  * Returns: %TRUE on success
2374  */
2375 gboolean
2376 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2377 {
2378   GstRTSPStreamPrivate *priv;
2379
2380   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2381
2382   priv = stream->priv;
2383
2384   g_mutex_lock (&priv->lock);
2385   if (blocked) {
2386     priv->blocking = FALSE;
2387     if (priv->blocked_id == 0) {
2388       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2389           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2390           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2391           g_object_ref (stream), g_object_unref);
2392     }
2393   } else {
2394     if (priv->blocked_id != 0) {
2395       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2396       priv->blocked_id = 0;
2397       priv->blocking = FALSE;
2398     }
2399   }
2400   g_mutex_unlock (&priv->lock);
2401
2402   return TRUE;
2403 }
2404
2405 /**
2406  * gst_rtsp_stream_is_blocking:
2407  * @stream: a #GstRTSPStream
2408  *
2409  * Check if @stream is blocking on a #GstBuffer.
2410  *
2411  * Returns: %TRUE if @stream is blocking
2412  */
2413 gboolean
2414 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2415 {
2416   GstRTSPStreamPrivate *priv;
2417   gboolean result;
2418
2419   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2420
2421   priv = stream->priv;
2422
2423   g_mutex_lock (&priv->lock);
2424   result = priv->blocking;
2425   g_mutex_unlock (&priv->lock);
2426
2427   return result;
2428 }