stream: fix compilation
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
83    * sockets */
84   GstElement *udpsrc_v4[2];
85
86   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
87    * sockets */
88   GstElement *udpsrc_v6[2];
89
90   GstElement *udpsink[2];
91
92   /* for TCP transport */
93   GstElement *appsrc[2];
94   GstElement *appqueue[2];
95   GstElement *appsink[2];
96
97   GstElement *tee[2];
98   GstElement *funnel[2];
99
100   /* server ports for sending/receiving over ipv4 */
101   GstRTSPRange server_port_v4;
102   GstRTSPAddress *server_addr_v4;
103   gboolean have_ipv4;
104
105   /* server ports for sending/receiving over ipv6 */
106   GstRTSPRange server_port_v6;
107   GstRTSPAddress *server_addr_v6;
108   gboolean have_ipv6;
109
110   /* multicast addresses */
111   GstRTSPAddressPool *pool;
112   GstRTSPAddress *addr_v4;
113   GstRTSPAddress *addr_v6;
114
115   /* the caps of the stream */
116   gulong caps_sig;
117   GstCaps *caps;
118
119   /* transports we stream to */
120   guint n_active;
121   GList *transports;
122
123   gint dscp_qos;
124
125   /* stream blocking */
126   gulong blocked_id;
127   gboolean blocking;
128 };
129
130 #define DEFAULT_CONTROL         NULL
131 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
132 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
133                                         GST_RTSP_LOWER_TRANS_TCP
134
135 enum
136 {
137   PROP_0,
138   PROP_CONTROL,
139   PROP_PROFILES,
140   PROP_PROTOCOLS,
141   PROP_LAST
142 };
143
144 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
145 #define GST_CAT_DEFAULT rtsp_stream_debug
146
147 static GQuark ssrc_stream_map_key;
148
149 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
150     GValue * value, GParamSpec * pspec);
151 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
152     const GValue * value, GParamSpec * pspec);
153
154 static void gst_rtsp_stream_finalize (GObject * obj);
155
156 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
157
158 static void
159 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
160 {
161   GObjectClass *gobject_class;
162
163   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
164
165   gobject_class = G_OBJECT_CLASS (klass);
166
167   gobject_class->get_property = gst_rtsp_stream_get_property;
168   gobject_class->set_property = gst_rtsp_stream_set_property;
169   gobject_class->finalize = gst_rtsp_stream_finalize;
170
171   g_object_class_install_property (gobject_class, PROP_CONTROL,
172       g_param_spec_string ("control", "Control",
173           "The control string for this stream", DEFAULT_CONTROL,
174           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175
176   g_object_class_install_property (gobject_class, PROP_PROFILES,
177       g_param_spec_flags ("profiles", "Profiles",
178           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
179           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180
181   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
182       g_param_spec_flags ("protocols", "Protocols",
183           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
184           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
187
188   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
189 }
190
191 static void
192 gst_rtsp_stream_init (GstRTSPStream * stream)
193 {
194   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
195
196   GST_DEBUG ("new stream %p", stream);
197
198   stream->priv = priv;
199
200   priv->dscp_qos = -1;
201   priv->control = g_strdup (DEFAULT_CONTROL);
202   priv->profiles = DEFAULT_PROFILES;
203   priv->protocols = DEFAULT_PROTOCOLS;
204
205   g_mutex_init (&priv->lock);
206 }
207
208 static void
209 gst_rtsp_stream_finalize (GObject * obj)
210 {
211   GstRTSPStream *stream;
212   GstRTSPStreamPrivate *priv;
213
214   stream = GST_RTSP_STREAM (obj);
215   priv = stream->priv;
216
217   GST_DEBUG ("finalize stream %p", stream);
218
219   /* we really need to be unjoined now */
220   g_return_if_fail (!priv->is_joined);
221
222   if (priv->addr_v4)
223     gst_rtsp_address_free (priv->addr_v4);
224   if (priv->addr_v6)
225     gst_rtsp_address_free (priv->addr_v6);
226   if (priv->server_addr_v4)
227     gst_rtsp_address_free (priv->server_addr_v4);
228   if (priv->server_addr_v6)
229     gst_rtsp_address_free (priv->server_addr_v6);
230   if (priv->pool)
231     g_object_unref (priv->pool);
232   gst_object_unref (priv->payloader);
233   gst_object_unref (priv->srcpad);
234   g_free (priv->control);
235   g_mutex_clear (&priv->lock);
236
237   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
238 }
239
240 static void
241 gst_rtsp_stream_get_property (GObject * object, guint propid,
242     GValue * value, GParamSpec * pspec)
243 {
244   GstRTSPStream *stream = GST_RTSP_STREAM (object);
245
246   switch (propid) {
247     case PROP_CONTROL:
248       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
249       break;
250     case PROP_PROFILES:
251       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
252       break;
253     case PROP_PROTOCOLS:
254       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
255       break;
256     default:
257       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
258   }
259 }
260
261 static void
262 gst_rtsp_stream_set_property (GObject * object, guint propid,
263     const GValue * value, GParamSpec * pspec)
264 {
265   GstRTSPStream *stream = GST_RTSP_STREAM (object);
266
267   switch (propid) {
268     case PROP_CONTROL:
269       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
270       break;
271     case PROP_PROFILES:
272       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
273       break;
274     case PROP_PROTOCOLS:
275       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
276       break;
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
279   }
280 }
281
282 /**
283  * gst_rtsp_stream_new:
284  * @idx: an index
285  * @srcpad: a #GstPad
286  * @payloader: a #GstElement
287  *
288  * Create a new media stream with index @idx that handles RTP data on
289  * @srcpad and has a payloader element @payloader.
290  *
291  * Returns: a new #GstRTSPStream
292  */
293 GstRTSPStream *
294 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
295 {
296   GstRTSPStreamPrivate *priv;
297   GstRTSPStream *stream;
298
299   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
300   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
301   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
302
303   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
304   priv = stream->priv;
305   priv->idx = idx;
306   priv->payloader = gst_object_ref (payloader);
307   priv->srcpad = gst_object_ref (srcpad);
308
309   return stream;
310 }
311
312 /**
313  * gst_rtsp_stream_get_index:
314  * @stream: a #GstRTSPStream
315  *
316  * Get the stream index.
317  *
318  * Return: the stream index.
319  */
320 guint
321 gst_rtsp_stream_get_index (GstRTSPStream * stream)
322 {
323   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
324
325   return stream->priv->idx;
326 }
327
328 /**
329  * gst_rtsp_stream_get_pt:
330  * @stream: a #GstRTSPStream
331  *
332  * Get the stream payload type.
333  *
334  * Return: the stream payload type.
335  */
336 guint
337 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
338 {
339   GstRTSPStreamPrivate *priv;
340   guint pt;
341
342   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
343
344   priv = stream->priv;
345
346   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
347
348   return pt;
349 }
350
351 /**
352  * gst_rtsp_stream_get_srcpad:
353  * @stream: a #GstRTSPStream
354  *
355  * Get the srcpad associated with @stream.
356  *
357  * Returns: (transfer full): the srcpad. Unref after usage.
358  */
359 GstPad *
360 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
361 {
362   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
363
364   return gst_object_ref (stream->priv->srcpad);
365 }
366
367 /**
368  * gst_rtsp_stream_get_control:
369  * @stream: a #GstRTSPStream
370  *
371  * Get the control string to identify this stream.
372  *
373  * Returns: (transfer full): the control string. free after usage.
374  */
375 gchar *
376 gst_rtsp_stream_get_control (GstRTSPStream * stream)
377 {
378   GstRTSPStreamPrivate *priv;
379   gchar *result;
380
381   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
382
383   priv = stream->priv;
384
385   g_mutex_lock (&priv->lock);
386   if ((result = g_strdup (priv->control)) == NULL)
387     result = g_strdup_printf ("stream=%u", priv->idx);
388   g_mutex_unlock (&priv->lock);
389
390   return result;
391 }
392
393 /**
394  * gst_rtsp_stream_set_control:
395  * @stream: a #GstRTSPStream
396  * @control: a control string
397  *
398  * Set the control string in @stream.
399  */
400 void
401 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
402 {
403   GstRTSPStreamPrivate *priv;
404
405   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
406
407   priv = stream->priv;
408
409   g_mutex_lock (&priv->lock);
410   g_free (priv->control);
411   priv->control = g_strdup (control);
412   g_mutex_unlock (&priv->lock);
413 }
414
415 /**
416  * gst_rtsp_stream_has_control:
417  * @stream: a #GstRTSPStream
418  * @control: a control string
419  *
420  * Check if @stream has the control string @control.
421  *
422  * Returns: %TRUE is @stream has @control as the control string
423  */
424 gboolean
425 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
426 {
427   GstRTSPStreamPrivate *priv;
428   gboolean res;
429
430   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
431
432   priv = stream->priv;
433
434   g_mutex_lock (&priv->lock);
435   if (priv->control)
436     res = (g_strcmp0 (priv->control, control) == 0);
437   else {
438     guint streamid;
439
440     if (sscanf (control, "stream=%u", &streamid) > 0)
441       res = (streamid == priv->idx);
442     else
443       res = FALSE;
444   }
445   g_mutex_unlock (&priv->lock);
446
447   return res;
448 }
449
450 /**
451  * gst_rtsp_stream_set_mtu:
452  * @stream: a #GstRTSPStream
453  * @mtu: a new MTU
454  *
455  * Configure the mtu in the payloader of @stream to @mtu.
456  */
457 void
458 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
459 {
460   GstRTSPStreamPrivate *priv;
461
462   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
463
464   priv = stream->priv;
465
466   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
467
468   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
469 }
470
471 /**
472  * gst_rtsp_stream_get_mtu:
473  * @stream: a #GstRTSPStream
474  *
475  * Get the configured MTU in the payloader of @stream.
476  *
477  * Returns: the MTU of the payloader.
478  */
479 guint
480 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
481 {
482   GstRTSPStreamPrivate *priv;
483   guint mtu;
484
485   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
486
487   priv = stream->priv;
488
489   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
490
491   return mtu;
492 }
493
494 /* Update the dscp qos property on the udp sinks */
495 static void
496 update_dscp_qos (GstRTSPStream * stream)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   if (priv->udpsink[0]) {
505     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
506         NULL);
507   }
508
509   if (priv->udpsink[1]) {
510     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
511         NULL);
512   }
513 }
514
515 /**
516  * gst_rtsp_stream_set_dscp_qos:
517  * @stream: a #GstRTSPStream
518  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
519  *
520  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
521  */
522 void
523 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
524 {
525   GstRTSPStreamPrivate *priv;
526
527   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
528
529   priv = stream->priv;
530
531   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
532
533   if (dscp_qos < -1 || dscp_qos > 63) {
534     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
535     return;
536   }
537
538   priv->dscp_qos = dscp_qos;
539
540   update_dscp_qos (stream);
541 }
542
543 /**
544  * gst_rtsp_stream_get_dscp_qos:
545  * @stream: a #GstRTSPStream
546  *
547  * Get the configured DSCP QoS in of the outgoing sockets.
548  *
549  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
550  */
551 gint
552 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
553 {
554   GstRTSPStreamPrivate *priv;
555
556   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
557
558   priv = stream->priv;
559
560   return priv->dscp_qos;
561 }
562
563 /**
564  * gst_rtsp_stream_is_transport_supported:
565  * @stream: a #GstRTSPStream
566  * @transport: a #GstRTSPTransport
567  *
568  * Check if @transport can be handled by stream
569  *
570  * Returns: %TRUE if @transport can be handled by @stream.
571  */
572 gboolean
573 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
574     GstRTSPTransport * transport)
575 {
576   GstRTSPStreamPrivate *priv;
577
578   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
579
580   priv = stream->priv;
581
582   g_mutex_lock (&priv->lock);
583   if (transport->trans != GST_RTSP_TRANS_RTP)
584     goto unsupported_transmode;
585
586   if (!(transport->profile & priv->profiles))
587     goto unsupported_profile;
588
589   if (!(transport->lower_transport & priv->protocols))
590     goto unsupported_ltrans;
591
592   g_mutex_unlock (&priv->lock);
593
594   return TRUE;
595
596   /* ERRORS */
597 unsupported_transmode:
598   {
599     GST_DEBUG ("unsupported transport mode %d", transport->trans);
600     return FALSE;
601   }
602 unsupported_profile:
603   {
604     GST_DEBUG ("unsupported profile %d", transport->profile);
605     return FALSE;
606   }
607 unsupported_ltrans:
608   {
609     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
610     return FALSE;
611   }
612 }
613
614 /**
615  * gst_rtsp_stream_set_profiles:
616  * @stream: a #GstRTSPStream
617  * @profiles: the new profiles
618  *
619  * Configure the allowed profiles for @stream.
620  */
621 void
622 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
623 {
624   GstRTSPStreamPrivate *priv;
625
626   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
627
628   priv = stream->priv;
629
630   g_mutex_lock (&priv->lock);
631   priv->profiles = profiles;
632   g_mutex_unlock (&priv->lock);
633 }
634
635 /**
636  * gst_rtsp_stream_get_profiles:
637  * @stream: a #GstRTSPStream
638  *
639  * Get the allowed profiles of @stream.
640  *
641  * Returns: a #GstRTSPProfile
642  */
643 GstRTSPProfile
644 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
645 {
646   GstRTSPStreamPrivate *priv;
647   GstRTSPProfile res;
648
649   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
650
651   priv = stream->priv;
652
653   g_mutex_lock (&priv->lock);
654   res = priv->profiles;
655   g_mutex_unlock (&priv->lock);
656
657   return res;
658 }
659
660 /**
661  * gst_rtsp_stream_set_protocols:
662  * @stream: a #GstRTSPStream
663  * @protocols: the new flags
664  *
665  * Configure the allowed lower transport for @stream.
666  */
667 void
668 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
669     GstRTSPLowerTrans protocols)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   priv->protocols = protocols;
679   g_mutex_unlock (&priv->lock);
680 }
681
682 /**
683  * gst_rtsp_stream_get_protocols:
684  * @stream: a #GstRTSPStream
685  *
686  * Get the allowed protocols of @stream.
687  *
688  * Returns: a #GstRTSPLowerTrans
689  */
690 GstRTSPLowerTrans
691 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
692 {
693   GstRTSPStreamPrivate *priv;
694   GstRTSPLowerTrans res;
695
696   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
697       GST_RTSP_LOWER_TRANS_UNKNOWN);
698
699   priv = stream->priv;
700
701   g_mutex_lock (&priv->lock);
702   res = priv->protocols;
703   g_mutex_unlock (&priv->lock);
704
705   return res;
706 }
707
708 /**
709  * gst_rtsp_stream_set_address_pool:
710  * @stream: a #GstRTSPStream
711  * @pool: a #GstRTSPAddressPool
712  *
713  * configure @pool to be used as the address pool of @stream.
714  */
715 void
716 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
717     GstRTSPAddressPool * pool)
718 {
719   GstRTSPStreamPrivate *priv;
720   GstRTSPAddressPool *old;
721
722   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
723
724   priv = stream->priv;
725
726   GST_LOG_OBJECT (stream, "set address pool %p", pool);
727
728   g_mutex_lock (&priv->lock);
729   if ((old = priv->pool) != pool)
730     priv->pool = pool ? g_object_ref (pool) : NULL;
731   else
732     old = NULL;
733   g_mutex_unlock (&priv->lock);
734
735   if (old)
736     g_object_unref (old);
737 }
738
739 /**
740  * gst_rtsp_stream_get_address_pool:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the #GstRTSPAddressPool used as the address pool of @stream.
744  *
745  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
746  * usage.
747  */
748 GstRTSPAddressPool *
749 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
750 {
751   GstRTSPStreamPrivate *priv;
752   GstRTSPAddressPool *result;
753
754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
755
756   priv = stream->priv;
757
758   g_mutex_lock (&priv->lock);
759   if ((result = priv->pool))
760     g_object_ref (result);
761   g_mutex_unlock (&priv->lock);
762
763   return result;
764 }
765
766 /**
767  * gst_rtsp_stream_get_multicast_address:
768  * @stream: a #GstRTSPStream
769  * @family: the #GSocketFamily
770  *
771  * Get the multicast address of @stream for @family.
772  *
773  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
774  * allocated. gst_rtsp_address_free() after usage.
775  */
776 GstRTSPAddress *
777 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
778     GSocketFamily family)
779 {
780   GstRTSPStreamPrivate *priv;
781   GstRTSPAddress *result;
782   GstRTSPAddress **addrp;
783   GstRTSPAddressFlags flags;
784
785   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
786
787   priv = stream->priv;
788
789   if (family == G_SOCKET_FAMILY_IPV6) {
790     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
791     addrp = &priv->addr_v4;
792   } else {
793     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
794     addrp = &priv->addr_v6;
795   }
796
797   g_mutex_lock (&priv->lock);
798   if (*addrp == NULL) {
799     if (priv->pool == NULL)
800       goto no_pool;
801
802     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
803
804     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
805     if (*addrp == NULL)
806       goto no_address;
807   }
808   result = gst_rtsp_address_copy (*addrp);
809   g_mutex_unlock (&priv->lock);
810
811   return result;
812
813   /* ERRORS */
814 no_pool:
815   {
816     GST_ERROR_OBJECT (stream, "no address pool specified");
817     g_mutex_unlock (&priv->lock);
818     return NULL;
819   }
820 no_address:
821   {
822     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
823     g_mutex_unlock (&priv->lock);
824     return NULL;
825   }
826 }
827
828 /**
829  * gst_rtsp_stream_reserve_address:
830  * @stream: a #GstRTSPStream
831  * @address: an address
832  * @port: a port
833  * @n_ports: n_ports
834  * @ttl: a TTL
835  *
836  * Reserve @address and @port as the address and port of @stream.
837  *
838  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
839  * reserved. gst_rtsp_address_free() after usage.
840  */
841 GstRTSPAddress *
842 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
843     const gchar * address, guint port, guint n_ports, guint ttl)
844 {
845   GstRTSPStreamPrivate *priv;
846   GstRTSPAddress *result;
847   GInetAddress *addr;
848   GSocketFamily family;
849   GstRTSPAddress **addrp;
850
851   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
852   g_return_val_if_fail (address != NULL, NULL);
853   g_return_val_if_fail (port > 0, NULL);
854   g_return_val_if_fail (n_ports > 0, NULL);
855   g_return_val_if_fail (ttl > 0, NULL);
856
857   priv = stream->priv;
858
859   addr = g_inet_address_new_from_string (address);
860   if (!addr) {
861     GST_ERROR ("failed to get inet addr from %s", address);
862     family = G_SOCKET_FAMILY_IPV4;
863   } else {
864     family = g_inet_address_get_family (addr);
865     g_object_unref (addr);
866   }
867
868   if (family == G_SOCKET_FAMILY_IPV6)
869     addrp = &priv->addr_v4;
870   else
871     addrp = &priv->addr_v6;
872
873   g_mutex_lock (&priv->lock);
874   if (*addrp == NULL) {
875     GstRTSPAddressPoolResult res;
876
877     if (priv->pool == NULL)
878       goto no_pool;
879
880     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
881         port, n_ports, ttl, addrp);
882     if (res != GST_RTSP_ADDRESS_POOL_OK)
883       goto no_address;
884   } else {
885     if (strcmp ((*addrp)->address, address) ||
886         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
887         (*addrp)->ttl != ttl)
888       goto different_address;
889   }
890   result = gst_rtsp_address_copy (*addrp);
891   g_mutex_unlock (&priv->lock);
892
893   return result;
894
895   /* ERRORS */
896 no_pool:
897   {
898     GST_ERROR_OBJECT (stream, "no address pool specified");
899     g_mutex_unlock (&priv->lock);
900     return NULL;
901   }
902 no_address:
903   {
904     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
905         address);
906     g_mutex_unlock (&priv->lock);
907     return NULL;
908   }
909 different_address:
910   {
911     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
912         " reserved", address);
913     g_mutex_unlock (&priv->lock);
914     return NULL;
915   }
916 }
917
918 static gboolean
919 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
920     GSocketFamily family, GstElement * udpsrc_out[2],
921     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
922     GstRTSPAddress ** server_addr_out)
923 {
924   GstStateChangeReturn ret;
925   GstElement *udpsrc0, *udpsrc1;
926   GstElement *udpsink0, *udpsink1;
927   GSocket *rtp_socket = NULL;
928   GSocket *rtcp_socket;
929   gint tmp_rtp, tmp_rtcp;
930   guint count;
931   gint rtpport, rtcpport;
932   GList *rejected_addresses = NULL;
933   GstRTSPAddress *addr = NULL;
934   GInetAddress *inetaddr = NULL;
935   GSocketAddress *rtp_sockaddr = NULL;
936   GSocketAddress *rtcp_sockaddr = NULL;
937   const gchar *multisink_socket;
938
939   if (family == G_SOCKET_FAMILY_IPV6)
940     multisink_socket = "socket-v6";
941   else
942     multisink_socket = "socket";
943
944   udpsrc0 = NULL;
945   udpsrc1 = NULL;
946   udpsink0 = NULL;
947   udpsink1 = NULL;
948   count = 0;
949
950   /* Start with random port */
951   tmp_rtp = 0;
952
953   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
954       G_SOCKET_PROTOCOL_UDP, NULL);
955   if (!rtcp_socket)
956     goto no_udp_protocol;
957
958   if (*server_addr_out)
959     gst_rtsp_address_free (*server_addr_out);
960
961   /* try to allocate 2 UDP ports, the RTP port should be an even
962    * number and the RTCP port should be the next (uneven) port */
963 again:
964
965   if (rtp_socket == NULL) {
966     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
967         G_SOCKET_PROTOCOL_UDP, NULL);
968     if (!rtp_socket)
969       goto no_udp_protocol;
970   }
971
972   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
973     GstRTSPAddressFlags flags;
974
975     if (addr)
976       rejected_addresses = g_list_prepend (rejected_addresses, addr);
977
978     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
979     if (family == G_SOCKET_FAMILY_IPV6)
980       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
981     else
982       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
983
984     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
985
986     if (addr == NULL)
987       goto no_ports;
988
989     tmp_rtp = addr->port;
990
991     g_clear_object (&inetaddr);
992     inetaddr = g_inet_address_new_from_string (addr->address);
993   } else {
994     if (tmp_rtp != 0) {
995       tmp_rtp += 2;
996       if (++count > 20)
997         goto no_ports;
998     }
999
1000     if (inetaddr == NULL)
1001       inetaddr = g_inet_address_new_any (family);
1002   }
1003
1004   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1005   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1006     g_object_unref (rtp_sockaddr);
1007     goto again;
1008   }
1009   g_object_unref (rtp_sockaddr);
1010
1011   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1012   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1013     g_clear_object (&rtp_sockaddr);
1014     goto socket_error;
1015   }
1016
1017   tmp_rtp =
1018       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1019   g_object_unref (rtp_sockaddr);
1020
1021   /* check if port is even */
1022   if ((tmp_rtp & 1) != 0) {
1023     /* port not even, close and allocate another */
1024     tmp_rtp++;
1025     g_clear_object (&rtp_socket);
1026     goto again;
1027   }
1028
1029   /* set port */
1030   tmp_rtcp = tmp_rtp + 1;
1031
1032   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1033   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1034     g_object_unref (rtcp_sockaddr);
1035     g_clear_object (&rtp_socket);
1036     goto again;
1037   }
1038   g_object_unref (rtcp_sockaddr);
1039
1040   g_clear_object (&inetaddr);
1041
1042   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1043   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1044
1045   if (udpsrc0 == NULL || udpsrc1 == NULL)
1046     goto no_udp_protocol;
1047
1048   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1049   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1050
1051   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1052   if (ret == GST_STATE_CHANGE_FAILURE)
1053     goto element_error;
1054   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1055   if (ret == GST_STATE_CHANGE_FAILURE)
1056     goto element_error;
1057
1058   /* all fine, do port check */
1059   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1060   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1061
1062   /* this should not happen... */
1063   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1064     goto port_error;
1065
1066   if (udpsink_out[0])
1067     udpsink0 = udpsink_out[0];
1068   else
1069     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1070
1071   if (!udpsink0)
1072     goto no_udp_protocol;
1073
1074   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1075   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1076
1077   if (udpsink_out[1])
1078     udpsink1 = udpsink_out[1];
1079   else
1080     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1081
1082   if (!udpsink1)
1083     goto no_udp_protocol;
1084
1085   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1086   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1087   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1088
1089   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1091   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1092   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1093   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1094   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1095   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1096   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1097
1098   /* we keep these elements, we will further configure them when the
1099    * client told us to really use the UDP ports. */
1100   udpsrc_out[0] = udpsrc0;
1101   udpsrc_out[1] = udpsrc1;
1102   udpsink_out[0] = udpsink0;
1103   udpsink_out[1] = udpsink1;
1104   server_port_out->min = rtpport;
1105   server_port_out->max = rtcpport;
1106
1107   *server_addr_out = addr;
1108   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1109
1110   g_object_unref (rtp_socket);
1111   g_object_unref (rtcp_socket);
1112
1113   return TRUE;
1114
1115   /* ERRORS */
1116 no_udp_protocol:
1117   {
1118     goto cleanup;
1119   }
1120 no_ports:
1121   {
1122     goto cleanup;
1123   }
1124 port_error:
1125   {
1126     goto cleanup;
1127   }
1128 socket_error:
1129   {
1130     goto cleanup;
1131   }
1132 element_error:
1133   {
1134     goto cleanup;
1135   }
1136 cleanup:
1137   {
1138     if (udpsrc0) {
1139       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1140       gst_object_unref (udpsrc0);
1141     }
1142     if (udpsrc1) {
1143       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1144       gst_object_unref (udpsrc1);
1145     }
1146     if (udpsink0) {
1147       gst_element_set_state (udpsink0, GST_STATE_NULL);
1148       gst_object_unref (udpsink0);
1149     }
1150     if (inetaddr)
1151       g_object_unref (inetaddr);
1152     g_list_free_full (rejected_addresses,
1153         (GDestroyNotify) gst_rtsp_address_free);
1154     if (addr)
1155       gst_rtsp_address_free (addr);
1156     if (rtp_socket)
1157       g_object_unref (rtp_socket);
1158     if (rtcp_socket)
1159       g_object_unref (rtcp_socket);
1160     return FALSE;
1161   }
1162 }
1163
1164 /* must be called with lock */
1165 static gboolean
1166 alloc_ports (GstRTSPStream * stream)
1167 {
1168   GstRTSPStreamPrivate *priv = stream->priv;
1169
1170   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1171       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1172       &priv->server_port_v4, &priv->server_addr_v4);
1173
1174   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1175       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1176       &priv->server_port_v6, &priv->server_addr_v6);
1177
1178   return priv->have_ipv4 || priv->have_ipv6;
1179 }
1180
1181 /**
1182  * gst_rtsp_stream_get_server_port:
1183  * @stream: a #GstRTSPStream
1184  * @server_port: (out): result server port
1185  * @family: the port family to get
1186  *
1187  * Fill @server_port with the port pair used by the server. This function can
1188  * only be called when @stream has been joined.
1189  */
1190 void
1191 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1192     GstRTSPRange * server_port, GSocketFamily family)
1193 {
1194   GstRTSPStreamPrivate *priv;
1195
1196   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1197   priv = stream->priv;
1198   g_return_if_fail (priv->is_joined);
1199
1200   g_mutex_lock (&priv->lock);
1201   if (family == G_SOCKET_FAMILY_IPV4) {
1202     if (server_port)
1203       *server_port = priv->server_port_v4;
1204   } else {
1205     if (server_port)
1206       *server_port = priv->server_port_v6;
1207   }
1208   g_mutex_unlock (&priv->lock);
1209 }
1210
1211 /**
1212  * gst_rtsp_stream_get_rtpsession:
1213  * @stream: a #GstRTSPStream
1214  *
1215  * Get the RTP session of this stream.
1216  *
1217  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1218  */
1219 GObject *
1220 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1221 {
1222   GstRTSPStreamPrivate *priv;
1223   GObject *session;
1224
1225   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1226
1227   priv = stream->priv;
1228
1229   g_mutex_lock (&priv->lock);
1230   if ((session = priv->session))
1231     g_object_ref (session);
1232   g_mutex_unlock (&priv->lock);
1233
1234   return session;
1235 }
1236
1237 /**
1238  * gst_rtsp_stream_get_ssrc:
1239  * @stream: a #GstRTSPStream
1240  * @ssrc: (out): result ssrc
1241  *
1242  * Get the SSRC used by the RTP session of this stream. This function can only
1243  * be called when @stream has been joined.
1244  */
1245 void
1246 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1247 {
1248   GstRTSPStreamPrivate *priv;
1249
1250   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1251   priv = stream->priv;
1252   g_return_if_fail (priv->is_joined);
1253
1254   g_mutex_lock (&priv->lock);
1255   if (ssrc && priv->session)
1256     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1257   g_mutex_unlock (&priv->lock);
1258 }
1259
1260 /* executed from streaming thread */
1261 static void
1262 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1263 {
1264   GstRTSPStreamPrivate *priv = stream->priv;
1265   GstCaps *newcaps, *oldcaps;
1266
1267   newcaps = gst_pad_get_current_caps (pad);
1268
1269   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1270       newcaps);
1271
1272   g_mutex_lock (&priv->lock);
1273   oldcaps = priv->caps;
1274   priv->caps = newcaps;
1275   g_mutex_unlock (&priv->lock);
1276
1277   if (oldcaps)
1278     gst_caps_unref (oldcaps);
1279 }
1280
1281 static void
1282 dump_structure (const GstStructure * s)
1283 {
1284   gchar *sstr;
1285
1286   sstr = gst_structure_to_string (s);
1287   GST_INFO ("structure: %s", sstr);
1288   g_free (sstr);
1289 }
1290
1291 static GstRTSPStreamTransport *
1292 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1293 {
1294   GstRTSPStreamPrivate *priv = stream->priv;
1295   GList *walk;
1296   GstRTSPStreamTransport *result = NULL;
1297   const gchar *tmp;
1298   gchar *dest;
1299   guint port;
1300
1301   if (rtcp_from == NULL)
1302     return NULL;
1303
1304   tmp = g_strrstr (rtcp_from, ":");
1305   if (tmp == NULL)
1306     return NULL;
1307
1308   port = atoi (tmp + 1);
1309   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1310
1311   g_mutex_lock (&priv->lock);
1312   GST_INFO ("finding %s:%d in %d transports", dest, port,
1313       g_list_length (priv->transports));
1314
1315   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1316     GstRTSPStreamTransport *trans = walk->data;
1317     const GstRTSPTransport *tr;
1318     gint min, max;
1319
1320     tr = gst_rtsp_stream_transport_get_transport (trans);
1321
1322     min = tr->client_port.min;
1323     max = tr->client_port.max;
1324
1325     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1326       result = trans;
1327       break;
1328     }
1329   }
1330   if (result)
1331     g_object_ref (result);
1332   g_mutex_unlock (&priv->lock);
1333
1334   g_free (dest);
1335
1336   return result;
1337 }
1338
1339 static GstRTSPStreamTransport *
1340 check_transport (GObject * source, GstRTSPStream * stream)
1341 {
1342   GstStructure *stats;
1343   GstRTSPStreamTransport *trans;
1344
1345   /* see if we have a stream to match with the origin of the RTCP packet */
1346   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1347   if (trans == NULL) {
1348     g_object_get (source, "stats", &stats, NULL);
1349     if (stats) {
1350       const gchar *rtcp_from;
1351
1352       dump_structure (stats);
1353
1354       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1355       if ((trans = find_transport (stream, rtcp_from))) {
1356         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1357             source);
1358         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1359             g_object_unref);
1360       }
1361       gst_structure_free (stats);
1362     }
1363   }
1364   return trans;
1365 }
1366
1367
1368 static void
1369 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1370 {
1371   GstRTSPStreamTransport *trans;
1372
1373   GST_INFO ("%p: new source %p", stream, source);
1374
1375   trans = check_transport (source, stream);
1376
1377   if (trans)
1378     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1379 }
1380
1381 static void
1382 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1383 {
1384   GST_INFO ("%p: new SDES %p", stream, source);
1385 }
1386
1387 static void
1388 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1389 {
1390   GstRTSPStreamTransport *trans;
1391
1392   trans = check_transport (source, stream);
1393
1394   if (trans) {
1395     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1396     gst_rtsp_stream_transport_keep_alive (trans);
1397   }
1398 #ifdef DUMP_STATS
1399   {
1400     GstStructure *stats;
1401     g_object_get (source, "stats", &stats, NULL);
1402     if (stats) {
1403       dump_structure (stats);
1404       gst_structure_free (stats);
1405     }
1406   }
1407 #endif
1408 }
1409
1410 static void
1411 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1412 {
1413   GST_INFO ("%p: source %p bye", stream, source);
1414 }
1415
1416 static void
1417 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1418 {
1419   GstRTSPStreamTransport *trans;
1420
1421   GST_INFO ("%p: source %p bye timeout", stream, source);
1422
1423   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1424     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1425     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1426   }
1427 }
1428
1429 static void
1430 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1431 {
1432   GstRTSPStreamTransport *trans;
1433
1434   GST_INFO ("%p: source %p timeout", stream, source);
1435
1436   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1437     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1438     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1439   }
1440 }
1441
1442 static GstFlowReturn
1443 handle_new_sample (GstAppSink * sink, gpointer user_data)
1444 {
1445   GstRTSPStreamPrivate *priv;
1446   GList *walk;
1447   GstSample *sample;
1448   GstBuffer *buffer;
1449   GstRTSPStream *stream;
1450
1451   sample = gst_app_sink_pull_sample (sink);
1452   if (!sample)
1453     return GST_FLOW_OK;
1454
1455   stream = (GstRTSPStream *) user_data;
1456   priv = stream->priv;
1457   buffer = gst_sample_get_buffer (sample);
1458
1459   g_mutex_lock (&priv->lock);
1460   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1461     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1462
1463     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1464       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1465     } else {
1466       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1467     }
1468   }
1469   g_mutex_unlock (&priv->lock);
1470
1471   gst_sample_unref (sample);
1472
1473   return GST_FLOW_OK;
1474 }
1475
1476 static GstAppSinkCallbacks sink_cb = {
1477   NULL,                         /* not interested in EOS */
1478   NULL,                         /* not interested in preroll samples */
1479   handle_new_sample,
1480 };
1481
1482 /**
1483  * gst_rtsp_stream_join_bin:
1484  * @stream: a #GstRTSPStream
1485  * @bin: a #GstBin to join
1486  * @rtpbin: a rtpbin element in @bin
1487  * @state: the target state of the new elements
1488  *
1489  * Join the #GstBin @bin that contains the element @rtpbin.
1490  *
1491  * @stream will link to @rtpbin, which must be inside @bin. The elements
1492  * added to @bin will be set to the state given in @state.
1493  *
1494  * Returns: %TRUE on success.
1495  */
1496 gboolean
1497 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1498     GstElement * rtpbin, GstState state)
1499 {
1500   GstRTSPStreamPrivate *priv;
1501   gint i;
1502   guint idx;
1503   gchar *name;
1504   GstPad *pad, *sinkpad, *selpad;
1505   GstPadLinkReturn ret;
1506
1507   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1508   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1509   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1510
1511   priv = stream->priv;
1512
1513   g_mutex_lock (&priv->lock);
1514   if (priv->is_joined)
1515     goto was_joined;
1516
1517   /* create a session with the same index as the stream */
1518   idx = priv->idx;
1519
1520   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1521
1522   if (!alloc_ports (stream))
1523     goto no_ports;
1524
1525   /* update the dscp qos field in the sinks */
1526   update_dscp_qos (stream);
1527
1528   /* get a pad for sending RTP */
1529   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1530   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1531   g_free (name);
1532   /* link the RTP pad to the session manager, it should not really fail unless
1533    * this is not really an RTP pad */
1534   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1535   if (ret != GST_PAD_LINK_OK)
1536     goto link_failed;
1537
1538   /* get pads from the RTP session element for sending and receiving
1539    * RTP/RTCP*/
1540   name = g_strdup_printf ("send_rtp_src_%u", idx);
1541   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1542   g_free (name);
1543   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1544   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1545   g_free (name);
1546   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1547   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1548   g_free (name);
1549   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1550   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1551   g_free (name);
1552
1553   /* get the session */
1554   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1555
1556   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1557       stream);
1558   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1559       stream);
1560   g_signal_connect (priv->session, "on-ssrc-active",
1561       (GCallback) on_ssrc_active, stream);
1562   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1563       stream);
1564   g_signal_connect (priv->session, "on-bye-timeout",
1565       (GCallback) on_bye_timeout, stream);
1566   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1567       stream);
1568
1569   for (i = 0; i < 2; i++) {
1570     GstPad *teepad, *queuepad;
1571     /* For the sender we create this bit of pipeline for both
1572      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1573      * we need to add a queue before appsink to make the pipeline
1574      * not block. For the TCP case, we want to pump data to the
1575      * client as fast as possible anyway.
1576      *
1577      * .--------.      .-----.    .---------.
1578      * | rtpbin |      | tee |    | udpsink |
1579      * |       send->sink   src->sink       |
1580      * '--------'      |     |    '---------'
1581      *                 |     |    .---------.    .---------.
1582      *                 |     |    |  queue  |    | appsink |
1583      *                 |    src->sink      src->sink       |
1584      *                 '-----'    '---------'    '---------'
1585      *
1586      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1587      * udpsink directly to the session.
1588      */
1589     /* add udpsink */
1590     gst_bin_add (bin, priv->udpsink[i]);
1591     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1592
1593     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1594       /* make tee for RTP/RTCP */
1595       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1596       gst_bin_add (bin, priv->tee[i]);
1597
1598       /* and link to rtpbin send pad */
1599       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1600       gst_pad_link (priv->send_src[i], pad);
1601       gst_object_unref (pad);
1602
1603       /* link tee to udpsink */
1604       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1605       gst_pad_link (teepad, sinkpad);
1606       gst_object_unref (teepad);
1607
1608       /* make queue */
1609       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1610       gst_bin_add (bin, priv->appqueue[i]);
1611       /* and link to tee */
1612       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1613       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1614       gst_pad_link (teepad, pad);
1615       gst_object_unref (pad);
1616       gst_object_unref (teepad);
1617
1618       /* make appsink */
1619       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1620       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1621       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1622       gst_bin_add (bin, priv->appsink[i]);
1623       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1624           &sink_cb, stream, NULL);
1625       /* and link to queue */
1626       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1627       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1628       gst_pad_link (queuepad, pad);
1629       gst_object_unref (pad);
1630       gst_object_unref (queuepad);
1631     } else {
1632       /* else only udpsink needed, link it to the session */
1633       gst_pad_link (priv->send_src[i], sinkpad);
1634     }
1635     gst_object_unref (sinkpad);
1636
1637     /* For the receiver we create this bit of pipeline for both
1638      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1639      * and it is all funneled into the rtpbin receive pad.
1640      *
1641      * .--------.     .--------.    .--------.
1642      * | udpsrc |     | funnel |    | rtpbin |
1643      * |       src->sink      src->sink      |
1644      * '--------'     |        |    '--------'
1645      * .--------.     |        |
1646      * | appsrc |     |        |
1647      * |       src->sink       |
1648      * '--------'     '--------'
1649      */
1650     /* make funnel for the RTP/RTCP receivers */
1651     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1652     gst_bin_add (bin, priv->funnel[i]);
1653
1654     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1655     gst_pad_link (pad, priv->recv_sink[i]);
1656     gst_object_unref (pad);
1657
1658     if (priv->udpsrc_v4[i]) {
1659       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1660        * values */
1661       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1662       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1663       /* add udpsrc */
1664       gst_bin_add (bin, priv->udpsrc_v4[i]);
1665
1666       /* and link to the funnel v4 */
1667       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1668       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1669       gst_pad_link (pad, selpad);
1670       gst_object_unref (pad);
1671       gst_object_unref (selpad);
1672     }
1673
1674     if (priv->udpsrc_v6[i]) {
1675       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1676       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1677       gst_bin_add (bin, priv->udpsrc_v6[i]);
1678
1679       /* and link to the funnel v6 */
1680       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1681       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1682       gst_pad_link (pad, selpad);
1683       gst_object_unref (pad);
1684       gst_object_unref (selpad);
1685     }
1686
1687     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1688       /* make and add appsrc */
1689       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1690       gst_bin_add (bin, priv->appsrc[i]);
1691       /* and link to the funnel */
1692       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1693       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1694       gst_pad_link (pad, selpad);
1695       gst_object_unref (pad);
1696       gst_object_unref (selpad);
1697     }
1698
1699     /* check if we need to set to a special state */
1700     if (state != GST_STATE_NULL) {
1701       if (priv->udpsink[i])
1702         gst_element_set_state (priv->udpsink[i], state);
1703       if (priv->appsink[i])
1704         gst_element_set_state (priv->appsink[i], state);
1705       if (priv->appqueue[i])
1706         gst_element_set_state (priv->appqueue[i], state);
1707       if (priv->tee[i])
1708         gst_element_set_state (priv->tee[i], state);
1709       if (priv->funnel[i])
1710         gst_element_set_state (priv->funnel[i], state);
1711       if (priv->appsrc[i])
1712         gst_element_set_state (priv->appsrc[i], state);
1713     }
1714   }
1715
1716   /* be notified of caps changes */
1717   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1718       (GCallback) caps_notify, stream);
1719
1720   priv->is_joined = TRUE;
1721   g_mutex_unlock (&priv->lock);
1722
1723   return TRUE;
1724
1725   /* ERRORS */
1726 was_joined:
1727   {
1728     g_mutex_unlock (&priv->lock);
1729     return TRUE;
1730   }
1731 no_ports:
1732   {
1733     g_mutex_unlock (&priv->lock);
1734     GST_WARNING ("failed to allocate ports %u", idx);
1735     return FALSE;
1736   }
1737 link_failed:
1738   {
1739     GST_WARNING ("failed to link stream %u", idx);
1740     gst_object_unref (priv->send_rtp_sink);
1741     priv->send_rtp_sink = NULL;
1742     g_mutex_unlock (&priv->lock);
1743     return FALSE;
1744   }
1745 }
1746
1747 /**
1748  * gst_rtsp_stream_leave_bin:
1749  * @stream: a #GstRTSPStream
1750  * @bin: a #GstBin
1751  * @rtpbin: a rtpbin #GstElement
1752  *
1753  * Remove the elements of @stream from @bin.
1754  *
1755  * Return: %TRUE on success.
1756  */
1757 gboolean
1758 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1759     GstElement * rtpbin)
1760 {
1761   GstRTSPStreamPrivate *priv;
1762   gint i;
1763
1764   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1765   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1766   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1767
1768   priv = stream->priv;
1769
1770   g_mutex_lock (&priv->lock);
1771   if (!priv->is_joined)
1772     goto was_not_joined;
1773
1774   /* all transports must be removed by now */
1775   g_return_val_if_fail (priv->transports == NULL, FALSE);
1776
1777   GST_INFO ("stream %p leaving bin", stream);
1778
1779   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1780   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1781   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1782   gst_object_unref (priv->send_rtp_sink);
1783   priv->send_rtp_sink = NULL;
1784
1785   for (i = 0; i < 2; i++) {
1786     if (priv->udpsink[i])
1787       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1788     if (priv->appsink[i])
1789       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1790     if (priv->appqueue[i])
1791       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1792     if (priv->tee[i])
1793       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1794     if (priv->funnel[i])
1795       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1796     if (priv->appsrc[i])
1797       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1798     if (priv->udpsrc_v4[i]) {
1799       /* and set udpsrc to NULL now before removing */
1800       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1801       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1802       /* removing them should also nicely release the request
1803        * pads when they finalize */
1804       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1805     }
1806     if (priv->udpsrc_v6[i]) {
1807       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1808       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1809       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1810     }
1811     if (priv->udpsink[i])
1812       gst_bin_remove (bin, priv->udpsink[i]);
1813     if (priv->appsrc[i])
1814       gst_bin_remove (bin, priv->appsrc[i]);
1815     if (priv->appsink[i])
1816       gst_bin_remove (bin, priv->appsink[i]);
1817     if (priv->appqueue[i])
1818       gst_bin_remove (bin, priv->appqueue[i]);
1819     if (priv->tee[i])
1820       gst_bin_remove (bin, priv->tee[i]);
1821     if (priv->funnel[i])
1822       gst_bin_remove (bin, priv->funnel[i]);
1823
1824     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1825     gst_object_unref (priv->recv_sink[i]);
1826     priv->recv_sink[i] = NULL;
1827
1828     priv->udpsrc_v4[i] = NULL;
1829     priv->udpsrc_v6[i] = NULL;
1830     priv->udpsink[i] = NULL;
1831     priv->appsrc[i] = NULL;
1832     priv->appsink[i] = NULL;
1833     priv->appqueue[i] = NULL;
1834     priv->tee[i] = NULL;
1835     priv->funnel[i] = NULL;
1836   }
1837   gst_object_unref (priv->send_src[0]);
1838   priv->send_src[0] = NULL;
1839
1840   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1841   gst_object_unref (priv->send_src[1]);
1842   priv->send_src[1] = NULL;
1843
1844   g_object_unref (priv->session);
1845   priv->session = NULL;
1846   if (priv->caps)
1847     gst_caps_unref (priv->caps);
1848   priv->caps = NULL;
1849
1850   priv->is_joined = FALSE;
1851   g_mutex_unlock (&priv->lock);
1852
1853   return TRUE;
1854
1855 was_not_joined:
1856   {
1857     return TRUE;
1858   }
1859 }
1860
1861 /**
1862  * gst_rtsp_stream_get_rtpinfo:
1863  * @stream: a #GstRTSPStream
1864  * @rtptime: (allow-none): result RTP timestamp
1865  * @seq: (allow-none): result RTP seqnum
1866  * @clock_rate: the clock rate
1867  * @running_time: (allow-none): result running-time
1868  *
1869  * Retrieve the current rtptime, seq and running-time. This is used to
1870  * construct a RTPInfo reply header.
1871  *
1872  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1873  */
1874 gboolean
1875 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1876     guint * rtptime, guint * seq, guint * clock_rate,
1877     GstClockTime * running_time)
1878 {
1879   GstRTSPStreamPrivate *priv;
1880   GObjectClass *payobjclass;
1881
1882   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1883
1884   priv = stream->priv;
1885
1886   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1887
1888   g_mutex_lock (&priv->lock);
1889   if (seq && g_object_class_find_property (payobjclass, "seqnum"))
1890     g_object_get (priv->payloader, "seqnum", seq, NULL);
1891
1892   if (rtptime && g_object_class_find_property (payobjclass, "timestamp"))
1893     g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1894
1895   if (running_time
1896       && g_object_class_find_property (payobjclass, "running-time"))
1897     g_object_get (priv->payloader, "running-time", running_time, NULL);
1898
1899   if (clock_rate && priv->caps) {
1900     GstStructure *s;
1901
1902     s = gst_caps_get_structure (priv->caps, 0);
1903     if (!gst_structure_get_int (s, "clock-rate", (gint *) clock_rate))
1904       if (running_time)
1905         *running_time = GST_CLOCK_TIME_NONE;
1906   }
1907   g_mutex_unlock (&priv->lock);
1908
1909   return TRUE;
1910 }
1911
1912 /**
1913  * gst_rtsp_stream_get_caps:
1914  * @stream: a #GstRTSPStream
1915  *
1916  * Retrieve the current caps of @stream.
1917  *
1918  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1919  *    after usage.
1920  */
1921 GstCaps *
1922 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1923 {
1924   GstRTSPStreamPrivate *priv;
1925   GstCaps *result;
1926
1927   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1928
1929   priv = stream->priv;
1930
1931   g_mutex_lock (&priv->lock);
1932   if ((result = priv->caps))
1933     gst_caps_ref (result);
1934   g_mutex_unlock (&priv->lock);
1935
1936   return result;
1937 }
1938
1939 /**
1940  * gst_rtsp_stream_recv_rtp:
1941  * @stream: a #GstRTSPStream
1942  * @buffer: (transfer full): a #GstBuffer
1943  *
1944  * Handle an RTP buffer for the stream. This method is usually called when a
1945  * message has been received from a client using the TCP transport.
1946  *
1947  * This function takes ownership of @buffer.
1948  *
1949  * Returns: a GstFlowReturn.
1950  */
1951 GstFlowReturn
1952 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1953 {
1954   GstRTSPStreamPrivate *priv;
1955   GstFlowReturn ret;
1956   GstElement *element;
1957
1958   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1959   priv = stream->priv;
1960   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1961   g_return_val_if_fail (priv->is_joined, FALSE);
1962
1963   g_mutex_lock (&priv->lock);
1964   if (priv->appsrc[0])
1965     element = gst_object_ref (priv->appsrc[0]);
1966   else
1967     element = NULL;
1968   g_mutex_unlock (&priv->lock);
1969
1970   if (element) {
1971     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1972     gst_object_unref (element);
1973   } else {
1974     ret = GST_FLOW_OK;
1975   }
1976   return ret;
1977 }
1978
1979 /**
1980  * gst_rtsp_stream_recv_rtcp:
1981  * @stream: a #GstRTSPStream
1982  * @buffer: (transfer full): a #GstBuffer
1983  *
1984  * Handle an RTCP buffer for the stream. This method is usually called when a
1985  * message has been received from a client using the TCP transport.
1986  *
1987  * This function takes ownership of @buffer.
1988  *
1989  * Returns: a GstFlowReturn.
1990  */
1991 GstFlowReturn
1992 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1993 {
1994   GstRTSPStreamPrivate *priv;
1995   GstFlowReturn ret;
1996   GstElement *element;
1997
1998   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1999   priv = stream->priv;
2000   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2001   g_return_val_if_fail (priv->is_joined, FALSE);
2002
2003   g_mutex_lock (&priv->lock);
2004   if (priv->appsrc[1])
2005     element = gst_object_ref (priv->appsrc[1]);
2006   else
2007     element = NULL;
2008   g_mutex_unlock (&priv->lock);
2009
2010   if (element) {
2011     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2012     gst_object_unref (element);
2013   } else {
2014     ret = GST_FLOW_OK;
2015   }
2016   return ret;
2017 }
2018
2019 /* must be called with lock */
2020 static gboolean
2021 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2022     gboolean add)
2023 {
2024   GstRTSPStreamPrivate *priv = stream->priv;
2025   const GstRTSPTransport *tr;
2026
2027   tr = gst_rtsp_stream_transport_get_transport (trans);
2028
2029   switch (tr->lower_transport) {
2030     case GST_RTSP_LOWER_TRANS_UDP:
2031     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2032     {
2033       gchar *dest;
2034       gint min, max;
2035       guint ttl = 0;
2036
2037       dest = tr->destination;
2038       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2039         min = tr->port.min;
2040         max = tr->port.max;
2041         ttl = tr->ttl;
2042       } else {
2043         min = tr->client_port.min;
2044         max = tr->client_port.max;
2045       }
2046
2047       if (add) {
2048         GST_INFO ("adding %s:%d-%d", dest, min, max);
2049         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2050         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2051         if (ttl > 0) {
2052           GST_INFO ("setting ttl-mc %d", ttl);
2053           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2054           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2055         }
2056         priv->transports = g_list_prepend (priv->transports, trans);
2057       } else {
2058         GST_INFO ("removing %s:%d-%d", dest, min, max);
2059         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2060         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2061         priv->transports = g_list_remove (priv->transports, trans);
2062       }
2063       break;
2064     }
2065     case GST_RTSP_LOWER_TRANS_TCP:
2066       if (add) {
2067         GST_INFO ("adding TCP %s", tr->destination);
2068         priv->transports = g_list_prepend (priv->transports, trans);
2069       } else {
2070         GST_INFO ("removing TCP %s", tr->destination);
2071         priv->transports = g_list_remove (priv->transports, trans);
2072       }
2073       break;
2074     default:
2075       goto unknown_transport;
2076   }
2077   return TRUE;
2078
2079   /* ERRORS */
2080 unknown_transport:
2081   {
2082     GST_INFO ("Unknown transport %d", tr->lower_transport);
2083     return FALSE;
2084   }
2085 }
2086
2087
2088 /**
2089  * gst_rtsp_stream_add_transport:
2090  * @stream: a #GstRTSPStream
2091  * @trans: a #GstRTSPStreamTransport
2092  *
2093  * Add the transport in @trans to @stream. The media of @stream will
2094  * then also be send to the values configured in @trans.
2095  *
2096  * @stream must be joined to a bin.
2097  *
2098  * @trans must contain a valid #GstRTSPTransport.
2099  *
2100  * Returns: %TRUE if @trans was added
2101  */
2102 gboolean
2103 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2104     GstRTSPStreamTransport * trans)
2105 {
2106   GstRTSPStreamPrivate *priv;
2107   gboolean res;
2108
2109   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2110   priv = stream->priv;
2111   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2112   g_return_val_if_fail (priv->is_joined, FALSE);
2113
2114   g_mutex_lock (&priv->lock);
2115   res = update_transport (stream, trans, TRUE);
2116   g_mutex_unlock (&priv->lock);
2117
2118   return res;
2119 }
2120
2121 /**
2122  * gst_rtsp_stream_remove_transport:
2123  * @stream: a #GstRTSPStream
2124  * @trans: a #GstRTSPStreamTransport
2125  *
2126  * Remove the transport in @trans from @stream. The media of @stream will
2127  * not be sent to the values configured in @trans.
2128  *
2129  * @stream must be joined to a bin.
2130  *
2131  * @trans must contain a valid #GstRTSPTransport.
2132  *
2133  * Returns: %TRUE if @trans was removed
2134  */
2135 gboolean
2136 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2137     GstRTSPStreamTransport * trans)
2138 {
2139   GstRTSPStreamPrivate *priv;
2140   gboolean res;
2141
2142   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2143   priv = stream->priv;
2144   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2145   g_return_val_if_fail (priv->is_joined, FALSE);
2146
2147   g_mutex_lock (&priv->lock);
2148   res = update_transport (stream, trans, FALSE);
2149   g_mutex_unlock (&priv->lock);
2150
2151   return res;
2152 }
2153
2154 /**
2155  * gst_rtsp_stream_get_rtp_socket:
2156  * @stream: a #GstRTSPStream
2157  * @family: the socket family
2158  *
2159  * Get the RTP socket from @stream for a @family.
2160  *
2161  * @stream must be joined to a bin.
2162  *
2163  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2164  *     Unref after usage
2165  */
2166 GSocket *
2167 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2168 {
2169   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2170   GSocket *socket;
2171   gchar *name;
2172
2173   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2174   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2175       family == G_SOCKET_FAMILY_IPV6, NULL);
2176   g_return_val_if_fail (priv->udpsink[0], NULL);
2177
2178   if (family == G_SOCKET_FAMILY_IPV6)
2179     name = "socket-v6";
2180   else
2181     name = "socket";
2182
2183   g_object_get (priv->udpsink[0], name, &socket, NULL);
2184
2185   return socket;
2186 }
2187
2188 /**
2189  * gst_rtsp_stream_get_rtcp_socket:
2190  * @stream: a #GstRTSPStream
2191  * @family: the socket family
2192  *
2193  * Get the RTCP socket from @stream for a @family.
2194  *
2195  * @stream must be joined to a bin.
2196  *
2197  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2198  *     @family. Unref after usage
2199  */
2200 GSocket *
2201 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2202 {
2203   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2204   GSocket *socket;
2205   gchar *name;
2206
2207   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2208   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2209       family == G_SOCKET_FAMILY_IPV6, NULL);
2210   g_return_val_if_fail (priv->udpsink[1], NULL);
2211
2212   if (family == G_SOCKET_FAMILY_IPV6)
2213     name = "socket-v6";
2214   else
2215     name = "socket";
2216
2217   g_object_get (priv->udpsink[1], name, &socket, NULL);
2218
2219   return socket;
2220 }
2221
2222 /**
2223  * gst_rtsp_stream_transport_filter:
2224  * @stream: a #GstRTSPStream
2225  * @func: (scope call) (allow-none): a callback
2226  * @user_data: user data passed to @func
2227  *
2228  * Call @func for each transport managed by @stream. The result value of @func
2229  * determines what happens to the transport. @func will be called with @stream
2230  * locked so no further actions on @stream can be performed from @func.
2231  *
2232  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2233  * @stream.
2234  *
2235  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2236  *
2237  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2238  * will also be added with an additional ref to the result #GList of this
2239  * function..
2240  *
2241  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2242  *
2243  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2244  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2245  * element in the #GList should be unreffed before the list is freed.
2246  */
2247 GList *
2248 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2249     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2250 {
2251   GstRTSPStreamPrivate *priv;
2252   GList *result, *walk, *next;
2253
2254   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2255
2256   priv = stream->priv;
2257
2258   result = NULL;
2259
2260   g_mutex_lock (&priv->lock);
2261   for (walk = priv->transports; walk; walk = next) {
2262     GstRTSPStreamTransport *trans = walk->data;
2263     GstRTSPFilterResult res;
2264
2265     next = g_list_next (walk);
2266
2267     if (func)
2268       res = func (stream, trans, user_data);
2269     else
2270       res = GST_RTSP_FILTER_REF;
2271
2272     switch (res) {
2273       case GST_RTSP_FILTER_REMOVE:
2274         update_transport (stream, trans, FALSE);
2275         break;
2276       case GST_RTSP_FILTER_REF:
2277         result = g_list_prepend (result, g_object_ref (trans));
2278         break;
2279       case GST_RTSP_FILTER_KEEP:
2280       default:
2281         break;
2282     }
2283   }
2284   g_mutex_unlock (&priv->lock);
2285
2286   return result;
2287 }
2288
2289 static GstPadProbeReturn
2290 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2291 {
2292   GstRTSPStreamPrivate *priv;
2293   GstRTSPStream *stream;
2294
2295   stream = user_data;
2296   priv = stream->priv;
2297
2298   GST_DEBUG_OBJECT (pad, "now blocking");
2299
2300   g_mutex_lock (&priv->lock);
2301   priv->blocking = TRUE;
2302   g_mutex_unlock (&priv->lock);
2303
2304   gst_element_post_message (priv->payloader,
2305       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2306           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2307
2308   return GST_PAD_PROBE_OK;
2309 }
2310
2311 /**
2312  * gst_rtsp_stream_set_blocked:
2313  * @stream: a #GstRTSPStream
2314  * @blocked: boolean indicating we should block or unblock
2315  *
2316  * Blocks or unblocks the dataflow on @stream.
2317  *
2318  * Returns: %TRUE on success
2319  */
2320 gboolean
2321 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2322 {
2323   GstRTSPStreamPrivate *priv;
2324
2325   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2326
2327   priv = stream->priv;
2328
2329   g_mutex_lock (&priv->lock);
2330   if (blocked) {
2331     priv->blocking = FALSE;
2332     if (priv->blocked_id == 0) {
2333       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2334           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2335           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2336           g_object_ref (stream), g_object_unref);
2337     }
2338   } else {
2339     if (priv->blocked_id != 0) {
2340       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2341       priv->blocked_id = 0;
2342       priv->blocking = FALSE;
2343     }
2344   }
2345   g_mutex_unlock (&priv->lock);
2346
2347   return TRUE;
2348 }
2349
2350 /**
2351  * gst_rtsp_stream_is_blocking:
2352  * @stream: a #GstRTSPStream
2353  *
2354  * Check if @stream is blocking on a #GstBuffer.
2355  *
2356  * Returns: %TRUE if @stream is blocking
2357  */
2358 gboolean
2359 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2360 {
2361   GstRTSPStreamPrivate *priv;
2362   gboolean result;
2363
2364   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2365
2366   priv = stream->priv;
2367
2368   g_mutex_lock (&priv->lock);
2369   result = priv->blocking;
2370   g_mutex_unlock (&priv->lock);
2371
2372   return result;
2373 }