stream: add API to block streams
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123
124   /* stream blocking */
125   gulong blocked_id;
126   gboolean blocking;
127 };
128
129 #define DEFAULT_CONTROL         NULL
130 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
131                                         GST_RTSP_LOWER_TRANS_TCP
132
133 enum
134 {
135   PROP_0,
136   PROP_CONTROL,
137   PROP_PROTOCOLS,
138   PROP_LAST
139 };
140
141 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
142 #define GST_CAT_DEFAULT rtsp_stream_debug
143
144 static GQuark ssrc_stream_map_key;
145
146 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
147     GValue * value, GParamSpec * pspec);
148 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
149     const GValue * value, GParamSpec * pspec);
150
151 static void gst_rtsp_stream_finalize (GObject * obj);
152
153 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
154
155 static void
156 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
157 {
158   GObjectClass *gobject_class;
159
160   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
161
162   gobject_class = G_OBJECT_CLASS (klass);
163
164   gobject_class->get_property = gst_rtsp_stream_get_property;
165   gobject_class->set_property = gst_rtsp_stream_set_property;
166   gobject_class->finalize = gst_rtsp_stream_finalize;
167
168   g_object_class_install_property (gobject_class, PROP_CONTROL,
169       g_param_spec_string ("control", "Control",
170           "The control string for this stream", DEFAULT_CONTROL,
171           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172
173   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
174       g_param_spec_flags ("protocols", "Protocols",
175           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
176           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
179
180   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
181 }
182
183 static void
184 gst_rtsp_stream_init (GstRTSPStream * stream)
185 {
186   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
187
188   GST_DEBUG ("new stream %p", stream);
189
190   stream->priv = priv;
191
192   priv->dscp_qos = -1;
193   priv->control = g_strdup (DEFAULT_CONTROL);
194   priv->protocols = DEFAULT_PROTOCOLS;
195
196   g_mutex_init (&priv->lock);
197 }
198
199 static void
200 gst_rtsp_stream_finalize (GObject * obj)
201 {
202   GstRTSPStream *stream;
203   GstRTSPStreamPrivate *priv;
204
205   stream = GST_RTSP_STREAM (obj);
206   priv = stream->priv;
207
208   GST_DEBUG ("finalize stream %p", stream);
209
210   /* we really need to be unjoined now */
211   g_return_if_fail (!priv->is_joined);
212
213   if (priv->addr_v4)
214     gst_rtsp_address_free (priv->addr_v4);
215   if (priv->addr_v6)
216     gst_rtsp_address_free (priv->addr_v6);
217   if (priv->server_addr_v4)
218     gst_rtsp_address_free (priv->server_addr_v4);
219   if (priv->server_addr_v6)
220     gst_rtsp_address_free (priv->server_addr_v6);
221   if (priv->pool)
222     g_object_unref (priv->pool);
223   gst_object_unref (priv->payloader);
224   gst_object_unref (priv->srcpad);
225   g_free (priv->control);
226   g_mutex_clear (&priv->lock);
227
228   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
229 }
230
231 static void
232 gst_rtsp_stream_get_property (GObject * object, guint propid,
233     GValue * value, GParamSpec * pspec)
234 {
235   GstRTSPStream *stream = GST_RTSP_STREAM (object);
236
237   switch (propid) {
238     case PROP_CONTROL:
239       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
240       break;
241     case PROP_PROTOCOLS:
242       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
243       break;
244     default:
245       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
246   }
247 }
248
249 static void
250 gst_rtsp_stream_set_property (GObject * object, guint propid,
251     const GValue * value, GParamSpec * pspec)
252 {
253   GstRTSPStream *stream = GST_RTSP_STREAM (object);
254
255   switch (propid) {
256     case PROP_CONTROL:
257       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
258       break;
259     case PROP_PROTOCOLS:
260       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
261       break;
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
264   }
265 }
266
267 /**
268  * gst_rtsp_stream_new:
269  * @idx: an index
270  * @srcpad: a #GstPad
271  * @payloader: a #GstElement
272  *
273  * Create a new media stream with index @idx that handles RTP data on
274  * @srcpad and has a payloader element @payloader.
275  *
276  * Returns: a new #GstRTSPStream
277  */
278 GstRTSPStream *
279 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
280 {
281   GstRTSPStreamPrivate *priv;
282   GstRTSPStream *stream;
283
284   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
285   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
286   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
287
288   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
289   priv = stream->priv;
290   priv->idx = idx;
291   priv->payloader = gst_object_ref (payloader);
292   priv->srcpad = gst_object_ref (srcpad);
293
294   return stream;
295 }
296
297 /**
298  * gst_rtsp_stream_get_index:
299  * @stream: a #GstRTSPStream
300  *
301  * Get the stream index.
302  *
303  * Return: the stream index.
304  */
305 guint
306 gst_rtsp_stream_get_index (GstRTSPStream * stream)
307 {
308   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
309
310   return stream->priv->idx;
311 }
312
313 /**
314  * gst_rtsp_stream_get_pt:
315  * @stream: a #GstRTSPStream
316  *
317  * Get the stream payload type.
318  *
319  * Return: the stream payload type.
320  */
321 guint
322 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
323 {
324   GstRTSPStreamPrivate *priv;
325   guint pt;
326
327   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
328
329   priv = stream->priv;
330
331   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
332
333   return pt;
334 }
335
336 /**
337  * gst_rtsp_stream_get_srcpad:
338  * @stream: a #GstRTSPStream
339  *
340  * Get the srcpad associated with @stream.
341  *
342  * Returns: (transfer full): the srcpad. Unref after usage.
343  */
344 GstPad *
345 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
346 {
347   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
348
349   return gst_object_ref (stream->priv->srcpad);
350 }
351
352 /**
353  * gst_rtsp_stream_get_control:
354  * @stream: a #GstRTSPStream
355  *
356  * Get the control string to identify this stream.
357  *
358  * Returns: (transfer full): the control string. free after usage.
359  */
360 gchar *
361 gst_rtsp_stream_get_control (GstRTSPStream * stream)
362 {
363   GstRTSPStreamPrivate *priv;
364   gchar *result;
365
366   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
367
368   priv = stream->priv;
369
370   g_mutex_lock (&priv->lock);
371   if ((result = g_strdup (priv->control)) == NULL)
372     result = g_strdup_printf ("stream=%u", priv->idx);
373   g_mutex_unlock (&priv->lock);
374
375   return result;
376 }
377
378 /**
379  * gst_rtsp_stream_set_control:
380  * @stream: a #GstRTSPStream
381  * @control: a control string
382  *
383  * Set the control string in @stream.
384  */
385 void
386 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
387 {
388   GstRTSPStreamPrivate *priv;
389
390   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
391
392   priv = stream->priv;
393
394   g_mutex_lock (&priv->lock);
395   g_free (priv->control);
396   priv->control = g_strdup (control);
397   g_mutex_unlock (&priv->lock);
398 }
399
400 /**
401  * gst_rtsp_stream_has_control:
402  * @stream: a #GstRTSPStream
403  * @control: a control string
404  *
405  * Check if @stream has the control string @control.
406  *
407  * Returns: %TRUE is @stream has @control as the control string
408  */
409 gboolean
410 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
411 {
412   GstRTSPStreamPrivate *priv;
413   gboolean res;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->control)
421     res = (g_strcmp0 (priv->control, control) == 0);
422   else {
423     guint streamid;
424     sscanf (control, "stream=%u", &streamid);
425     res = (streamid == priv->idx);
426   }
427   g_mutex_unlock (&priv->lock);
428
429   return res;
430 }
431
432 /**
433  * gst_rtsp_stream_set_mtu:
434  * @stream: a #GstRTSPStream
435  * @mtu: a new MTU
436  *
437  * Configure the mtu in the payloader of @stream to @mtu.
438  */
439 void
440 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
441 {
442   GstRTSPStreamPrivate *priv;
443
444   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
445
446   priv = stream->priv;
447
448   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
449
450   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
451 }
452
453 /**
454  * gst_rtsp_stream_get_mtu:
455  * @stream: a #GstRTSPStream
456  *
457  * Get the configured MTU in the payloader of @stream.
458  *
459  * Returns: the MTU of the payloader.
460  */
461 guint
462 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
463 {
464   GstRTSPStreamPrivate *priv;
465   guint mtu;
466
467   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
468
469   priv = stream->priv;
470
471   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
472
473   return mtu;
474 }
475
476 /* Update the dscp qos property on the udp sinks */
477 static void
478 update_dscp_qos (GstRTSPStream * stream)
479 {
480   GstRTSPStreamPrivate *priv;
481
482   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
483
484   priv = stream->priv;
485
486   if (priv->udpsink[0]) {
487     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
488         NULL);
489   }
490
491   if (priv->udpsink[1]) {
492     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
493         NULL);
494   }
495 }
496
497 /**
498  * gst_rtsp_stream_set_dscp_qos:
499  * @stream: a #GstRTSPStream
500  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
501  *
502  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
503  */
504 void
505 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
506 {
507   GstRTSPStreamPrivate *priv;
508
509   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
510
511   priv = stream->priv;
512
513   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
514
515   if (dscp_qos < -1 || dscp_qos > 63) {
516     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
517     return;
518   }
519
520   priv->dscp_qos = dscp_qos;
521
522   update_dscp_qos (stream);
523 }
524
525 /**
526  * gst_rtsp_stream_get_dscp_qos:
527  * @stream: a #GstRTSPStream
528  *
529  * Get the configured DSCP QoS in of the outgoing sockets.
530  *
531  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
532  */
533 gint
534 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
535 {
536   GstRTSPStreamPrivate *priv;
537
538   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
539
540   priv = stream->priv;
541
542   return priv->dscp_qos;
543 }
544
545 /**
546  * gst_rtsp_stream_set_protocols:
547  * @stream: a #GstRTSPStream
548  * @protocols: the new flags
549  *
550  * Configure the allowed lower transport for @stream.
551  */
552 void
553 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
554     GstRTSPLowerTrans protocols)
555 {
556   GstRTSPStreamPrivate *priv;
557
558   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
559
560   priv = stream->priv;
561
562   g_mutex_lock (&priv->lock);
563   priv->protocols = protocols;
564   g_mutex_unlock (&priv->lock);
565 }
566
567 /**
568  * gst_rtsp_stream_get_protocols:
569  * @stream: a #GstRTSPStream
570  *
571  * Get the allowed protocols of @stream.
572  *
573  * Returns: a #GstRTSPLowerTrans
574  */
575 GstRTSPLowerTrans
576 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
577 {
578   GstRTSPStreamPrivate *priv;
579   GstRTSPLowerTrans res;
580
581   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
582       GST_RTSP_LOWER_TRANS_UNKNOWN);
583
584   priv = stream->priv;
585
586   g_mutex_lock (&priv->lock);
587   res = priv->protocols;
588   g_mutex_unlock (&priv->lock);
589
590   return res;
591 }
592
593 /**
594  * gst_rtsp_stream_set_address_pool:
595  * @stream: a #GstRTSPStream
596  * @pool: a #GstRTSPAddressPool
597  *
598  * configure @pool to be used as the address pool of @stream.
599  */
600 void
601 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
602     GstRTSPAddressPool * pool)
603 {
604   GstRTSPStreamPrivate *priv;
605   GstRTSPAddressPool *old;
606
607   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
608
609   priv = stream->priv;
610
611   GST_LOG_OBJECT (stream, "set address pool %p", pool);
612
613   g_mutex_lock (&priv->lock);
614   if ((old = priv->pool) != pool)
615     priv->pool = pool ? g_object_ref (pool) : NULL;
616   else
617     old = NULL;
618   g_mutex_unlock (&priv->lock);
619
620   if (old)
621     g_object_unref (old);
622 }
623
624 /**
625  * gst_rtsp_stream_get_address_pool:
626  * @stream: a #GstRTSPStream
627  *
628  * Get the #GstRTSPAddressPool used as the address pool of @stream.
629  *
630  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
631  * usage.
632  */
633 GstRTSPAddressPool *
634 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
635 {
636   GstRTSPStreamPrivate *priv;
637   GstRTSPAddressPool *result;
638
639   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
640
641   priv = stream->priv;
642
643   g_mutex_lock (&priv->lock);
644   if ((result = priv->pool))
645     g_object_ref (result);
646   g_mutex_unlock (&priv->lock);
647
648   return result;
649 }
650
651 /**
652  * gst_rtsp_stream_get_multicast_address:
653  * @stream: a #GstRTSPStream
654  * @family: the #GSocketFamily
655  *
656  * Get the multicast address of @stream for @family.
657  *
658  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
659  * allocated. gst_rtsp_address_free() after usage.
660  */
661 GstRTSPAddress *
662 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
663     GSocketFamily family)
664 {
665   GstRTSPStreamPrivate *priv;
666   GstRTSPAddress *result;
667   GstRTSPAddress **addrp;
668   GstRTSPAddressFlags flags;
669
670   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
671
672   priv = stream->priv;
673
674   if (family == G_SOCKET_FAMILY_IPV6) {
675     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
676     addrp = &priv->addr_v4;
677   } else {
678     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
679     addrp = &priv->addr_v6;
680   }
681
682   g_mutex_lock (&priv->lock);
683   if (*addrp == NULL) {
684     if (priv->pool == NULL)
685       goto no_pool;
686
687     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
688
689     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
690     if (*addrp == NULL)
691       goto no_address;
692   }
693   result = gst_rtsp_address_copy (*addrp);
694   g_mutex_unlock (&priv->lock);
695
696   return result;
697
698   /* ERRORS */
699 no_pool:
700   {
701     GST_ERROR_OBJECT (stream, "no address pool specified");
702     g_mutex_unlock (&priv->lock);
703     return NULL;
704   }
705 no_address:
706   {
707     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
708     g_mutex_unlock (&priv->lock);
709     return NULL;
710   }
711 }
712
713 /**
714  * gst_rtsp_stream_reserve_address:
715  * @stream: a #GstRTSPStream
716  * @address: an address
717  * @port: a port
718  * @n_ports: n_ports
719  * @ttl: a TTL
720  *
721  * Reserve @address and @port as the address and port of @stream.
722  *
723  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
724  * reserved. gst_rtsp_address_free() after usage.
725  */
726 GstRTSPAddress *
727 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
728     const gchar * address, guint port, guint n_ports, guint ttl)
729 {
730   GstRTSPStreamPrivate *priv;
731   GstRTSPAddress *result;
732   GInetAddress *addr;
733   GSocketFamily family;
734   GstRTSPAddress **addrp;
735
736   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
737   g_return_val_if_fail (address != NULL, NULL);
738   g_return_val_if_fail (port > 0, NULL);
739   g_return_val_if_fail (n_ports > 0, NULL);
740   g_return_val_if_fail (ttl > 0, NULL);
741
742   priv = stream->priv;
743
744   addr = g_inet_address_new_from_string (address);
745   if (!addr) {
746     GST_ERROR ("failed to get inet addr from %s", address);
747     family = G_SOCKET_FAMILY_IPV4;
748   } else {
749     family = g_inet_address_get_family (addr);
750     g_object_unref (addr);
751   }
752
753   if (family == G_SOCKET_FAMILY_IPV6)
754     addrp = &priv->addr_v4;
755   else
756     addrp = &priv->addr_v6;
757
758   g_mutex_lock (&priv->lock);
759   if (*addrp == NULL) {
760     GstRTSPAddressPoolResult res;
761
762     if (priv->pool == NULL)
763       goto no_pool;
764
765     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
766         port, n_ports, ttl, addrp);
767     if (res != GST_RTSP_ADDRESS_POOL_OK)
768       goto no_address;
769   } else {
770     if (strcmp ((*addrp)->address, address) ||
771         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
772         (*addrp)->ttl != ttl)
773       goto different_address;
774   }
775   result = gst_rtsp_address_copy (*addrp);
776   g_mutex_unlock (&priv->lock);
777
778   return result;
779
780   /* ERRORS */
781 no_pool:
782   {
783     GST_ERROR_OBJECT (stream, "no address pool specified");
784     g_mutex_unlock (&priv->lock);
785     return NULL;
786   }
787 no_address:
788   {
789     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
790         address);
791     g_mutex_unlock (&priv->lock);
792     return NULL;
793   }
794 different_address:
795   {
796     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
797         " reserved", address);
798     g_mutex_unlock (&priv->lock);
799     return NULL;
800   }
801 }
802
803 static gboolean
804 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
805     GSocketFamily family, GstElement * udpsrc_out[2],
806     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
807     GstRTSPAddress ** server_addr_out)
808 {
809   GstStateChangeReturn ret;
810   GstElement *udpsrc0, *udpsrc1;
811   GstElement *udpsink0, *udpsink1;
812   GSocket *rtp_socket = NULL;
813   GSocket *rtcp_socket;
814   gint tmp_rtp, tmp_rtcp;
815   guint count;
816   gint rtpport, rtcpport;
817   GList *rejected_addresses = NULL;
818   GstRTSPAddress *addr = NULL;
819   GInetAddress *inetaddr = NULL;
820   GSocketAddress *rtp_sockaddr = NULL;
821   GSocketAddress *rtcp_sockaddr = NULL;
822   const gchar *multisink_socket;
823
824   if (family == G_SOCKET_FAMILY_IPV6)
825     multisink_socket = "socket-v6";
826   else
827     multisink_socket = "socket";
828
829   udpsrc0 = NULL;
830   udpsrc1 = NULL;
831   udpsink0 = NULL;
832   udpsink1 = NULL;
833   count = 0;
834
835   /* Start with random port */
836   tmp_rtp = 0;
837
838   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
839       G_SOCKET_PROTOCOL_UDP, NULL);
840   if (!rtcp_socket)
841     goto no_udp_protocol;
842
843   if (*server_addr_out)
844     gst_rtsp_address_free (*server_addr_out);
845
846   /* try to allocate 2 UDP ports, the RTP port should be an even
847    * number and the RTCP port should be the next (uneven) port */
848 again:
849
850   if (rtp_socket == NULL) {
851     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
852         G_SOCKET_PROTOCOL_UDP, NULL);
853     if (!rtp_socket)
854       goto no_udp_protocol;
855   }
856
857   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
858     GstRTSPAddressFlags flags;
859
860     if (addr)
861       rejected_addresses = g_list_prepend (rejected_addresses, addr);
862
863     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
864     if (family == G_SOCKET_FAMILY_IPV6)
865       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
866     else
867       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
868
869     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
870
871     if (addr == NULL)
872       goto no_ports;
873
874     tmp_rtp = addr->port;
875
876     g_clear_object (&inetaddr);
877     inetaddr = g_inet_address_new_from_string (addr->address);
878   } else {
879     if (tmp_rtp != 0) {
880       tmp_rtp += 2;
881       if (++count > 20)
882         goto no_ports;
883     }
884
885     if (inetaddr == NULL)
886       inetaddr = g_inet_address_new_any (family);
887   }
888
889   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
890   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
891     g_object_unref (rtp_sockaddr);
892     goto again;
893   }
894   g_object_unref (rtp_sockaddr);
895
896   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
897   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
898     g_clear_object (&rtp_sockaddr);
899     goto socket_error;
900   }
901
902   tmp_rtp =
903       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
904   g_object_unref (rtp_sockaddr);
905
906   /* check if port is even */
907   if ((tmp_rtp & 1) != 0) {
908     /* port not even, close and allocate another */
909     tmp_rtp++;
910     g_clear_object (&rtp_socket);
911     goto again;
912   }
913
914   /* set port */
915   tmp_rtcp = tmp_rtp + 1;
916
917   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
918   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
919     g_object_unref (rtcp_sockaddr);
920     g_clear_object (&rtp_socket);
921     goto again;
922   }
923   g_object_unref (rtcp_sockaddr);
924
925   g_clear_object (&inetaddr);
926
927   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
928   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
929
930   if (udpsrc0 == NULL || udpsrc1 == NULL)
931     goto no_udp_protocol;
932
933   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
934   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
935
936   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
937   if (ret == GST_STATE_CHANGE_FAILURE)
938     goto element_error;
939   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
940   if (ret == GST_STATE_CHANGE_FAILURE)
941     goto element_error;
942
943   /* all fine, do port check */
944   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
945   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
946
947   /* this should not happen... */
948   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
949     goto port_error;
950
951   if (udpsink_out[0])
952     udpsink0 = udpsink_out[0];
953   else
954     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
955
956   if (!udpsink0)
957     goto no_udp_protocol;
958
959   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
960   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
961
962   if (udpsink_out[1])
963     udpsink1 = udpsink_out[1];
964   else
965     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
966
967   if (!udpsink1)
968     goto no_udp_protocol;
969
970   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
971   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
972   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
973
974   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
976   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
977   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
978   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
979   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
980   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
981   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
982
983   /* we keep these elements, we will further configure them when the
984    * client told us to really use the UDP ports. */
985   udpsrc_out[0] = udpsrc0;
986   udpsrc_out[1] = udpsrc1;
987   udpsink_out[0] = udpsink0;
988   udpsink_out[1] = udpsink1;
989   server_port_out->min = rtpport;
990   server_port_out->max = rtcpport;
991
992   *server_addr_out = addr;
993   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
994
995   g_object_unref (rtp_socket);
996   g_object_unref (rtcp_socket);
997
998   return TRUE;
999
1000   /* ERRORS */
1001 no_udp_protocol:
1002   {
1003     goto cleanup;
1004   }
1005 no_ports:
1006   {
1007     goto cleanup;
1008   }
1009 port_error:
1010   {
1011     goto cleanup;
1012   }
1013 socket_error:
1014   {
1015     goto cleanup;
1016   }
1017 element_error:
1018   {
1019     goto cleanup;
1020   }
1021 cleanup:
1022   {
1023     if (udpsrc0) {
1024       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1025       gst_object_unref (udpsrc0);
1026     }
1027     if (udpsrc1) {
1028       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1029       gst_object_unref (udpsrc1);
1030     }
1031     if (udpsink0) {
1032       gst_element_set_state (udpsink0, GST_STATE_NULL);
1033       gst_object_unref (udpsink0);
1034     }
1035     if (udpsink1) {
1036       gst_element_set_state (udpsink1, GST_STATE_NULL);
1037       gst_object_unref (udpsink1);
1038     }
1039     if (inetaddr)
1040       g_object_unref (inetaddr);
1041     g_list_free_full (rejected_addresses,
1042         (GDestroyNotify) gst_rtsp_address_free);
1043     if (addr)
1044       gst_rtsp_address_free (addr);
1045     if (rtp_socket)
1046       g_object_unref (rtp_socket);
1047     if (rtcp_socket)
1048       g_object_unref (rtcp_socket);
1049     return FALSE;
1050   }
1051 }
1052
1053 /* must be called with lock */
1054 static gboolean
1055 alloc_ports (GstRTSPStream * stream)
1056 {
1057   GstRTSPStreamPrivate *priv = stream->priv;
1058
1059   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1060       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1061       &priv->server_port_v4, &priv->server_addr_v4);
1062
1063   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1064       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1065       &priv->server_port_v6, &priv->server_addr_v6);
1066
1067   return priv->have_ipv4 || priv->have_ipv6;
1068 }
1069
1070 /**
1071  * gst_rtsp_stream_get_server_port:
1072  * @stream: a #GstRTSPStream
1073  * @server_port: (out): result server port
1074  * @family: the port family to get
1075  *
1076  * Fill @server_port with the port pair used by the server. This function can
1077  * only be called when @stream has been joined.
1078  */
1079 void
1080 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1081     GstRTSPRange * server_port, GSocketFamily family)
1082 {
1083   GstRTSPStreamPrivate *priv;
1084
1085   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1086   priv = stream->priv;
1087   g_return_if_fail (priv->is_joined);
1088
1089   g_mutex_lock (&priv->lock);
1090   if (family == G_SOCKET_FAMILY_IPV4) {
1091     if (server_port)
1092       *server_port = priv->server_port_v4;
1093   } else {
1094     if (server_port)
1095       *server_port = priv->server_port_v6;
1096   }
1097   g_mutex_unlock (&priv->lock);
1098 }
1099
1100 /**
1101  * gst_rtsp_stream_get_rtpsession:
1102  * @stream: a #GstRTSPStream
1103  *
1104  * Get the RTP session of this stream.
1105  *
1106  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1107  */
1108 GObject *
1109 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1110 {
1111   GstRTSPStreamPrivate *priv;
1112   GObject *session;
1113
1114   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1115
1116   priv = stream->priv;
1117
1118   g_mutex_lock (&priv->lock);
1119   if ((session = priv->session))
1120     g_object_ref (session);
1121   g_mutex_unlock (&priv->lock);
1122
1123   return session;
1124 }
1125
1126 /**
1127  * gst_rtsp_stream_get_ssrc:
1128  * @stream: a #GstRTSPStream
1129  * @ssrc: (out): result ssrc
1130  *
1131  * Get the SSRC used by the RTP session of this stream. This function can only
1132  * be called when @stream has been joined.
1133  */
1134 void
1135 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1136 {
1137   GstRTSPStreamPrivate *priv;
1138
1139   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1140   priv = stream->priv;
1141   g_return_if_fail (priv->is_joined);
1142
1143   g_mutex_lock (&priv->lock);
1144   if (ssrc && priv->session)
1145     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1146   g_mutex_unlock (&priv->lock);
1147 }
1148
1149 /* executed from streaming thread */
1150 static void
1151 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1152 {
1153   GstRTSPStreamPrivate *priv = stream->priv;
1154   GstCaps *newcaps, *oldcaps;
1155
1156   newcaps = gst_pad_get_current_caps (pad);
1157
1158   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1159       newcaps);
1160
1161   g_mutex_lock (&priv->lock);
1162   oldcaps = priv->caps;
1163   priv->caps = newcaps;
1164   g_mutex_unlock (&priv->lock);
1165
1166   if (oldcaps)
1167     gst_caps_unref (oldcaps);
1168 }
1169
1170 static void
1171 dump_structure (const GstStructure * s)
1172 {
1173   gchar *sstr;
1174
1175   sstr = gst_structure_to_string (s);
1176   GST_INFO ("structure: %s", sstr);
1177   g_free (sstr);
1178 }
1179
1180 static GstRTSPStreamTransport *
1181 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1182 {
1183   GstRTSPStreamPrivate *priv = stream->priv;
1184   GList *walk;
1185   GstRTSPStreamTransport *result = NULL;
1186   const gchar *tmp;
1187   gchar *dest;
1188   guint port;
1189
1190   if (rtcp_from == NULL)
1191     return NULL;
1192
1193   tmp = g_strrstr (rtcp_from, ":");
1194   if (tmp == NULL)
1195     return NULL;
1196
1197   port = atoi (tmp + 1);
1198   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1199
1200   g_mutex_lock (&priv->lock);
1201   GST_INFO ("finding %s:%d in %d transports", dest, port,
1202       g_list_length (priv->transports));
1203
1204   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1205     GstRTSPStreamTransport *trans = walk->data;
1206     const GstRTSPTransport *tr;
1207     gint min, max;
1208
1209     tr = gst_rtsp_stream_transport_get_transport (trans);
1210
1211     min = tr->client_port.min;
1212     max = tr->client_port.max;
1213
1214     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1215       result = trans;
1216       break;
1217     }
1218   }
1219   if (result)
1220     g_object_ref (result);
1221   g_mutex_unlock (&priv->lock);
1222
1223   g_free (dest);
1224
1225   return result;
1226 }
1227
1228 static GstRTSPStreamTransport *
1229 check_transport (GObject * source, GstRTSPStream * stream)
1230 {
1231   GstStructure *stats;
1232   GstRTSPStreamTransport *trans;
1233
1234   /* see if we have a stream to match with the origin of the RTCP packet */
1235   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1236   if (trans == NULL) {
1237     g_object_get (source, "stats", &stats, NULL);
1238     if (stats) {
1239       const gchar *rtcp_from;
1240
1241       dump_structure (stats);
1242
1243       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1244       if ((trans = find_transport (stream, rtcp_from))) {
1245         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1246             source);
1247         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1248             g_object_unref);
1249       }
1250       gst_structure_free (stats);
1251     }
1252   }
1253   return trans;
1254 }
1255
1256
1257 static void
1258 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1259 {
1260   GstRTSPStreamTransport *trans;
1261
1262   GST_INFO ("%p: new source %p", stream, source);
1263
1264   trans = check_transport (source, stream);
1265
1266   if (trans)
1267     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1268 }
1269
1270 static void
1271 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1272 {
1273   GST_INFO ("%p: new SDES %p", stream, source);
1274 }
1275
1276 static void
1277 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1278 {
1279   GstRTSPStreamTransport *trans;
1280
1281   trans = check_transport (source, stream);
1282
1283   if (trans) {
1284     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1285     gst_rtsp_stream_transport_keep_alive (trans);
1286   }
1287 #ifdef DUMP_STATS
1288   {
1289     GstStructure *stats;
1290     g_object_get (source, "stats", &stats, NULL);
1291     if (stats) {
1292       dump_structure (stats);
1293       gst_structure_free (stats);
1294     }
1295   }
1296 #endif
1297 }
1298
1299 static void
1300 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1301 {
1302   GST_INFO ("%p: source %p bye", stream, source);
1303 }
1304
1305 static void
1306 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1307 {
1308   GstRTSPStreamTransport *trans;
1309
1310   GST_INFO ("%p: source %p bye timeout", stream, source);
1311
1312   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1313     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1314     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1315   }
1316 }
1317
1318 static void
1319 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1320 {
1321   GstRTSPStreamTransport *trans;
1322
1323   GST_INFO ("%p: source %p timeout", stream, source);
1324
1325   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1326     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1327     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1328   }
1329 }
1330
1331 static GstFlowReturn
1332 handle_new_sample (GstAppSink * sink, gpointer user_data)
1333 {
1334   GstRTSPStreamPrivate *priv;
1335   GList *walk;
1336   GstSample *sample;
1337   GstBuffer *buffer;
1338   GstRTSPStream *stream;
1339
1340   sample = gst_app_sink_pull_sample (sink);
1341   if (!sample)
1342     return GST_FLOW_OK;
1343
1344   stream = (GstRTSPStream *) user_data;
1345   priv = stream->priv;
1346   buffer = gst_sample_get_buffer (sample);
1347
1348   g_mutex_lock (&priv->lock);
1349   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1350     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1351
1352     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1353       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1354     } else {
1355       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1356     }
1357   }
1358   g_mutex_unlock (&priv->lock);
1359
1360   gst_sample_unref (sample);
1361
1362   return GST_FLOW_OK;
1363 }
1364
1365 static GstAppSinkCallbacks sink_cb = {
1366   NULL,                         /* not interested in EOS */
1367   NULL,                         /* not interested in preroll samples */
1368   handle_new_sample,
1369 };
1370
1371 /**
1372  * gst_rtsp_stream_join_bin:
1373  * @stream: a #GstRTSPStream
1374  * @bin: a #GstBin to join
1375  * @rtpbin: a rtpbin element in @bin
1376  * @state: the target state of the new elements
1377  *
1378  * Join the #GstBin @bin that contains the element @rtpbin.
1379  *
1380  * @stream will link to @rtpbin, which must be inside @bin. The elements
1381  * added to @bin will be set to the state given in @state.
1382  *
1383  * Returns: %TRUE on success.
1384  */
1385 gboolean
1386 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1387     GstElement * rtpbin, GstState state)
1388 {
1389   GstRTSPStreamPrivate *priv;
1390   gint i;
1391   guint idx;
1392   gchar *name;
1393   GstPad *pad, *sinkpad, *selpad;
1394   GstPadLinkReturn ret;
1395
1396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1397   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1398   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1399
1400   priv = stream->priv;
1401
1402   g_mutex_lock (&priv->lock);
1403   if (priv->is_joined)
1404     goto was_joined;
1405
1406   /* create a session with the same index as the stream */
1407   idx = priv->idx;
1408
1409   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1410
1411   if (!alloc_ports (stream))
1412     goto no_ports;
1413
1414   /* update the dscp qos field in the sinks */
1415   update_dscp_qos (stream);
1416
1417   /* get a pad for sending RTP */
1418   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1419   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1420   g_free (name);
1421   /* link the RTP pad to the session manager, it should not really fail unless
1422    * this is not really an RTP pad */
1423   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1424   if (ret != GST_PAD_LINK_OK)
1425     goto link_failed;
1426
1427   /* get pads from the RTP session element for sending and receiving
1428    * RTP/RTCP*/
1429   name = g_strdup_printf ("send_rtp_src_%u", idx);
1430   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1431   g_free (name);
1432   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1433   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1434   g_free (name);
1435   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1436   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1437   g_free (name);
1438   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1439   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1440   g_free (name);
1441
1442   /* get the session */
1443   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1444
1445   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1446       stream);
1447   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1448       stream);
1449   g_signal_connect (priv->session, "on-ssrc-active",
1450       (GCallback) on_ssrc_active, stream);
1451   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1452       stream);
1453   g_signal_connect (priv->session, "on-bye-timeout",
1454       (GCallback) on_bye_timeout, stream);
1455   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1456       stream);
1457
1458   for (i = 0; i < 2; i++) {
1459     GstPad *teepad, *queuepad;
1460     /* For the sender we create this bit of pipeline for both
1461      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1462      * we need to add a queue before appsink to make the pipeline
1463      * not block. For the TCP case, we want to pump data to the
1464      * client as fast as possible anyway.
1465      *
1466      * .--------.      .-----.    .---------.
1467      * | rtpbin |      | tee |    | udpsink |
1468      * |       send->sink   src->sink       |
1469      * '--------'      |     |    '---------'
1470      *                 |     |    .---------.    .---------.
1471      *                 |     |    |  queue  |    | appsink |
1472      *                 |    src->sink      src->sink       |
1473      *                 '-----'    '---------'    '---------'
1474      *
1475      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1476      * udpsink directly to the session.
1477      */
1478     /* add udpsink */
1479     gst_bin_add (bin, priv->udpsink[i]);
1480     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1481
1482     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1483       /* make tee for RTP/RTCP */
1484       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1485       gst_bin_add (bin, priv->tee[i]);
1486
1487       /* and link to rtpbin send pad */
1488       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1489       gst_pad_link (priv->send_src[i], pad);
1490       gst_object_unref (pad);
1491
1492       /* link tee to udpsink */
1493       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1494       gst_pad_link (teepad, sinkpad);
1495       gst_object_unref (teepad);
1496
1497       /* make queue */
1498       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1499       gst_bin_add (bin, priv->appqueue[i]);
1500       /* and link to tee */
1501       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1502       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1503       gst_pad_link (teepad, pad);
1504       gst_object_unref (pad);
1505       gst_object_unref (teepad);
1506
1507       /* make appsink */
1508       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1509       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1510       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1511       gst_bin_add (bin, priv->appsink[i]);
1512       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1513           &sink_cb, stream, NULL);
1514       /* and link to queue */
1515       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1516       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1517       gst_pad_link (queuepad, pad);
1518       gst_object_unref (pad);
1519       gst_object_unref (queuepad);
1520     } else {
1521       /* else only udpsink needed, link it to the session */
1522       gst_pad_link (priv->send_src[i], sinkpad);
1523     }
1524     gst_object_unref (sinkpad);
1525
1526     /* For the receiver we create this bit of pipeline for both
1527      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1528      * and it is all funneled into the rtpbin receive pad.
1529      *
1530      * .--------.     .--------.    .--------.
1531      * | udpsrc |     | funnel |    | rtpbin |
1532      * |       src->sink      src->sink      |
1533      * '--------'     |        |    '--------'
1534      * .--------.     |        |
1535      * | appsrc |     |        |
1536      * |       src->sink       |
1537      * '--------'     '--------'
1538      */
1539     /* make funnel for the RTP/RTCP receivers */
1540     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1541     gst_bin_add (bin, priv->funnel[i]);
1542
1543     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1544     gst_pad_link (pad, priv->recv_sink[i]);
1545     gst_object_unref (pad);
1546
1547     if (priv->udpsrc_v4[i]) {
1548       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1549        * values */
1550       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1551       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1552       /* add udpsrc */
1553       gst_bin_add (bin, priv->udpsrc_v4[i]);
1554
1555       /* and link to the funnel v4 */
1556       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1557       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1558       gst_pad_link (pad, selpad);
1559       gst_object_unref (pad);
1560       gst_object_unref (selpad);
1561     }
1562
1563     if (priv->udpsrc_v6[i]) {
1564       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1565       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1566       gst_bin_add (bin, priv->udpsrc_v6[i]);
1567
1568       /* and link to the funnel v6 */
1569       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1570       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1571       gst_pad_link (pad, selpad);
1572       gst_object_unref (pad);
1573       gst_object_unref (selpad);
1574     }
1575
1576     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1577       /* make and add appsrc */
1578       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1579       gst_bin_add (bin, priv->appsrc[i]);
1580       /* and link to the funnel */
1581       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1582       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1583       gst_pad_link (pad, selpad);
1584       gst_object_unref (pad);
1585       gst_object_unref (selpad);
1586     }
1587
1588     /* check if we need to set to a special state */
1589     if (state != GST_STATE_NULL) {
1590       if (priv->udpsink[i])
1591         gst_element_set_state (priv->udpsink[i], state);
1592       if (priv->appsink[i])
1593         gst_element_set_state (priv->appsink[i], state);
1594       if (priv->appqueue[i])
1595         gst_element_set_state (priv->appqueue[i], state);
1596       if (priv->tee[i])
1597         gst_element_set_state (priv->tee[i], state);
1598       if (priv->funnel[i])
1599         gst_element_set_state (priv->funnel[i], state);
1600       if (priv->appsrc[i])
1601         gst_element_set_state (priv->appsrc[i], state);
1602     }
1603   }
1604
1605   /* be notified of caps changes */
1606   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1607       (GCallback) caps_notify, stream);
1608
1609   priv->is_joined = TRUE;
1610   g_mutex_unlock (&priv->lock);
1611
1612   return TRUE;
1613
1614   /* ERRORS */
1615 was_joined:
1616   {
1617     g_mutex_unlock (&priv->lock);
1618     return TRUE;
1619   }
1620 no_ports:
1621   {
1622     g_mutex_unlock (&priv->lock);
1623     GST_WARNING ("failed to allocate ports %u", idx);
1624     return FALSE;
1625   }
1626 link_failed:
1627   {
1628     GST_WARNING ("failed to link stream %u", idx);
1629     gst_object_unref (priv->send_rtp_sink);
1630     priv->send_rtp_sink = NULL;
1631     g_mutex_unlock (&priv->lock);
1632     return FALSE;
1633   }
1634 }
1635
1636 /**
1637  * gst_rtsp_stream_leave_bin:
1638  * @stream: a #GstRTSPStream
1639  * @bin: a #GstBin
1640  * @rtpbin: a rtpbin #GstElement
1641  *
1642  * Remove the elements of @stream from @bin.
1643  *
1644  * Return: %TRUE on success.
1645  */
1646 gboolean
1647 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1648     GstElement * rtpbin)
1649 {
1650   GstRTSPStreamPrivate *priv;
1651   gint i;
1652
1653   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1654   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1655   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1656
1657   priv = stream->priv;
1658
1659   g_mutex_lock (&priv->lock);
1660   if (!priv->is_joined)
1661     goto was_not_joined;
1662
1663   /* all transports must be removed by now */
1664   g_return_val_if_fail (priv->transports == NULL, FALSE);
1665
1666   GST_INFO ("stream %p leaving bin", stream);
1667
1668   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1669   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1670   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1671   gst_object_unref (priv->send_rtp_sink);
1672   priv->send_rtp_sink = NULL;
1673
1674   for (i = 0; i < 2; i++) {
1675     if (priv->udpsink[i])
1676       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1677     if (priv->appsink[i])
1678       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1679     if (priv->appqueue[i])
1680       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1681     if (priv->tee[i])
1682       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1683     if (priv->funnel[i])
1684       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1685     if (priv->appsrc[i])
1686       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1687     if (priv->udpsrc_v4[i]) {
1688       /* and set udpsrc to NULL now before removing */
1689       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1690       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1691       /* removing them should also nicely release the request
1692        * pads when they finalize */
1693       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1694     }
1695     if (priv->udpsrc_v6[i]) {
1696       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1697       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1698       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1699     }
1700     if (priv->udpsink[i])
1701       gst_bin_remove (bin, priv->udpsink[i]);
1702     if (priv->appsrc[i])
1703       gst_bin_remove (bin, priv->appsrc[i]);
1704     if (priv->appsink[i])
1705       gst_bin_remove (bin, priv->appsink[i]);
1706     if (priv->appqueue[i])
1707       gst_bin_remove (bin, priv->appqueue[i]);
1708     if (priv->tee[i])
1709       gst_bin_remove (bin, priv->tee[i]);
1710     if (priv->funnel[i])
1711       gst_bin_remove (bin, priv->funnel[i]);
1712
1713     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1714     gst_object_unref (priv->recv_sink[i]);
1715     priv->recv_sink[i] = NULL;
1716
1717     priv->udpsrc_v4[i] = NULL;
1718     priv->udpsrc_v6[i] = NULL;
1719     priv->udpsink[i] = NULL;
1720     priv->appsrc[i] = NULL;
1721     priv->appsink[i] = NULL;
1722     priv->appqueue[i] = NULL;
1723     priv->tee[i] = NULL;
1724     priv->funnel[i] = NULL;
1725   }
1726   gst_object_unref (priv->send_src[0]);
1727   priv->send_src[0] = NULL;
1728
1729   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1730   gst_object_unref (priv->send_src[1]);
1731   priv->send_src[1] = NULL;
1732
1733   g_object_unref (priv->session);
1734   priv->session = NULL;
1735   if (priv->caps)
1736     gst_caps_unref (priv->caps);
1737   priv->caps = NULL;
1738
1739   priv->is_joined = FALSE;
1740   g_mutex_unlock (&priv->lock);
1741
1742   return TRUE;
1743
1744 was_not_joined:
1745   {
1746     return TRUE;
1747   }
1748 }
1749
1750 /**
1751  * gst_rtsp_stream_get_rtpinfo:
1752  * @stream: a #GstRTSPStream
1753  * @rtptime: result RTP timestamp
1754  * @seq: result RTP seqnum
1755  *
1756  * Retrieve the current rtptime and seq. This is used to
1757  * construct a RTPInfo reply header.
1758  *
1759  * Returns: %TRUE when rtptime and seq could be determined.
1760  */
1761 gboolean
1762 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1763     guint * rtptime, guint * seq)
1764 {
1765   GstRTSPStreamPrivate *priv;
1766   GObjectClass *payobjclass;
1767
1768   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1769   g_return_val_if_fail (rtptime != NULL, FALSE);
1770   g_return_val_if_fail (seq != NULL, FALSE);
1771
1772   priv = stream->priv;
1773
1774   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1775
1776   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1777       !g_object_class_find_property (payobjclass, "timestamp"))
1778     return FALSE;
1779
1780   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1781
1782   return TRUE;
1783 }
1784
1785 /**
1786  * gst_rtsp_stream_get_caps:
1787  * @stream: a #GstRTSPStream
1788  *
1789  * Retrieve the current caps of @stream.
1790  *
1791  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1792  *    after usage.
1793  */
1794 GstCaps *
1795 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1796 {
1797   GstRTSPStreamPrivate *priv;
1798   GstCaps *result;
1799
1800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1801
1802   priv = stream->priv;
1803
1804   g_mutex_lock (&priv->lock);
1805   if ((result = priv->caps))
1806     gst_caps_ref (result);
1807   g_mutex_unlock (&priv->lock);
1808
1809   return result;
1810 }
1811
1812 /**
1813  * gst_rtsp_stream_recv_rtp:
1814  * @stream: a #GstRTSPStream
1815  * @buffer: (transfer full): a #GstBuffer
1816  *
1817  * Handle an RTP buffer for the stream. This method is usually called when a
1818  * message has been received from a client using the TCP transport.
1819  *
1820  * This function takes ownership of @buffer.
1821  *
1822  * Returns: a GstFlowReturn.
1823  */
1824 GstFlowReturn
1825 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1826 {
1827   GstRTSPStreamPrivate *priv;
1828   GstFlowReturn ret;
1829   GstElement *element;
1830
1831   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1832   priv = stream->priv;
1833   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1834   g_return_val_if_fail (priv->is_joined, FALSE);
1835
1836   g_mutex_lock (&priv->lock);
1837   if (priv->appsrc[0])
1838     element = gst_object_ref (priv->appsrc[0]);
1839   else
1840     element = NULL;
1841   g_mutex_unlock (&priv->lock);
1842
1843   if (element) {
1844     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1845     gst_object_unref (element);
1846   } else {
1847     ret = GST_FLOW_OK;
1848   }
1849   return ret;
1850 }
1851
1852 /**
1853  * gst_rtsp_stream_recv_rtcp:
1854  * @stream: a #GstRTSPStream
1855  * @buffer: (transfer full): a #GstBuffer
1856  *
1857  * Handle an RTCP buffer for the stream. This method is usually called when a
1858  * message has been received from a client using the TCP transport.
1859  *
1860  * This function takes ownership of @buffer.
1861  *
1862  * Returns: a GstFlowReturn.
1863  */
1864 GstFlowReturn
1865 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1866 {
1867   GstRTSPStreamPrivate *priv;
1868   GstFlowReturn ret;
1869   GstElement *element;
1870
1871   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1872   priv = stream->priv;
1873   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1874   g_return_val_if_fail (priv->is_joined, FALSE);
1875
1876   g_mutex_lock (&priv->lock);
1877   if (priv->appsrc[1])
1878     element = gst_object_ref (priv->appsrc[1]);
1879   else
1880     element = NULL;
1881   g_mutex_unlock (&priv->lock);
1882
1883   if (element) {
1884     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1885     gst_object_unref (element);
1886   } else {
1887     ret = GST_FLOW_OK;
1888   }
1889   return ret;
1890 }
1891
1892 /* must be called with lock */
1893 static gboolean
1894 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1895     gboolean add)
1896 {
1897   GstRTSPStreamPrivate *priv = stream->priv;
1898   const GstRTSPTransport *tr;
1899
1900   tr = gst_rtsp_stream_transport_get_transport (trans);
1901
1902   switch (tr->lower_transport) {
1903     case GST_RTSP_LOWER_TRANS_UDP:
1904     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1905     {
1906       gchar *dest;
1907       gint min, max;
1908       guint ttl = 0;
1909
1910       dest = tr->destination;
1911       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1912         min = tr->port.min;
1913         max = tr->port.max;
1914         ttl = tr->ttl;
1915       } else {
1916         min = tr->client_port.min;
1917         max = tr->client_port.max;
1918       }
1919
1920       if (add) {
1921         GST_INFO ("adding %s:%d-%d", dest, min, max);
1922         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1923         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1924         if (ttl > 0) {
1925           GST_INFO ("setting ttl-mc %d", ttl);
1926           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1927           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1928         }
1929         priv->transports = g_list_prepend (priv->transports, trans);
1930       } else {
1931         GST_INFO ("removing %s:%d-%d", dest, min, max);
1932         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1933         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1934         priv->transports = g_list_remove (priv->transports, trans);
1935       }
1936       break;
1937     }
1938     case GST_RTSP_LOWER_TRANS_TCP:
1939       if (add) {
1940         GST_INFO ("adding TCP %s", tr->destination);
1941         priv->transports = g_list_prepend (priv->transports, trans);
1942       } else {
1943         GST_INFO ("removing TCP %s", tr->destination);
1944         priv->transports = g_list_remove (priv->transports, trans);
1945       }
1946       break;
1947     default:
1948       goto unknown_transport;
1949   }
1950   return TRUE;
1951
1952   /* ERRORS */
1953 unknown_transport:
1954   {
1955     GST_INFO ("Unknown transport %d", tr->lower_transport);
1956     return FALSE;
1957   }
1958 }
1959
1960
1961 /**
1962  * gst_rtsp_stream_add_transport:
1963  * @stream: a #GstRTSPStream
1964  * @trans: a #GstRTSPStreamTransport
1965  *
1966  * Add the transport in @trans to @stream. The media of @stream will
1967  * then also be send to the values configured in @trans.
1968  *
1969  * @stream must be joined to a bin.
1970  *
1971  * @trans must contain a valid #GstRTSPTransport.
1972  *
1973  * Returns: %TRUE if @trans was added
1974  */
1975 gboolean
1976 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1977     GstRTSPStreamTransport * trans)
1978 {
1979   GstRTSPStreamPrivate *priv;
1980   gboolean res;
1981
1982   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1983   priv = stream->priv;
1984   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1985   g_return_val_if_fail (priv->is_joined, FALSE);
1986
1987   g_mutex_lock (&priv->lock);
1988   res = update_transport (stream, trans, TRUE);
1989   g_mutex_unlock (&priv->lock);
1990
1991   return res;
1992 }
1993
1994 /**
1995  * gst_rtsp_stream_remove_transport:
1996  * @stream: a #GstRTSPStream
1997  * @trans: a #GstRTSPStreamTransport
1998  *
1999  * Remove the transport in @trans from @stream. The media of @stream will
2000  * not be sent to the values configured in @trans.
2001  *
2002  * @stream must be joined to a bin.
2003  *
2004  * @trans must contain a valid #GstRTSPTransport.
2005  *
2006  * Returns: %TRUE if @trans was removed
2007  */
2008 gboolean
2009 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2010     GstRTSPStreamTransport * trans)
2011 {
2012   GstRTSPStreamPrivate *priv;
2013   gboolean res;
2014
2015   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2016   priv = stream->priv;
2017   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2018   g_return_val_if_fail (priv->is_joined, FALSE);
2019
2020   g_mutex_lock (&priv->lock);
2021   res = update_transport (stream, trans, FALSE);
2022   g_mutex_unlock (&priv->lock);
2023
2024   return res;
2025 }
2026
2027 /**
2028  * gst_rtsp_stream_get_rtp_socket:
2029  * @stream: a #GstRTSPStream
2030  * @family: the socket family
2031  *
2032  * Get the RTP socket from @stream for a @family.
2033  *
2034  * @stream must be joined to a bin.
2035  *
2036  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2037  *     Unref after usage
2038  */
2039 GSocket *
2040 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2041 {
2042   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2043   GSocket *socket;
2044   gchar *name;
2045
2046   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2047   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2048       family == G_SOCKET_FAMILY_IPV6, NULL);
2049   g_return_val_if_fail (priv->udpsink[0], NULL);
2050
2051   if (family == G_SOCKET_FAMILY_IPV6)
2052     name = "socket-v6";
2053   else
2054     name = "socket";
2055
2056   g_object_get (priv->udpsink[0], name, &socket, NULL);
2057
2058   return socket;
2059 }
2060
2061 /**
2062  * gst_rtsp_stream_get_rtcp_socket:
2063  * @stream: a #GstRTSPStream
2064  * @family: the socket family
2065  *
2066  * Get the RTCP socket from @stream for a @family.
2067  *
2068  * @stream must be joined to a bin.
2069  *
2070  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2071  *     @family. Unref after usage
2072  */
2073 GSocket *
2074 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2075 {
2076   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2077   GSocket *socket;
2078   gchar *name;
2079
2080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2081   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2082       family == G_SOCKET_FAMILY_IPV6, NULL);
2083   g_return_val_if_fail (priv->udpsink[1], NULL);
2084
2085   if (family == G_SOCKET_FAMILY_IPV6)
2086     name = "socket-v6";
2087   else
2088     name = "socket";
2089
2090   g_object_get (priv->udpsink[1], name, &socket, NULL);
2091
2092   return socket;
2093 }
2094
2095 /**
2096  * gst_rtsp_stream_transport_filter:
2097  * @stream: a #GstRTSPStream
2098  * @func: (scope call) (allow-none): a callback
2099  * @user_data: user data passed to @func
2100  *
2101  * Call @func for each transport managed by @stream. The result value of @func
2102  * determines what happens to the transport. @func will be called with @stream
2103  * locked so no further actions on @stream can be performed from @func.
2104  *
2105  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2106  * @stream.
2107  *
2108  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2109  *
2110  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2111  * will also be added with an additional ref to the result #GList of this
2112  * function..
2113  *
2114  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2115  *
2116  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2117  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2118  * element in the #GList should be unreffed before the list is freed.
2119  */
2120 GList *
2121 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2122     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2123 {
2124   GstRTSPStreamPrivate *priv;
2125   GList *result, *walk, *next;
2126
2127   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2128
2129   priv = stream->priv;
2130
2131   result = NULL;
2132
2133   g_mutex_lock (&priv->lock);
2134   for (walk = priv->transports; walk; walk = next) {
2135     GstRTSPStreamTransport *trans = walk->data;
2136     GstRTSPFilterResult res;
2137
2138     next = g_list_next (walk);
2139
2140     if (func)
2141       res = func (stream, trans, user_data);
2142     else
2143       res = GST_RTSP_FILTER_REF;
2144
2145     switch (res) {
2146       case GST_RTSP_FILTER_REMOVE:
2147         update_transport (stream, trans, FALSE);
2148         break;
2149       case GST_RTSP_FILTER_REF:
2150         result = g_list_prepend (result, g_object_ref (trans));
2151         break;
2152       case GST_RTSP_FILTER_KEEP:
2153       default:
2154         break;
2155     }
2156   }
2157   g_mutex_unlock (&priv->lock);
2158
2159   return result;
2160 }
2161
2162 static GstPadProbeReturn
2163 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2164 {
2165   GstRTSPStreamPrivate *priv;
2166   GstRTSPStream *stream;
2167
2168   stream = user_data;
2169   priv = stream->priv;
2170
2171   GST_DEBUG_OBJECT (pad, "now blocking");
2172
2173   g_mutex_lock (&priv->lock);
2174   priv->blocking = TRUE;
2175   g_mutex_unlock (&priv->lock);
2176
2177   gst_element_post_message (priv->payloader,
2178       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2179           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2180
2181   return GST_PAD_PROBE_OK;
2182 }
2183
2184 /**
2185  * gst_rtsp_stream_set_blocked:
2186  * @stream: a #GstRTSPStream
2187  * @blocked: boolean indicating we should block or unblock
2188  *
2189  * Blocks or unblocks the dataflow on @stream.
2190  *
2191  * Returns: %TRUE on success
2192  */
2193 gboolean
2194 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2195 {
2196   GstRTSPStreamPrivate *priv;
2197
2198   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2199
2200   priv = stream->priv;
2201
2202   g_mutex_lock (&priv->lock);
2203   if (blocked) {
2204     priv->blocking = FALSE;
2205     if (priv->blocked_id == 0) {
2206       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2207           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2208           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2209           g_object_ref (stream), g_object_unref);
2210     }
2211   } else {
2212     if (priv->blocked_id != 0) {
2213       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2214       priv->blocked_id = 0;
2215       priv->blocking = FALSE;
2216     }
2217   }
2218   g_mutex_unlock (&priv->lock);
2219
2220   return TRUE;
2221 }
2222
2223 /**
2224  * gst_rtsp_stream_is_blocking:
2225  * @stream: a #GstRTSPStream
2226  *
2227  * Check if @stream is blocking on a #GstBuffer.
2228  *
2229  * Returns: %TRUE if @stream is blocking
2230  */
2231 gboolean
2232 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2233 {
2234   GstRTSPStreamPrivate *priv;
2235   gboolean result;
2236
2237   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2238
2239   priv = stream->priv;
2240
2241   g_mutex_lock (&priv->lock);
2242   result = priv->blocking;
2243   g_mutex_unlock (&priv->lock);
2244
2245   return result;
2246 }