stream: release some locks in error cases
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPProfile profiles;
72   GstRTSPLowerTrans protocols;
73
74   /* pads on the rtpbin */
75   GstPad *send_rtp_sink;
76   GstPad *recv_sink[2];
77   GstPad *send_src[2];
78
79   /* the RTPSession object */
80   GObject *session;
81
82   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
83    * sockets */
84   GstElement *udpsrc_v4[2];
85
86   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
87    * sockets */
88   GstElement *udpsrc_v6[2];
89
90   GstElement *udpsink[2];
91
92   /* for TCP transport */
93   GstElement *appsrc[2];
94   GstElement *appqueue[2];
95   GstElement *appsink[2];
96
97   GstElement *tee[2];
98   GstElement *funnel[2];
99
100   /* server ports for sending/receiving over ipv4 */
101   GstRTSPRange server_port_v4;
102   GstRTSPAddress *server_addr_v4;
103   gboolean have_ipv4;
104
105   /* server ports for sending/receiving over ipv6 */
106   GstRTSPRange server_port_v6;
107   GstRTSPAddress *server_addr_v6;
108   gboolean have_ipv6;
109
110   /* multicast addresses */
111   GstRTSPAddressPool *pool;
112   GstRTSPAddress *addr_v4;
113   GstRTSPAddress *addr_v6;
114
115   /* the caps of the stream */
116   gulong caps_sig;
117   GstCaps *caps;
118
119   /* transports we stream to */
120   guint n_active;
121   GList *transports;
122
123   gint dscp_qos;
124
125   /* stream blocking */
126   gulong blocked_id;
127   gboolean blocking;
128 };
129
130 #define DEFAULT_CONTROL         NULL
131 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
132 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
133                                         GST_RTSP_LOWER_TRANS_TCP
134
135 enum
136 {
137   PROP_0,
138   PROP_CONTROL,
139   PROP_PROFILES,
140   PROP_PROTOCOLS,
141   PROP_LAST
142 };
143
144 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
145 #define GST_CAT_DEFAULT rtsp_stream_debug
146
147 static GQuark ssrc_stream_map_key;
148
149 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
150     GValue * value, GParamSpec * pspec);
151 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
152     const GValue * value, GParamSpec * pspec);
153
154 static void gst_rtsp_stream_finalize (GObject * obj);
155
156 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
157
158 static void
159 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
160 {
161   GObjectClass *gobject_class;
162
163   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
164
165   gobject_class = G_OBJECT_CLASS (klass);
166
167   gobject_class->get_property = gst_rtsp_stream_get_property;
168   gobject_class->set_property = gst_rtsp_stream_set_property;
169   gobject_class->finalize = gst_rtsp_stream_finalize;
170
171   g_object_class_install_property (gobject_class, PROP_CONTROL,
172       g_param_spec_string ("control", "Control",
173           "The control string for this stream", DEFAULT_CONTROL,
174           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175
176   g_object_class_install_property (gobject_class, PROP_PROFILES,
177       g_param_spec_flags ("profiles", "Profiles",
178           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
179           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
180
181   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
182       g_param_spec_flags ("protocols", "Protocols",
183           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
184           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
185
186   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
187
188   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
189 }
190
191 static void
192 gst_rtsp_stream_init (GstRTSPStream * stream)
193 {
194   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
195
196   GST_DEBUG ("new stream %p", stream);
197
198   stream->priv = priv;
199
200   priv->dscp_qos = -1;
201   priv->control = g_strdup (DEFAULT_CONTROL);
202   priv->profiles = DEFAULT_PROFILES;
203   priv->protocols = DEFAULT_PROTOCOLS;
204
205   g_mutex_init (&priv->lock);
206 }
207
208 static void
209 gst_rtsp_stream_finalize (GObject * obj)
210 {
211   GstRTSPStream *stream;
212   GstRTSPStreamPrivate *priv;
213
214   stream = GST_RTSP_STREAM (obj);
215   priv = stream->priv;
216
217   GST_DEBUG ("finalize stream %p", stream);
218
219   /* we really need to be unjoined now */
220   g_return_if_fail (!priv->is_joined);
221
222   if (priv->addr_v4)
223     gst_rtsp_address_free (priv->addr_v4);
224   if (priv->addr_v6)
225     gst_rtsp_address_free (priv->addr_v6);
226   if (priv->server_addr_v4)
227     gst_rtsp_address_free (priv->server_addr_v4);
228   if (priv->server_addr_v6)
229     gst_rtsp_address_free (priv->server_addr_v6);
230   if (priv->pool)
231     g_object_unref (priv->pool);
232   gst_object_unref (priv->payloader);
233   gst_object_unref (priv->srcpad);
234   g_free (priv->control);
235   g_mutex_clear (&priv->lock);
236
237   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
238 }
239
240 static void
241 gst_rtsp_stream_get_property (GObject * object, guint propid,
242     GValue * value, GParamSpec * pspec)
243 {
244   GstRTSPStream *stream = GST_RTSP_STREAM (object);
245
246   switch (propid) {
247     case PROP_CONTROL:
248       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
249       break;
250     case PROP_PROFILES:
251       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
252       break;
253     case PROP_PROTOCOLS:
254       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
255       break;
256     default:
257       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
258   }
259 }
260
261 static void
262 gst_rtsp_stream_set_property (GObject * object, guint propid,
263     const GValue * value, GParamSpec * pspec)
264 {
265   GstRTSPStream *stream = GST_RTSP_STREAM (object);
266
267   switch (propid) {
268     case PROP_CONTROL:
269       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
270       break;
271     case PROP_PROFILES:
272       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
273       break;
274     case PROP_PROTOCOLS:
275       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
276       break;
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
279   }
280 }
281
282 /**
283  * gst_rtsp_stream_new:
284  * @idx: an index
285  * @srcpad: a #GstPad
286  * @payloader: a #GstElement
287  *
288  * Create a new media stream with index @idx that handles RTP data on
289  * @srcpad and has a payloader element @payloader.
290  *
291  * Returns: a new #GstRTSPStream
292  */
293 GstRTSPStream *
294 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
295 {
296   GstRTSPStreamPrivate *priv;
297   GstRTSPStream *stream;
298
299   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
300   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
301   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
302
303   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
304   priv = stream->priv;
305   priv->idx = idx;
306   priv->payloader = gst_object_ref (payloader);
307   priv->srcpad = gst_object_ref (srcpad);
308
309   return stream;
310 }
311
312 /**
313  * gst_rtsp_stream_get_index:
314  * @stream: a #GstRTSPStream
315  *
316  * Get the stream index.
317  *
318  * Return: the stream index.
319  */
320 guint
321 gst_rtsp_stream_get_index (GstRTSPStream * stream)
322 {
323   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
324
325   return stream->priv->idx;
326 }
327
328 /**
329  * gst_rtsp_stream_get_pt:
330  * @stream: a #GstRTSPStream
331  *
332  * Get the stream payload type.
333  *
334  * Return: the stream payload type.
335  */
336 guint
337 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
338 {
339   GstRTSPStreamPrivate *priv;
340   guint pt;
341
342   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
343
344   priv = stream->priv;
345
346   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
347
348   return pt;
349 }
350
351 /**
352  * gst_rtsp_stream_get_srcpad:
353  * @stream: a #GstRTSPStream
354  *
355  * Get the srcpad associated with @stream.
356  *
357  * Returns: (transfer full): the srcpad. Unref after usage.
358  */
359 GstPad *
360 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
361 {
362   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
363
364   return gst_object_ref (stream->priv->srcpad);
365 }
366
367 /**
368  * gst_rtsp_stream_get_control:
369  * @stream: a #GstRTSPStream
370  *
371  * Get the control string to identify this stream.
372  *
373  * Returns: (transfer full): the control string. free after usage.
374  */
375 gchar *
376 gst_rtsp_stream_get_control (GstRTSPStream * stream)
377 {
378   GstRTSPStreamPrivate *priv;
379   gchar *result;
380
381   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
382
383   priv = stream->priv;
384
385   g_mutex_lock (&priv->lock);
386   if ((result = g_strdup (priv->control)) == NULL)
387     result = g_strdup_printf ("stream=%u", priv->idx);
388   g_mutex_unlock (&priv->lock);
389
390   return result;
391 }
392
393 /**
394  * gst_rtsp_stream_set_control:
395  * @stream: a #GstRTSPStream
396  * @control: a control string
397  *
398  * Set the control string in @stream.
399  */
400 void
401 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
402 {
403   GstRTSPStreamPrivate *priv;
404
405   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
406
407   priv = stream->priv;
408
409   g_mutex_lock (&priv->lock);
410   g_free (priv->control);
411   priv->control = g_strdup (control);
412   g_mutex_unlock (&priv->lock);
413 }
414
415 /**
416  * gst_rtsp_stream_has_control:
417  * @stream: a #GstRTSPStream
418  * @control: a control string
419  *
420  * Check if @stream has the control string @control.
421  *
422  * Returns: %TRUE is @stream has @control as the control string
423  */
424 gboolean
425 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
426 {
427   GstRTSPStreamPrivate *priv;
428   gboolean res;
429
430   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
431
432   priv = stream->priv;
433
434   g_mutex_lock (&priv->lock);
435   if (priv->control)
436     res = (g_strcmp0 (priv->control, control) == 0);
437   else {
438     guint streamid;
439
440     if (sscanf (control, "stream=%u", &streamid) > 0)
441       res = (streamid == priv->idx);
442     else
443       res = FALSE;
444   }
445   g_mutex_unlock (&priv->lock);
446
447   return res;
448 }
449
450 /**
451  * gst_rtsp_stream_set_mtu:
452  * @stream: a #GstRTSPStream
453  * @mtu: a new MTU
454  *
455  * Configure the mtu in the payloader of @stream to @mtu.
456  */
457 void
458 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
459 {
460   GstRTSPStreamPrivate *priv;
461
462   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
463
464   priv = stream->priv;
465
466   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
467
468   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
469 }
470
471 /**
472  * gst_rtsp_stream_get_mtu:
473  * @stream: a #GstRTSPStream
474  *
475  * Get the configured MTU in the payloader of @stream.
476  *
477  * Returns: the MTU of the payloader.
478  */
479 guint
480 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
481 {
482   GstRTSPStreamPrivate *priv;
483   guint mtu;
484
485   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
486
487   priv = stream->priv;
488
489   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
490
491   return mtu;
492 }
493
494 /* Update the dscp qos property on the udp sinks */
495 static void
496 update_dscp_qos (GstRTSPStream * stream)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   if (priv->udpsink[0]) {
505     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
506         NULL);
507   }
508
509   if (priv->udpsink[1]) {
510     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
511         NULL);
512   }
513 }
514
515 /**
516  * gst_rtsp_stream_set_dscp_qos:
517  * @stream: a #GstRTSPStream
518  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
519  *
520  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
521  */
522 void
523 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
524 {
525   GstRTSPStreamPrivate *priv;
526
527   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
528
529   priv = stream->priv;
530
531   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
532
533   if (dscp_qos < -1 || dscp_qos > 63) {
534     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
535     return;
536   }
537
538   priv->dscp_qos = dscp_qos;
539
540   update_dscp_qos (stream);
541 }
542
543 /**
544  * gst_rtsp_stream_get_dscp_qos:
545  * @stream: a #GstRTSPStream
546  *
547  * Get the configured DSCP QoS in of the outgoing sockets.
548  *
549  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
550  */
551 gint
552 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
553 {
554   GstRTSPStreamPrivate *priv;
555
556   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
557
558   priv = stream->priv;
559
560   return priv->dscp_qos;
561 }
562
563 /**
564  * gst_rtsp_stream_is_transport_supported:
565  * @stream: a #GstRTSPStream
566  * @transport: a #GstRTSPTransport
567  *
568  * Check if @transport can be handled by stream
569  *
570  * Returns: %TRUE if @transport can be handled by @stream.
571  */
572 gboolean
573 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
574     GstRTSPTransport * transport)
575 {
576   GstRTSPStreamPrivate *priv;
577
578   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
579
580   priv = stream->priv;
581
582   g_mutex_lock (&priv->lock);
583   if (transport->trans != GST_RTSP_TRANS_RTP)
584     goto unsupported_transmode;
585
586   if (!(transport->profile & priv->profiles))
587     goto unsupported_profile;
588
589   if (!(transport->lower_transport & priv->protocols))
590     goto unsupported_ltrans;
591
592   g_mutex_unlock (&priv->lock);
593
594   return TRUE;
595
596   /* ERRORS */
597 unsupported_transmode:
598   {
599     GST_DEBUG ("unsupported transport mode %d", transport->trans);
600     g_mutex_unlock (&priv->lock);
601     return FALSE;
602   }
603 unsupported_profile:
604   {
605     GST_DEBUG ("unsupported profile %d", transport->profile);
606     g_mutex_unlock (&priv->lock);
607     return FALSE;
608   }
609 unsupported_ltrans:
610   {
611     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
612     g_mutex_unlock (&priv->lock);
613     return FALSE;
614   }
615 }
616
617 /**
618  * gst_rtsp_stream_set_profiles:
619  * @stream: a #GstRTSPStream
620  * @profiles: the new profiles
621  *
622  * Configure the allowed profiles for @stream.
623  */
624 void
625 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
626 {
627   GstRTSPStreamPrivate *priv;
628
629   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
630
631   priv = stream->priv;
632
633   g_mutex_lock (&priv->lock);
634   priv->profiles = profiles;
635   g_mutex_unlock (&priv->lock);
636 }
637
638 /**
639  * gst_rtsp_stream_get_profiles:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the allowed profiles of @stream.
643  *
644  * Returns: a #GstRTSPProfile
645  */
646 GstRTSPProfile
647 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650   GstRTSPProfile res;
651
652   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
653
654   priv = stream->priv;
655
656   g_mutex_lock (&priv->lock);
657   res = priv->profiles;
658   g_mutex_unlock (&priv->lock);
659
660   return res;
661 }
662
663 /**
664  * gst_rtsp_stream_set_protocols:
665  * @stream: a #GstRTSPStream
666  * @protocols: the new flags
667  *
668  * Configure the allowed lower transport for @stream.
669  */
670 void
671 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
672     GstRTSPLowerTrans protocols)
673 {
674   GstRTSPStreamPrivate *priv;
675
676   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
677
678   priv = stream->priv;
679
680   g_mutex_lock (&priv->lock);
681   priv->protocols = protocols;
682   g_mutex_unlock (&priv->lock);
683 }
684
685 /**
686  * gst_rtsp_stream_get_protocols:
687  * @stream: a #GstRTSPStream
688  *
689  * Get the allowed protocols of @stream.
690  *
691  * Returns: a #GstRTSPLowerTrans
692  */
693 GstRTSPLowerTrans
694 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
695 {
696   GstRTSPStreamPrivate *priv;
697   GstRTSPLowerTrans res;
698
699   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
700       GST_RTSP_LOWER_TRANS_UNKNOWN);
701
702   priv = stream->priv;
703
704   g_mutex_lock (&priv->lock);
705   res = priv->protocols;
706   g_mutex_unlock (&priv->lock);
707
708   return res;
709 }
710
711 /**
712  * gst_rtsp_stream_set_address_pool:
713  * @stream: a #GstRTSPStream
714  * @pool: a #GstRTSPAddressPool
715  *
716  * configure @pool to be used as the address pool of @stream.
717  */
718 void
719 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
720     GstRTSPAddressPool * pool)
721 {
722   GstRTSPStreamPrivate *priv;
723   GstRTSPAddressPool *old;
724
725   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
726
727   priv = stream->priv;
728
729   GST_LOG_OBJECT (stream, "set address pool %p", pool);
730
731   g_mutex_lock (&priv->lock);
732   if ((old = priv->pool) != pool)
733     priv->pool = pool ? g_object_ref (pool) : NULL;
734   else
735     old = NULL;
736   g_mutex_unlock (&priv->lock);
737
738   if (old)
739     g_object_unref (old);
740 }
741
742 /**
743  * gst_rtsp_stream_get_address_pool:
744  * @stream: a #GstRTSPStream
745  *
746  * Get the #GstRTSPAddressPool used as the address pool of @stream.
747  *
748  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
749  * usage.
750  */
751 GstRTSPAddressPool *
752 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
753 {
754   GstRTSPStreamPrivate *priv;
755   GstRTSPAddressPool *result;
756
757   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
758
759   priv = stream->priv;
760
761   g_mutex_lock (&priv->lock);
762   if ((result = priv->pool))
763     g_object_ref (result);
764   g_mutex_unlock (&priv->lock);
765
766   return result;
767 }
768
769 /**
770  * gst_rtsp_stream_get_multicast_address:
771  * @stream: a #GstRTSPStream
772  * @family: the #GSocketFamily
773  *
774  * Get the multicast address of @stream for @family.
775  *
776  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
777  * allocated. gst_rtsp_address_free() after usage.
778  */
779 GstRTSPAddress *
780 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
781     GSocketFamily family)
782 {
783   GstRTSPStreamPrivate *priv;
784   GstRTSPAddress *result;
785   GstRTSPAddress **addrp;
786   GstRTSPAddressFlags flags;
787
788   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
789
790   priv = stream->priv;
791
792   if (family == G_SOCKET_FAMILY_IPV6) {
793     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
794     addrp = &priv->addr_v4;
795   } else {
796     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
797     addrp = &priv->addr_v6;
798   }
799
800   g_mutex_lock (&priv->lock);
801   if (*addrp == NULL) {
802     if (priv->pool == NULL)
803       goto no_pool;
804
805     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
806
807     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
808     if (*addrp == NULL)
809       goto no_address;
810   }
811   result = gst_rtsp_address_copy (*addrp);
812   g_mutex_unlock (&priv->lock);
813
814   return result;
815
816   /* ERRORS */
817 no_pool:
818   {
819     GST_ERROR_OBJECT (stream, "no address pool specified");
820     g_mutex_unlock (&priv->lock);
821     return NULL;
822   }
823 no_address:
824   {
825     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
826     g_mutex_unlock (&priv->lock);
827     return NULL;
828   }
829 }
830
831 /**
832  * gst_rtsp_stream_reserve_address:
833  * @stream: a #GstRTSPStream
834  * @address: an address
835  * @port: a port
836  * @n_ports: n_ports
837  * @ttl: a TTL
838  *
839  * Reserve @address and @port as the address and port of @stream.
840  *
841  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
842  * reserved. gst_rtsp_address_free() after usage.
843  */
844 GstRTSPAddress *
845 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
846     const gchar * address, guint port, guint n_ports, guint ttl)
847 {
848   GstRTSPStreamPrivate *priv;
849   GstRTSPAddress *result;
850   GInetAddress *addr;
851   GSocketFamily family;
852   GstRTSPAddress **addrp;
853
854   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
855   g_return_val_if_fail (address != NULL, NULL);
856   g_return_val_if_fail (port > 0, NULL);
857   g_return_val_if_fail (n_ports > 0, NULL);
858   g_return_val_if_fail (ttl > 0, NULL);
859
860   priv = stream->priv;
861
862   addr = g_inet_address_new_from_string (address);
863   if (!addr) {
864     GST_ERROR ("failed to get inet addr from %s", address);
865     family = G_SOCKET_FAMILY_IPV4;
866   } else {
867     family = g_inet_address_get_family (addr);
868     g_object_unref (addr);
869   }
870
871   if (family == G_SOCKET_FAMILY_IPV6)
872     addrp = &priv->addr_v4;
873   else
874     addrp = &priv->addr_v6;
875
876   g_mutex_lock (&priv->lock);
877   if (*addrp == NULL) {
878     GstRTSPAddressPoolResult res;
879
880     if (priv->pool == NULL)
881       goto no_pool;
882
883     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
884         port, n_ports, ttl, addrp);
885     if (res != GST_RTSP_ADDRESS_POOL_OK)
886       goto no_address;
887   } else {
888     if (strcmp ((*addrp)->address, address) ||
889         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
890         (*addrp)->ttl != ttl)
891       goto different_address;
892   }
893   result = gst_rtsp_address_copy (*addrp);
894   g_mutex_unlock (&priv->lock);
895
896   return result;
897
898   /* ERRORS */
899 no_pool:
900   {
901     GST_ERROR_OBJECT (stream, "no address pool specified");
902     g_mutex_unlock (&priv->lock);
903     return NULL;
904   }
905 no_address:
906   {
907     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
908         address);
909     g_mutex_unlock (&priv->lock);
910     return NULL;
911   }
912 different_address:
913   {
914     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
915         " reserved", address);
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 }
920
921 static gboolean
922 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
923     GSocketFamily family, GstElement * udpsrc_out[2],
924     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
925     GstRTSPAddress ** server_addr_out)
926 {
927   GstStateChangeReturn ret;
928   GstElement *udpsrc0, *udpsrc1;
929   GstElement *udpsink0, *udpsink1;
930   GSocket *rtp_socket = NULL;
931   GSocket *rtcp_socket;
932   gint tmp_rtp, tmp_rtcp;
933   guint count;
934   gint rtpport, rtcpport;
935   GList *rejected_addresses = NULL;
936   GstRTSPAddress *addr = NULL;
937   GInetAddress *inetaddr = NULL;
938   GSocketAddress *rtp_sockaddr = NULL;
939   GSocketAddress *rtcp_sockaddr = NULL;
940   const gchar *multisink_socket;
941
942   if (family == G_SOCKET_FAMILY_IPV6)
943     multisink_socket = "socket-v6";
944   else
945     multisink_socket = "socket";
946
947   udpsrc0 = NULL;
948   udpsrc1 = NULL;
949   udpsink0 = NULL;
950   udpsink1 = NULL;
951   count = 0;
952
953   /* Start with random port */
954   tmp_rtp = 0;
955
956   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
957       G_SOCKET_PROTOCOL_UDP, NULL);
958   if (!rtcp_socket)
959     goto no_udp_protocol;
960
961   if (*server_addr_out)
962     gst_rtsp_address_free (*server_addr_out);
963
964   /* try to allocate 2 UDP ports, the RTP port should be an even
965    * number and the RTCP port should be the next (uneven) port */
966 again:
967
968   if (rtp_socket == NULL) {
969     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
970         G_SOCKET_PROTOCOL_UDP, NULL);
971     if (!rtp_socket)
972       goto no_udp_protocol;
973   }
974
975   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
976     GstRTSPAddressFlags flags;
977
978     if (addr)
979       rejected_addresses = g_list_prepend (rejected_addresses, addr);
980
981     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
982     if (family == G_SOCKET_FAMILY_IPV6)
983       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
984     else
985       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
986
987     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
988
989     if (addr == NULL)
990       goto no_ports;
991
992     tmp_rtp = addr->port;
993
994     g_clear_object (&inetaddr);
995     inetaddr = g_inet_address_new_from_string (addr->address);
996   } else {
997     if (tmp_rtp != 0) {
998       tmp_rtp += 2;
999       if (++count > 20)
1000         goto no_ports;
1001     }
1002
1003     if (inetaddr == NULL)
1004       inetaddr = g_inet_address_new_any (family);
1005   }
1006
1007   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1008   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1009     g_object_unref (rtp_sockaddr);
1010     goto again;
1011   }
1012   g_object_unref (rtp_sockaddr);
1013
1014   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1015   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1016     g_clear_object (&rtp_sockaddr);
1017     goto socket_error;
1018   }
1019
1020   tmp_rtp =
1021       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1022   g_object_unref (rtp_sockaddr);
1023
1024   /* check if port is even */
1025   if ((tmp_rtp & 1) != 0) {
1026     /* port not even, close and allocate another */
1027     tmp_rtp++;
1028     g_clear_object (&rtp_socket);
1029     goto again;
1030   }
1031
1032   /* set port */
1033   tmp_rtcp = tmp_rtp + 1;
1034
1035   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1036   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1037     g_object_unref (rtcp_sockaddr);
1038     g_clear_object (&rtp_socket);
1039     goto again;
1040   }
1041   g_object_unref (rtcp_sockaddr);
1042
1043   g_clear_object (&inetaddr);
1044
1045   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1046   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1047
1048   if (udpsrc0 == NULL || udpsrc1 == NULL)
1049     goto no_udp_protocol;
1050
1051   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1052   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1053
1054   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
1055   if (ret == GST_STATE_CHANGE_FAILURE)
1056     goto element_error;
1057   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
1058   if (ret == GST_STATE_CHANGE_FAILURE)
1059     goto element_error;
1060
1061   /* all fine, do port check */
1062   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1063   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1064
1065   /* this should not happen... */
1066   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1067     goto port_error;
1068
1069   if (udpsink_out[0])
1070     udpsink0 = udpsink_out[0];
1071   else
1072     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1073
1074   if (!udpsink0)
1075     goto no_udp_protocol;
1076
1077   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1079
1080   if (udpsink_out[1])
1081     udpsink1 = udpsink_out[1];
1082   else
1083     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1084
1085   if (!udpsink1)
1086     goto no_udp_protocol;
1087
1088   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1089   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1091
1092   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1093   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1094   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1095   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1096   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1097   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1098   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1099   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1100
1101   /* we keep these elements, we will further configure them when the
1102    * client told us to really use the UDP ports. */
1103   udpsrc_out[0] = udpsrc0;
1104   udpsrc_out[1] = udpsrc1;
1105   udpsink_out[0] = udpsink0;
1106   udpsink_out[1] = udpsink1;
1107   server_port_out->min = rtpport;
1108   server_port_out->max = rtcpport;
1109
1110   *server_addr_out = addr;
1111   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1112
1113   g_object_unref (rtp_socket);
1114   g_object_unref (rtcp_socket);
1115
1116   return TRUE;
1117
1118   /* ERRORS */
1119 no_udp_protocol:
1120   {
1121     goto cleanup;
1122   }
1123 no_ports:
1124   {
1125     goto cleanup;
1126   }
1127 port_error:
1128   {
1129     goto cleanup;
1130   }
1131 socket_error:
1132   {
1133     goto cleanup;
1134   }
1135 element_error:
1136   {
1137     goto cleanup;
1138   }
1139 cleanup:
1140   {
1141     if (udpsrc0) {
1142       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1143       gst_object_unref (udpsrc0);
1144     }
1145     if (udpsrc1) {
1146       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1147       gst_object_unref (udpsrc1);
1148     }
1149     if (udpsink0) {
1150       gst_element_set_state (udpsink0, GST_STATE_NULL);
1151       gst_object_unref (udpsink0);
1152     }
1153     if (inetaddr)
1154       g_object_unref (inetaddr);
1155     g_list_free_full (rejected_addresses,
1156         (GDestroyNotify) gst_rtsp_address_free);
1157     if (addr)
1158       gst_rtsp_address_free (addr);
1159     if (rtp_socket)
1160       g_object_unref (rtp_socket);
1161     if (rtcp_socket)
1162       g_object_unref (rtcp_socket);
1163     return FALSE;
1164   }
1165 }
1166
1167 /* must be called with lock */
1168 static gboolean
1169 alloc_ports (GstRTSPStream * stream)
1170 {
1171   GstRTSPStreamPrivate *priv = stream->priv;
1172
1173   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1174       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1175       &priv->server_port_v4, &priv->server_addr_v4);
1176
1177   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1178       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1179       &priv->server_port_v6, &priv->server_addr_v6);
1180
1181   return priv->have_ipv4 || priv->have_ipv6;
1182 }
1183
1184 /**
1185  * gst_rtsp_stream_get_server_port:
1186  * @stream: a #GstRTSPStream
1187  * @server_port: (out): result server port
1188  * @family: the port family to get
1189  *
1190  * Fill @server_port with the port pair used by the server. This function can
1191  * only be called when @stream has been joined.
1192  */
1193 void
1194 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1195     GstRTSPRange * server_port, GSocketFamily family)
1196 {
1197   GstRTSPStreamPrivate *priv;
1198
1199   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1200   priv = stream->priv;
1201   g_return_if_fail (priv->is_joined);
1202
1203   g_mutex_lock (&priv->lock);
1204   if (family == G_SOCKET_FAMILY_IPV4) {
1205     if (server_port)
1206       *server_port = priv->server_port_v4;
1207   } else {
1208     if (server_port)
1209       *server_port = priv->server_port_v6;
1210   }
1211   g_mutex_unlock (&priv->lock);
1212 }
1213
1214 /**
1215  * gst_rtsp_stream_get_rtpsession:
1216  * @stream: a #GstRTSPStream
1217  *
1218  * Get the RTP session of this stream.
1219  *
1220  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1221  */
1222 GObject *
1223 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1224 {
1225   GstRTSPStreamPrivate *priv;
1226   GObject *session;
1227
1228   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1229
1230   priv = stream->priv;
1231
1232   g_mutex_lock (&priv->lock);
1233   if ((session = priv->session))
1234     g_object_ref (session);
1235   g_mutex_unlock (&priv->lock);
1236
1237   return session;
1238 }
1239
1240 /**
1241  * gst_rtsp_stream_get_ssrc:
1242  * @stream: a #GstRTSPStream
1243  * @ssrc: (out): result ssrc
1244  *
1245  * Get the SSRC used by the RTP session of this stream. This function can only
1246  * be called when @stream has been joined.
1247  */
1248 void
1249 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1250 {
1251   GstRTSPStreamPrivate *priv;
1252
1253   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1254   priv = stream->priv;
1255   g_return_if_fail (priv->is_joined);
1256
1257   g_mutex_lock (&priv->lock);
1258   if (ssrc && priv->session)
1259     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1260   g_mutex_unlock (&priv->lock);
1261 }
1262
1263 /* executed from streaming thread */
1264 static void
1265 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1266 {
1267   GstRTSPStreamPrivate *priv = stream->priv;
1268   GstCaps *newcaps, *oldcaps;
1269
1270   newcaps = gst_pad_get_current_caps (pad);
1271
1272   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1273       newcaps);
1274
1275   g_mutex_lock (&priv->lock);
1276   oldcaps = priv->caps;
1277   priv->caps = newcaps;
1278   g_mutex_unlock (&priv->lock);
1279
1280   if (oldcaps)
1281     gst_caps_unref (oldcaps);
1282 }
1283
1284 static void
1285 dump_structure (const GstStructure * s)
1286 {
1287   gchar *sstr;
1288
1289   sstr = gst_structure_to_string (s);
1290   GST_INFO ("structure: %s", sstr);
1291   g_free (sstr);
1292 }
1293
1294 static GstRTSPStreamTransport *
1295 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1296 {
1297   GstRTSPStreamPrivate *priv = stream->priv;
1298   GList *walk;
1299   GstRTSPStreamTransport *result = NULL;
1300   const gchar *tmp;
1301   gchar *dest;
1302   guint port;
1303
1304   if (rtcp_from == NULL)
1305     return NULL;
1306
1307   tmp = g_strrstr (rtcp_from, ":");
1308   if (tmp == NULL)
1309     return NULL;
1310
1311   port = atoi (tmp + 1);
1312   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1313
1314   g_mutex_lock (&priv->lock);
1315   GST_INFO ("finding %s:%d in %d transports", dest, port,
1316       g_list_length (priv->transports));
1317
1318   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1319     GstRTSPStreamTransport *trans = walk->data;
1320     const GstRTSPTransport *tr;
1321     gint min, max;
1322
1323     tr = gst_rtsp_stream_transport_get_transport (trans);
1324
1325     min = tr->client_port.min;
1326     max = tr->client_port.max;
1327
1328     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1329       result = trans;
1330       break;
1331     }
1332   }
1333   if (result)
1334     g_object_ref (result);
1335   g_mutex_unlock (&priv->lock);
1336
1337   g_free (dest);
1338
1339   return result;
1340 }
1341
1342 static GstRTSPStreamTransport *
1343 check_transport (GObject * source, GstRTSPStream * stream)
1344 {
1345   GstStructure *stats;
1346   GstRTSPStreamTransport *trans;
1347
1348   /* see if we have a stream to match with the origin of the RTCP packet */
1349   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1350   if (trans == NULL) {
1351     g_object_get (source, "stats", &stats, NULL);
1352     if (stats) {
1353       const gchar *rtcp_from;
1354
1355       dump_structure (stats);
1356
1357       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1358       if ((trans = find_transport (stream, rtcp_from))) {
1359         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1360             source);
1361         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1362             g_object_unref);
1363       }
1364       gst_structure_free (stats);
1365     }
1366   }
1367   return trans;
1368 }
1369
1370
1371 static void
1372 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1373 {
1374   GstRTSPStreamTransport *trans;
1375
1376   GST_INFO ("%p: new source %p", stream, source);
1377
1378   trans = check_transport (source, stream);
1379
1380   if (trans)
1381     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1382 }
1383
1384 static void
1385 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1386 {
1387   GST_INFO ("%p: new SDES %p", stream, source);
1388 }
1389
1390 static void
1391 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1392 {
1393   GstRTSPStreamTransport *trans;
1394
1395   trans = check_transport (source, stream);
1396
1397   if (trans) {
1398     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1399     gst_rtsp_stream_transport_keep_alive (trans);
1400   }
1401 #ifdef DUMP_STATS
1402   {
1403     GstStructure *stats;
1404     g_object_get (source, "stats", &stats, NULL);
1405     if (stats) {
1406       dump_structure (stats);
1407       gst_structure_free (stats);
1408     }
1409   }
1410 #endif
1411 }
1412
1413 static void
1414 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1415 {
1416   GST_INFO ("%p: source %p bye", stream, source);
1417 }
1418
1419 static void
1420 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1421 {
1422   GstRTSPStreamTransport *trans;
1423
1424   GST_INFO ("%p: source %p bye timeout", stream, source);
1425
1426   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1427     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1428     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1429   }
1430 }
1431
1432 static void
1433 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1434 {
1435   GstRTSPStreamTransport *trans;
1436
1437   GST_INFO ("%p: source %p timeout", stream, source);
1438
1439   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1440     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1441     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1442   }
1443 }
1444
1445 static GstFlowReturn
1446 handle_new_sample (GstAppSink * sink, gpointer user_data)
1447 {
1448   GstRTSPStreamPrivate *priv;
1449   GList *walk;
1450   GstSample *sample;
1451   GstBuffer *buffer;
1452   GstRTSPStream *stream;
1453
1454   sample = gst_app_sink_pull_sample (sink);
1455   if (!sample)
1456     return GST_FLOW_OK;
1457
1458   stream = (GstRTSPStream *) user_data;
1459   priv = stream->priv;
1460   buffer = gst_sample_get_buffer (sample);
1461
1462   g_mutex_lock (&priv->lock);
1463   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1464     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1465
1466     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1467       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1468     } else {
1469       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1470     }
1471   }
1472   g_mutex_unlock (&priv->lock);
1473
1474   gst_sample_unref (sample);
1475
1476   return GST_FLOW_OK;
1477 }
1478
1479 static GstAppSinkCallbacks sink_cb = {
1480   NULL,                         /* not interested in EOS */
1481   NULL,                         /* not interested in preroll samples */
1482   handle_new_sample,
1483 };
1484
1485 /**
1486  * gst_rtsp_stream_join_bin:
1487  * @stream: a #GstRTSPStream
1488  * @bin: a #GstBin to join
1489  * @rtpbin: a rtpbin element in @bin
1490  * @state: the target state of the new elements
1491  *
1492  * Join the #GstBin @bin that contains the element @rtpbin.
1493  *
1494  * @stream will link to @rtpbin, which must be inside @bin. The elements
1495  * added to @bin will be set to the state given in @state.
1496  *
1497  * Returns: %TRUE on success.
1498  */
1499 gboolean
1500 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1501     GstElement * rtpbin, GstState state)
1502 {
1503   GstRTSPStreamPrivate *priv;
1504   gint i;
1505   guint idx;
1506   gchar *name;
1507   GstPad *pad, *sinkpad, *selpad;
1508   GstPadLinkReturn ret;
1509
1510   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1511   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1512   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1513
1514   priv = stream->priv;
1515
1516   g_mutex_lock (&priv->lock);
1517   if (priv->is_joined)
1518     goto was_joined;
1519
1520   /* create a session with the same index as the stream */
1521   idx = priv->idx;
1522
1523   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1524
1525   if (!alloc_ports (stream))
1526     goto no_ports;
1527
1528   /* update the dscp qos field in the sinks */
1529   update_dscp_qos (stream);
1530
1531   /* get a pad for sending RTP */
1532   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1533   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1534   g_free (name);
1535   /* link the RTP pad to the session manager, it should not really fail unless
1536    * this is not really an RTP pad */
1537   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1538   if (ret != GST_PAD_LINK_OK)
1539     goto link_failed;
1540
1541   /* get pads from the RTP session element for sending and receiving
1542    * RTP/RTCP*/
1543   name = g_strdup_printf ("send_rtp_src_%u", idx);
1544   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1545   g_free (name);
1546   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1547   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1548   g_free (name);
1549   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1550   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1551   g_free (name);
1552   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1553   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1554   g_free (name);
1555
1556   /* get the session */
1557   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1558
1559   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1560       stream);
1561   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1562       stream);
1563   g_signal_connect (priv->session, "on-ssrc-active",
1564       (GCallback) on_ssrc_active, stream);
1565   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1566       stream);
1567   g_signal_connect (priv->session, "on-bye-timeout",
1568       (GCallback) on_bye_timeout, stream);
1569   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1570       stream);
1571
1572   for (i = 0; i < 2; i++) {
1573     GstPad *teepad, *queuepad;
1574     /* For the sender we create this bit of pipeline for both
1575      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1576      * we need to add a queue before appsink to make the pipeline
1577      * not block. For the TCP case, we want to pump data to the
1578      * client as fast as possible anyway.
1579      *
1580      * .--------.      .-----.    .---------.
1581      * | rtpbin |      | tee |    | udpsink |
1582      * |       send->sink   src->sink       |
1583      * '--------'      |     |    '---------'
1584      *                 |     |    .---------.    .---------.
1585      *                 |     |    |  queue  |    | appsink |
1586      *                 |    src->sink      src->sink       |
1587      *                 '-----'    '---------'    '---------'
1588      *
1589      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1590      * udpsink directly to the session.
1591      */
1592     /* add udpsink */
1593     gst_bin_add (bin, priv->udpsink[i]);
1594     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1595
1596     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1597       /* make tee for RTP/RTCP */
1598       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1599       gst_bin_add (bin, priv->tee[i]);
1600
1601       /* and link to rtpbin send pad */
1602       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1603       gst_pad_link (priv->send_src[i], pad);
1604       gst_object_unref (pad);
1605
1606       /* link tee to udpsink */
1607       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1608       gst_pad_link (teepad, sinkpad);
1609       gst_object_unref (teepad);
1610
1611       /* make queue */
1612       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1613       gst_bin_add (bin, priv->appqueue[i]);
1614       /* and link to tee */
1615       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1616       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1617       gst_pad_link (teepad, pad);
1618       gst_object_unref (pad);
1619       gst_object_unref (teepad);
1620
1621       /* make appsink */
1622       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1623       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1624       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1625       gst_bin_add (bin, priv->appsink[i]);
1626       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1627           &sink_cb, stream, NULL);
1628       /* and link to queue */
1629       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1630       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1631       gst_pad_link (queuepad, pad);
1632       gst_object_unref (pad);
1633       gst_object_unref (queuepad);
1634     } else {
1635       /* else only udpsink needed, link it to the session */
1636       gst_pad_link (priv->send_src[i], sinkpad);
1637     }
1638     gst_object_unref (sinkpad);
1639
1640     /* For the receiver we create this bit of pipeline for both
1641      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1642      * and it is all funneled into the rtpbin receive pad.
1643      *
1644      * .--------.     .--------.    .--------.
1645      * | udpsrc |     | funnel |    | rtpbin |
1646      * |       src->sink      src->sink      |
1647      * '--------'     |        |    '--------'
1648      * .--------.     |        |
1649      * | appsrc |     |        |
1650      * |       src->sink       |
1651      * '--------'     '--------'
1652      */
1653     /* make funnel for the RTP/RTCP receivers */
1654     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1655     gst_bin_add (bin, priv->funnel[i]);
1656
1657     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1658     gst_pad_link (pad, priv->recv_sink[i]);
1659     gst_object_unref (pad);
1660
1661     if (priv->udpsrc_v4[i]) {
1662       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1663        * values */
1664       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1665       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1666       /* add udpsrc */
1667       gst_bin_add (bin, priv->udpsrc_v4[i]);
1668
1669       /* and link to the funnel v4 */
1670       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1671       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1672       gst_pad_link (pad, selpad);
1673       gst_object_unref (pad);
1674       gst_object_unref (selpad);
1675     }
1676
1677     if (priv->udpsrc_v6[i]) {
1678       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1679       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1680       gst_bin_add (bin, priv->udpsrc_v6[i]);
1681
1682       /* and link to the funnel v6 */
1683       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1684       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1685       gst_pad_link (pad, selpad);
1686       gst_object_unref (pad);
1687       gst_object_unref (selpad);
1688     }
1689
1690     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1691       /* make and add appsrc */
1692       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1693       gst_bin_add (bin, priv->appsrc[i]);
1694       /* and link to the funnel */
1695       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1696       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1697       gst_pad_link (pad, selpad);
1698       gst_object_unref (pad);
1699       gst_object_unref (selpad);
1700     }
1701
1702     /* check if we need to set to a special state */
1703     if (state != GST_STATE_NULL) {
1704       if (priv->udpsink[i])
1705         gst_element_set_state (priv->udpsink[i], state);
1706       if (priv->appsink[i])
1707         gst_element_set_state (priv->appsink[i], state);
1708       if (priv->appqueue[i])
1709         gst_element_set_state (priv->appqueue[i], state);
1710       if (priv->tee[i])
1711         gst_element_set_state (priv->tee[i], state);
1712       if (priv->funnel[i])
1713         gst_element_set_state (priv->funnel[i], state);
1714       if (priv->appsrc[i])
1715         gst_element_set_state (priv->appsrc[i], state);
1716     }
1717   }
1718
1719   /* be notified of caps changes */
1720   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1721       (GCallback) caps_notify, stream);
1722
1723   priv->is_joined = TRUE;
1724   g_mutex_unlock (&priv->lock);
1725
1726   return TRUE;
1727
1728   /* ERRORS */
1729 was_joined:
1730   {
1731     g_mutex_unlock (&priv->lock);
1732     return TRUE;
1733   }
1734 no_ports:
1735   {
1736     g_mutex_unlock (&priv->lock);
1737     GST_WARNING ("failed to allocate ports %u", idx);
1738     return FALSE;
1739   }
1740 link_failed:
1741   {
1742     GST_WARNING ("failed to link stream %u", idx);
1743     gst_object_unref (priv->send_rtp_sink);
1744     priv->send_rtp_sink = NULL;
1745     g_mutex_unlock (&priv->lock);
1746     return FALSE;
1747   }
1748 }
1749
1750 /**
1751  * gst_rtsp_stream_leave_bin:
1752  * @stream: a #GstRTSPStream
1753  * @bin: a #GstBin
1754  * @rtpbin: a rtpbin #GstElement
1755  *
1756  * Remove the elements of @stream from @bin.
1757  *
1758  * Return: %TRUE on success.
1759  */
1760 gboolean
1761 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1762     GstElement * rtpbin)
1763 {
1764   GstRTSPStreamPrivate *priv;
1765   gint i;
1766
1767   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1768   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1769   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1770
1771   priv = stream->priv;
1772
1773   g_mutex_lock (&priv->lock);
1774   if (!priv->is_joined)
1775     goto was_not_joined;
1776
1777   /* all transports must be removed by now */
1778   g_return_val_if_fail (priv->transports == NULL, FALSE);
1779
1780   GST_INFO ("stream %p leaving bin", stream);
1781
1782   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1783   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1784   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1785   gst_object_unref (priv->send_rtp_sink);
1786   priv->send_rtp_sink = NULL;
1787
1788   for (i = 0; i < 2; i++) {
1789     if (priv->udpsink[i])
1790       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1791     if (priv->appsink[i])
1792       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1793     if (priv->appqueue[i])
1794       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1795     if (priv->tee[i])
1796       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1797     if (priv->funnel[i])
1798       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1799     if (priv->appsrc[i])
1800       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1801     if (priv->udpsrc_v4[i]) {
1802       /* and set udpsrc to NULL now before removing */
1803       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1804       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1805       /* removing them should also nicely release the request
1806        * pads when they finalize */
1807       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1808     }
1809     if (priv->udpsrc_v6[i]) {
1810       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1811       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1812       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1813     }
1814     if (priv->udpsink[i])
1815       gst_bin_remove (bin, priv->udpsink[i]);
1816     if (priv->appsrc[i])
1817       gst_bin_remove (bin, priv->appsrc[i]);
1818     if (priv->appsink[i])
1819       gst_bin_remove (bin, priv->appsink[i]);
1820     if (priv->appqueue[i])
1821       gst_bin_remove (bin, priv->appqueue[i]);
1822     if (priv->tee[i])
1823       gst_bin_remove (bin, priv->tee[i]);
1824     if (priv->funnel[i])
1825       gst_bin_remove (bin, priv->funnel[i]);
1826
1827     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1828     gst_object_unref (priv->recv_sink[i]);
1829     priv->recv_sink[i] = NULL;
1830
1831     priv->udpsrc_v4[i] = NULL;
1832     priv->udpsrc_v6[i] = NULL;
1833     priv->udpsink[i] = NULL;
1834     priv->appsrc[i] = NULL;
1835     priv->appsink[i] = NULL;
1836     priv->appqueue[i] = NULL;
1837     priv->tee[i] = NULL;
1838     priv->funnel[i] = NULL;
1839   }
1840   gst_object_unref (priv->send_src[0]);
1841   priv->send_src[0] = NULL;
1842
1843   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1844   gst_object_unref (priv->send_src[1]);
1845   priv->send_src[1] = NULL;
1846
1847   g_object_unref (priv->session);
1848   priv->session = NULL;
1849   if (priv->caps)
1850     gst_caps_unref (priv->caps);
1851   priv->caps = NULL;
1852
1853   priv->is_joined = FALSE;
1854   g_mutex_unlock (&priv->lock);
1855
1856   return TRUE;
1857
1858 was_not_joined:
1859   {
1860     g_mutex_unlock (&priv->lock);
1861     return TRUE;
1862   }
1863 }
1864
1865 /**
1866  * gst_rtsp_stream_get_rtpinfo:
1867  * @stream: a #GstRTSPStream
1868  * @rtptime: (allow-none): result RTP timestamp
1869  * @seq: (allow-none): result RTP seqnum
1870  * @clock_rate: the clock rate
1871  * @running_time: (allow-none): result running-time
1872  *
1873  * Retrieve the current rtptime, seq and running-time. This is used to
1874  * construct a RTPInfo reply header.
1875  *
1876  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1877  */
1878 gboolean
1879 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1880     guint * rtptime, guint * seq, guint * clock_rate,
1881     GstClockTime * running_time)
1882 {
1883   GstRTSPStreamPrivate *priv;
1884   GstStructure *stats;
1885   GObjectClass *payobjclass;
1886
1887   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1888
1889   priv = stream->priv;
1890
1891   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1892
1893   g_mutex_lock (&priv->lock);
1894
1895   if (g_object_class_find_property (payobjclass, "stats")) {
1896     g_object_get (priv->payloader, "stats", &stats, NULL);
1897     if (stats == NULL)
1898       goto no_stats;
1899
1900     if (seq)
1901       gst_structure_get_uint (stats, "seqnum", seq);
1902
1903     if (rtptime)
1904       gst_structure_get_uint (stats, "timestamp", rtptime);
1905
1906     if (running_time)
1907       gst_structure_get_clock_time (stats, "running-time", running_time);
1908
1909     if (clock_rate) {
1910       gst_structure_get_uint (stats, "clock-rate", clock_rate);
1911       if (*clock_rate == 0 && running_time)
1912         *running_time = GST_CLOCK_TIME_NONE;
1913     }
1914     gst_structure_free (stats);
1915   } else {
1916     if (!g_object_class_find_property (payobjclass, "seqnum") ||
1917         !g_object_class_find_property (payobjclass, "timestamp"))
1918       goto no_stats;
1919
1920     if (seq)
1921       g_object_get (priv->payloader, "seqnum", seq, NULL);
1922
1923     if (rtptime)
1924       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1925
1926     if (running_time)
1927       *running_time = GST_CLOCK_TIME_NONE;
1928   }
1929   g_mutex_unlock (&priv->lock);
1930
1931   return TRUE;
1932
1933   /* ERRORS */
1934 no_stats:
1935   {
1936     GST_WARNING ("Could not get payloader stats");
1937     g_mutex_unlock (&priv->lock);
1938     return FALSE;
1939   }
1940 }
1941
1942 /**
1943  * gst_rtsp_stream_get_caps:
1944  * @stream: a #GstRTSPStream
1945  *
1946  * Retrieve the current caps of @stream.
1947  *
1948  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1949  *    after usage.
1950  */
1951 GstCaps *
1952 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1953 {
1954   GstRTSPStreamPrivate *priv;
1955   GstCaps *result;
1956
1957   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1958
1959   priv = stream->priv;
1960
1961   g_mutex_lock (&priv->lock);
1962   if ((result = priv->caps))
1963     gst_caps_ref (result);
1964   g_mutex_unlock (&priv->lock);
1965
1966   return result;
1967 }
1968
1969 /**
1970  * gst_rtsp_stream_recv_rtp:
1971  * @stream: a #GstRTSPStream
1972  * @buffer: (transfer full): a #GstBuffer
1973  *
1974  * Handle an RTP buffer for the stream. This method is usually called when a
1975  * message has been received from a client using the TCP transport.
1976  *
1977  * This function takes ownership of @buffer.
1978  *
1979  * Returns: a GstFlowReturn.
1980  */
1981 GstFlowReturn
1982 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1983 {
1984   GstRTSPStreamPrivate *priv;
1985   GstFlowReturn ret;
1986   GstElement *element;
1987
1988   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1989   priv = stream->priv;
1990   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1991   g_return_val_if_fail (priv->is_joined, FALSE);
1992
1993   g_mutex_lock (&priv->lock);
1994   if (priv->appsrc[0])
1995     element = gst_object_ref (priv->appsrc[0]);
1996   else
1997     element = NULL;
1998   g_mutex_unlock (&priv->lock);
1999
2000   if (element) {
2001     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2002     gst_object_unref (element);
2003   } else {
2004     ret = GST_FLOW_OK;
2005   }
2006   return ret;
2007 }
2008
2009 /**
2010  * gst_rtsp_stream_recv_rtcp:
2011  * @stream: a #GstRTSPStream
2012  * @buffer: (transfer full): a #GstBuffer
2013  *
2014  * Handle an RTCP buffer for the stream. This method is usually called when a
2015  * message has been received from a client using the TCP transport.
2016  *
2017  * This function takes ownership of @buffer.
2018  *
2019  * Returns: a GstFlowReturn.
2020  */
2021 GstFlowReturn
2022 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2023 {
2024   GstRTSPStreamPrivate *priv;
2025   GstFlowReturn ret;
2026   GstElement *element;
2027
2028   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2029   priv = stream->priv;
2030   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2031   g_return_val_if_fail (priv->is_joined, FALSE);
2032
2033   g_mutex_lock (&priv->lock);
2034   if (priv->appsrc[1])
2035     element = gst_object_ref (priv->appsrc[1]);
2036   else
2037     element = NULL;
2038   g_mutex_unlock (&priv->lock);
2039
2040   if (element) {
2041     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2042     gst_object_unref (element);
2043   } else {
2044     ret = GST_FLOW_OK;
2045   }
2046   return ret;
2047 }
2048
2049 /* must be called with lock */
2050 static gboolean
2051 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2052     gboolean add)
2053 {
2054   GstRTSPStreamPrivate *priv = stream->priv;
2055   const GstRTSPTransport *tr;
2056
2057   tr = gst_rtsp_stream_transport_get_transport (trans);
2058
2059   switch (tr->lower_transport) {
2060     case GST_RTSP_LOWER_TRANS_UDP:
2061     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2062     {
2063       gchar *dest;
2064       gint min, max;
2065       guint ttl = 0;
2066
2067       dest = tr->destination;
2068       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2069         min = tr->port.min;
2070         max = tr->port.max;
2071         ttl = tr->ttl;
2072       } else {
2073         min = tr->client_port.min;
2074         max = tr->client_port.max;
2075       }
2076
2077       if (add) {
2078         if (ttl > 0) {
2079           GST_INFO ("setting ttl-mc %d", ttl);
2080           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2081           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2082         }
2083         GST_INFO ("adding %s:%d-%d", dest, min, max);
2084         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2085         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2086         priv->transports = g_list_prepend (priv->transports, trans);
2087       } else {
2088         GST_INFO ("removing %s:%d-%d", dest, min, max);
2089         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2090         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2091         priv->transports = g_list_remove (priv->transports, trans);
2092       }
2093       break;
2094     }
2095     case GST_RTSP_LOWER_TRANS_TCP:
2096       if (add) {
2097         GST_INFO ("adding TCP %s", tr->destination);
2098         priv->transports = g_list_prepend (priv->transports, trans);
2099       } else {
2100         GST_INFO ("removing TCP %s", tr->destination);
2101         priv->transports = g_list_remove (priv->transports, trans);
2102       }
2103       break;
2104     default:
2105       goto unknown_transport;
2106   }
2107   return TRUE;
2108
2109   /* ERRORS */
2110 unknown_transport:
2111   {
2112     GST_INFO ("Unknown transport %d", tr->lower_transport);
2113     return FALSE;
2114   }
2115 }
2116
2117
2118 /**
2119  * gst_rtsp_stream_add_transport:
2120  * @stream: a #GstRTSPStream
2121  * @trans: a #GstRTSPStreamTransport
2122  *
2123  * Add the transport in @trans to @stream. The media of @stream will
2124  * then also be send to the values configured in @trans.
2125  *
2126  * @stream must be joined to a bin.
2127  *
2128  * @trans must contain a valid #GstRTSPTransport.
2129  *
2130  * Returns: %TRUE if @trans was added
2131  */
2132 gboolean
2133 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2134     GstRTSPStreamTransport * trans)
2135 {
2136   GstRTSPStreamPrivate *priv;
2137   gboolean res;
2138
2139   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2140   priv = stream->priv;
2141   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2142   g_return_val_if_fail (priv->is_joined, FALSE);
2143
2144   g_mutex_lock (&priv->lock);
2145   res = update_transport (stream, trans, TRUE);
2146   g_mutex_unlock (&priv->lock);
2147
2148   return res;
2149 }
2150
2151 /**
2152  * gst_rtsp_stream_remove_transport:
2153  * @stream: a #GstRTSPStream
2154  * @trans: a #GstRTSPStreamTransport
2155  *
2156  * Remove the transport in @trans from @stream. The media of @stream will
2157  * not be sent to the values configured in @trans.
2158  *
2159  * @stream must be joined to a bin.
2160  *
2161  * @trans must contain a valid #GstRTSPTransport.
2162  *
2163  * Returns: %TRUE if @trans was removed
2164  */
2165 gboolean
2166 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2167     GstRTSPStreamTransport * trans)
2168 {
2169   GstRTSPStreamPrivate *priv;
2170   gboolean res;
2171
2172   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2173   priv = stream->priv;
2174   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2175   g_return_val_if_fail (priv->is_joined, FALSE);
2176
2177   g_mutex_lock (&priv->lock);
2178   res = update_transport (stream, trans, FALSE);
2179   g_mutex_unlock (&priv->lock);
2180
2181   return res;
2182 }
2183
2184 /**
2185  * gst_rtsp_stream_get_rtp_socket:
2186  * @stream: a #GstRTSPStream
2187  * @family: the socket family
2188  *
2189  * Get the RTP socket from @stream for a @family.
2190  *
2191  * @stream must be joined to a bin.
2192  *
2193  * Returns: (transfer full): the RTP socket or %NULL if no socket could be
2194  *     allocated for @family. Unref after usage
2195  */
2196 GSocket *
2197 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2198 {
2199   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2200   GSocket *socket;
2201   const gchar *name;
2202
2203   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2204   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2205       family == G_SOCKET_FAMILY_IPV6, NULL);
2206   g_return_val_if_fail (priv->udpsink[0], NULL);
2207
2208   if (family == G_SOCKET_FAMILY_IPV6)
2209     name = "socket-v6";
2210   else
2211     name = "socket";
2212
2213   g_object_get (priv->udpsink[0], name, &socket, NULL);
2214
2215   return socket;
2216 }
2217
2218 /**
2219  * gst_rtsp_stream_get_rtcp_socket:
2220  * @stream: a #GstRTSPStream
2221  * @family: the socket family
2222  *
2223  * Get the RTCP socket from @stream for a @family.
2224  *
2225  * @stream must be joined to a bin.
2226  *
2227  * Returns: (transfer full): the RTCP socket or %NULL if no socket could be
2228  *     allocated for @family. Unref after usage
2229  */
2230 GSocket *
2231 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2232 {
2233   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2234   GSocket *socket;
2235   const gchar *name;
2236
2237   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2238   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2239       family == G_SOCKET_FAMILY_IPV6, NULL);
2240   g_return_val_if_fail (priv->udpsink[1], NULL);
2241
2242   if (family == G_SOCKET_FAMILY_IPV6)
2243     name = "socket-v6";
2244   else
2245     name = "socket";
2246
2247   g_object_get (priv->udpsink[1], name, &socket, NULL);
2248
2249   return socket;
2250 }
2251
2252 /**
2253  * gst_rtsp_stream_transport_filter:
2254  * @stream: a #GstRTSPStream
2255  * @func: (scope call) (allow-none): a callback
2256  * @user_data: user data passed to @func
2257  *
2258  * Call @func for each transport managed by @stream. The result value of @func
2259  * determines what happens to the transport. @func will be called with @stream
2260  * locked so no further actions on @stream can be performed from @func.
2261  *
2262  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2263  * @stream.
2264  *
2265  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2266  *
2267  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2268  * will also be added with an additional ref to the result #GList of this
2269  * function..
2270  *
2271  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2272  *
2273  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2274  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2275  * element in the #GList should be unreffed before the list is freed.
2276  */
2277 GList *
2278 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2279     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2280 {
2281   GstRTSPStreamPrivate *priv;
2282   GList *result, *walk, *next;
2283
2284   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2285
2286   priv = stream->priv;
2287
2288   result = NULL;
2289
2290   g_mutex_lock (&priv->lock);
2291   for (walk = priv->transports; walk; walk = next) {
2292     GstRTSPStreamTransport *trans = walk->data;
2293     GstRTSPFilterResult res;
2294
2295     next = g_list_next (walk);
2296
2297     if (func)
2298       res = func (stream, trans, user_data);
2299     else
2300       res = GST_RTSP_FILTER_REF;
2301
2302     switch (res) {
2303       case GST_RTSP_FILTER_REMOVE:
2304         update_transport (stream, trans, FALSE);
2305         break;
2306       case GST_RTSP_FILTER_REF:
2307         result = g_list_prepend (result, g_object_ref (trans));
2308         break;
2309       case GST_RTSP_FILTER_KEEP:
2310       default:
2311         break;
2312     }
2313   }
2314   g_mutex_unlock (&priv->lock);
2315
2316   return result;
2317 }
2318
2319 static GstPadProbeReturn
2320 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2321 {
2322   GstRTSPStreamPrivate *priv;
2323   GstRTSPStream *stream;
2324
2325   stream = user_data;
2326   priv = stream->priv;
2327
2328   GST_DEBUG_OBJECT (pad, "now blocking");
2329
2330   g_mutex_lock (&priv->lock);
2331   priv->blocking = TRUE;
2332   g_mutex_unlock (&priv->lock);
2333
2334   gst_element_post_message (priv->payloader,
2335       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2336           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2337
2338   return GST_PAD_PROBE_OK;
2339 }
2340
2341 /**
2342  * gst_rtsp_stream_set_blocked:
2343  * @stream: a #GstRTSPStream
2344  * @blocked: boolean indicating we should block or unblock
2345  *
2346  * Blocks or unblocks the dataflow on @stream.
2347  *
2348  * Returns: %TRUE on success
2349  */
2350 gboolean
2351 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2352 {
2353   GstRTSPStreamPrivate *priv;
2354
2355   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2356
2357   priv = stream->priv;
2358
2359   g_mutex_lock (&priv->lock);
2360   if (blocked) {
2361     priv->blocking = FALSE;
2362     if (priv->blocked_id == 0) {
2363       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2364           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2365           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2366           g_object_ref (stream), g_object_unref);
2367     }
2368   } else {
2369     if (priv->blocked_id != 0) {
2370       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2371       priv->blocked_id = 0;
2372       priv->blocking = FALSE;
2373     }
2374   }
2375   g_mutex_unlock (&priv->lock);
2376
2377   return TRUE;
2378 }
2379
2380 /**
2381  * gst_rtsp_stream_is_blocking:
2382  * @stream: a #GstRTSPStream
2383  *
2384  * Check if @stream is blocking on a #GstBuffer.
2385  *
2386  * Returns: %TRUE if @stream is blocking
2387  */
2388 gboolean
2389 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2390 {
2391   GstRTSPStreamPrivate *priv;
2392   gboolean result;
2393
2394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2395
2396   priv = stream->priv;
2397
2398   g_mutex_lock (&priv->lock);
2399   result = priv->blocking;
2400   g_mutex_unlock (&priv->lock);
2401
2402   return result;
2403 }