stream: also return the running-time
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123
124   /* stream blocking */
125   gulong blocked_id;
126   gboolean blocking;
127 };
128
129 #define DEFAULT_CONTROL         NULL
130 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
131                                         GST_RTSP_LOWER_TRANS_TCP
132
133 enum
134 {
135   PROP_0,
136   PROP_CONTROL,
137   PROP_PROTOCOLS,
138   PROP_LAST
139 };
140
141 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
142 #define GST_CAT_DEFAULT rtsp_stream_debug
143
144 static GQuark ssrc_stream_map_key;
145
146 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
147     GValue * value, GParamSpec * pspec);
148 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
149     const GValue * value, GParamSpec * pspec);
150
151 static void gst_rtsp_stream_finalize (GObject * obj);
152
153 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
154
155 static void
156 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
157 {
158   GObjectClass *gobject_class;
159
160   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
161
162   gobject_class = G_OBJECT_CLASS (klass);
163
164   gobject_class->get_property = gst_rtsp_stream_get_property;
165   gobject_class->set_property = gst_rtsp_stream_set_property;
166   gobject_class->finalize = gst_rtsp_stream_finalize;
167
168   g_object_class_install_property (gobject_class, PROP_CONTROL,
169       g_param_spec_string ("control", "Control",
170           "The control string for this stream", DEFAULT_CONTROL,
171           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172
173   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
174       g_param_spec_flags ("protocols", "Protocols",
175           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
176           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177
178   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
179
180   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
181 }
182
183 static void
184 gst_rtsp_stream_init (GstRTSPStream * stream)
185 {
186   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
187
188   GST_DEBUG ("new stream %p", stream);
189
190   stream->priv = priv;
191
192   priv->dscp_qos = -1;
193   priv->control = g_strdup (DEFAULT_CONTROL);
194   priv->protocols = DEFAULT_PROTOCOLS;
195
196   g_mutex_init (&priv->lock);
197 }
198
199 static void
200 gst_rtsp_stream_finalize (GObject * obj)
201 {
202   GstRTSPStream *stream;
203   GstRTSPStreamPrivate *priv;
204
205   stream = GST_RTSP_STREAM (obj);
206   priv = stream->priv;
207
208   GST_DEBUG ("finalize stream %p", stream);
209
210   /* we really need to be unjoined now */
211   g_return_if_fail (!priv->is_joined);
212
213   if (priv->addr_v4)
214     gst_rtsp_address_free (priv->addr_v4);
215   if (priv->addr_v6)
216     gst_rtsp_address_free (priv->addr_v6);
217   if (priv->server_addr_v4)
218     gst_rtsp_address_free (priv->server_addr_v4);
219   if (priv->server_addr_v6)
220     gst_rtsp_address_free (priv->server_addr_v6);
221   if (priv->pool)
222     g_object_unref (priv->pool);
223   gst_object_unref (priv->payloader);
224   gst_object_unref (priv->srcpad);
225   g_free (priv->control);
226   g_mutex_clear (&priv->lock);
227
228   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
229 }
230
231 static void
232 gst_rtsp_stream_get_property (GObject * object, guint propid,
233     GValue * value, GParamSpec * pspec)
234 {
235   GstRTSPStream *stream = GST_RTSP_STREAM (object);
236
237   switch (propid) {
238     case PROP_CONTROL:
239       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
240       break;
241     case PROP_PROTOCOLS:
242       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
243       break;
244     default:
245       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
246   }
247 }
248
249 static void
250 gst_rtsp_stream_set_property (GObject * object, guint propid,
251     const GValue * value, GParamSpec * pspec)
252 {
253   GstRTSPStream *stream = GST_RTSP_STREAM (object);
254
255   switch (propid) {
256     case PROP_CONTROL:
257       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
258       break;
259     case PROP_PROTOCOLS:
260       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
261       break;
262     default:
263       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
264   }
265 }
266
267 /**
268  * gst_rtsp_stream_new:
269  * @idx: an index
270  * @srcpad: a #GstPad
271  * @payloader: a #GstElement
272  *
273  * Create a new media stream with index @idx that handles RTP data on
274  * @srcpad and has a payloader element @payloader.
275  *
276  * Returns: a new #GstRTSPStream
277  */
278 GstRTSPStream *
279 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
280 {
281   GstRTSPStreamPrivate *priv;
282   GstRTSPStream *stream;
283
284   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
285   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
286   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
287
288   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
289   priv = stream->priv;
290   priv->idx = idx;
291   priv->payloader = gst_object_ref (payloader);
292   priv->srcpad = gst_object_ref (srcpad);
293
294   return stream;
295 }
296
297 /**
298  * gst_rtsp_stream_get_index:
299  * @stream: a #GstRTSPStream
300  *
301  * Get the stream index.
302  *
303  * Return: the stream index.
304  */
305 guint
306 gst_rtsp_stream_get_index (GstRTSPStream * stream)
307 {
308   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
309
310   return stream->priv->idx;
311 }
312
313 /**
314  * gst_rtsp_stream_get_pt:
315  * @stream: a #GstRTSPStream
316  *
317  * Get the stream payload type.
318  *
319  * Return: the stream payload type.
320  */
321 guint
322 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
323 {
324   GstRTSPStreamPrivate *priv;
325   guint pt;
326
327   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
328
329   priv = stream->priv;
330
331   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
332
333   return pt;
334 }
335
336 /**
337  * gst_rtsp_stream_get_srcpad:
338  * @stream: a #GstRTSPStream
339  *
340  * Get the srcpad associated with @stream.
341  *
342  * Returns: (transfer full): the srcpad. Unref after usage.
343  */
344 GstPad *
345 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
346 {
347   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
348
349   return gst_object_ref (stream->priv->srcpad);
350 }
351
352 /**
353  * gst_rtsp_stream_get_control:
354  * @stream: a #GstRTSPStream
355  *
356  * Get the control string to identify this stream.
357  *
358  * Returns: (transfer full): the control string. free after usage.
359  */
360 gchar *
361 gst_rtsp_stream_get_control (GstRTSPStream * stream)
362 {
363   GstRTSPStreamPrivate *priv;
364   gchar *result;
365
366   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
367
368   priv = stream->priv;
369
370   g_mutex_lock (&priv->lock);
371   if ((result = g_strdup (priv->control)) == NULL)
372     result = g_strdup_printf ("stream=%u", priv->idx);
373   g_mutex_unlock (&priv->lock);
374
375   return result;
376 }
377
378 /**
379  * gst_rtsp_stream_set_control:
380  * @stream: a #GstRTSPStream
381  * @control: a control string
382  *
383  * Set the control string in @stream.
384  */
385 void
386 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
387 {
388   GstRTSPStreamPrivate *priv;
389
390   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
391
392   priv = stream->priv;
393
394   g_mutex_lock (&priv->lock);
395   g_free (priv->control);
396   priv->control = g_strdup (control);
397   g_mutex_unlock (&priv->lock);
398 }
399
400 /**
401  * gst_rtsp_stream_has_control:
402  * @stream: a #GstRTSPStream
403  * @control: a control string
404  *
405  * Check if @stream has the control string @control.
406  *
407  * Returns: %TRUE is @stream has @control as the control string
408  */
409 gboolean
410 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
411 {
412   GstRTSPStreamPrivate *priv;
413   gboolean res;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
416
417   priv = stream->priv;
418
419   g_mutex_lock (&priv->lock);
420   if (priv->control)
421     res = (g_strcmp0 (priv->control, control) == 0);
422   else {
423     guint streamid;
424
425     if (sscanf (control, "stream=%u", &streamid) > 0)
426       res = (streamid == priv->idx);
427     else
428       res = FALSE;
429   }
430   g_mutex_unlock (&priv->lock);
431
432   return res;
433 }
434
435 /**
436  * gst_rtsp_stream_set_mtu:
437  * @stream: a #GstRTSPStream
438  * @mtu: a new MTU
439  *
440  * Configure the mtu in the payloader of @stream to @mtu.
441  */
442 void
443 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
444 {
445   GstRTSPStreamPrivate *priv;
446
447   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
448
449   priv = stream->priv;
450
451   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
452
453   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
454 }
455
456 /**
457  * gst_rtsp_stream_get_mtu:
458  * @stream: a #GstRTSPStream
459  *
460  * Get the configured MTU in the payloader of @stream.
461  *
462  * Returns: the MTU of the payloader.
463  */
464 guint
465 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
466 {
467   GstRTSPStreamPrivate *priv;
468   guint mtu;
469
470   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
471
472   priv = stream->priv;
473
474   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
475
476   return mtu;
477 }
478
479 /* Update the dscp qos property on the udp sinks */
480 static void
481 update_dscp_qos (GstRTSPStream * stream)
482 {
483   GstRTSPStreamPrivate *priv;
484
485   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
486
487   priv = stream->priv;
488
489   if (priv->udpsink[0]) {
490     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
491         NULL);
492   }
493
494   if (priv->udpsink[1]) {
495     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
496         NULL);
497   }
498 }
499
500 /**
501  * gst_rtsp_stream_set_dscp_qos:
502  * @stream: a #GstRTSPStream
503  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
504  *
505  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
506  */
507 void
508 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
509 {
510   GstRTSPStreamPrivate *priv;
511
512   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
513
514   priv = stream->priv;
515
516   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
517
518   if (dscp_qos < -1 || dscp_qos > 63) {
519     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
520     return;
521   }
522
523   priv->dscp_qos = dscp_qos;
524
525   update_dscp_qos (stream);
526 }
527
528 /**
529  * gst_rtsp_stream_get_dscp_qos:
530  * @stream: a #GstRTSPStream
531  *
532  * Get the configured DSCP QoS in of the outgoing sockets.
533  *
534  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
535  */
536 gint
537 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
538 {
539   GstRTSPStreamPrivate *priv;
540
541   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
542
543   priv = stream->priv;
544
545   return priv->dscp_qos;
546 }
547
548 /**
549  * gst_rtsp_stream_set_protocols:
550  * @stream: a #GstRTSPStream
551  * @protocols: the new flags
552  *
553  * Configure the allowed lower transport for @stream.
554  */
555 void
556 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
557     GstRTSPLowerTrans protocols)
558 {
559   GstRTSPStreamPrivate *priv;
560
561   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
562
563   priv = stream->priv;
564
565   g_mutex_lock (&priv->lock);
566   priv->protocols = protocols;
567   g_mutex_unlock (&priv->lock);
568 }
569
570 /**
571  * gst_rtsp_stream_get_protocols:
572  * @stream: a #GstRTSPStream
573  *
574  * Get the allowed protocols of @stream.
575  *
576  * Returns: a #GstRTSPLowerTrans
577  */
578 GstRTSPLowerTrans
579 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
580 {
581   GstRTSPStreamPrivate *priv;
582   GstRTSPLowerTrans res;
583
584   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
585       GST_RTSP_LOWER_TRANS_UNKNOWN);
586
587   priv = stream->priv;
588
589   g_mutex_lock (&priv->lock);
590   res = priv->protocols;
591   g_mutex_unlock (&priv->lock);
592
593   return res;
594 }
595
596 /**
597  * gst_rtsp_stream_set_address_pool:
598  * @stream: a #GstRTSPStream
599  * @pool: a #GstRTSPAddressPool
600  *
601  * configure @pool to be used as the address pool of @stream.
602  */
603 void
604 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
605     GstRTSPAddressPool * pool)
606 {
607   GstRTSPStreamPrivate *priv;
608   GstRTSPAddressPool *old;
609
610   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
611
612   priv = stream->priv;
613
614   GST_LOG_OBJECT (stream, "set address pool %p", pool);
615
616   g_mutex_lock (&priv->lock);
617   if ((old = priv->pool) != pool)
618     priv->pool = pool ? g_object_ref (pool) : NULL;
619   else
620     old = NULL;
621   g_mutex_unlock (&priv->lock);
622
623   if (old)
624     g_object_unref (old);
625 }
626
627 /**
628  * gst_rtsp_stream_get_address_pool:
629  * @stream: a #GstRTSPStream
630  *
631  * Get the #GstRTSPAddressPool used as the address pool of @stream.
632  *
633  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
634  * usage.
635  */
636 GstRTSPAddressPool *
637 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
638 {
639   GstRTSPStreamPrivate *priv;
640   GstRTSPAddressPool *result;
641
642   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
643
644   priv = stream->priv;
645
646   g_mutex_lock (&priv->lock);
647   if ((result = priv->pool))
648     g_object_ref (result);
649   g_mutex_unlock (&priv->lock);
650
651   return result;
652 }
653
654 /**
655  * gst_rtsp_stream_get_multicast_address:
656  * @stream: a #GstRTSPStream
657  * @family: the #GSocketFamily
658  *
659  * Get the multicast address of @stream for @family.
660  *
661  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
662  * allocated. gst_rtsp_address_free() after usage.
663  */
664 GstRTSPAddress *
665 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
666     GSocketFamily family)
667 {
668   GstRTSPStreamPrivate *priv;
669   GstRTSPAddress *result;
670   GstRTSPAddress **addrp;
671   GstRTSPAddressFlags flags;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
674
675   priv = stream->priv;
676
677   if (family == G_SOCKET_FAMILY_IPV6) {
678     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
679     addrp = &priv->addr_v4;
680   } else {
681     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
682     addrp = &priv->addr_v6;
683   }
684
685   g_mutex_lock (&priv->lock);
686   if (*addrp == NULL) {
687     if (priv->pool == NULL)
688       goto no_pool;
689
690     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
691
692     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
693     if (*addrp == NULL)
694       goto no_address;
695   }
696   result = gst_rtsp_address_copy (*addrp);
697   g_mutex_unlock (&priv->lock);
698
699   return result;
700
701   /* ERRORS */
702 no_pool:
703   {
704     GST_ERROR_OBJECT (stream, "no address pool specified");
705     g_mutex_unlock (&priv->lock);
706     return NULL;
707   }
708 no_address:
709   {
710     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
711     g_mutex_unlock (&priv->lock);
712     return NULL;
713   }
714 }
715
716 /**
717  * gst_rtsp_stream_reserve_address:
718  * @stream: a #GstRTSPStream
719  * @address: an address
720  * @port: a port
721  * @n_ports: n_ports
722  * @ttl: a TTL
723  *
724  * Reserve @address and @port as the address and port of @stream.
725  *
726  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
727  * reserved. gst_rtsp_address_free() after usage.
728  */
729 GstRTSPAddress *
730 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
731     const gchar * address, guint port, guint n_ports, guint ttl)
732 {
733   GstRTSPStreamPrivate *priv;
734   GstRTSPAddress *result;
735   GInetAddress *addr;
736   GSocketFamily family;
737   GstRTSPAddress **addrp;
738
739   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
740   g_return_val_if_fail (address != NULL, NULL);
741   g_return_val_if_fail (port > 0, NULL);
742   g_return_val_if_fail (n_ports > 0, NULL);
743   g_return_val_if_fail (ttl > 0, NULL);
744
745   priv = stream->priv;
746
747   addr = g_inet_address_new_from_string (address);
748   if (!addr) {
749     GST_ERROR ("failed to get inet addr from %s", address);
750     family = G_SOCKET_FAMILY_IPV4;
751   } else {
752     family = g_inet_address_get_family (addr);
753     g_object_unref (addr);
754   }
755
756   if (family == G_SOCKET_FAMILY_IPV6)
757     addrp = &priv->addr_v4;
758   else
759     addrp = &priv->addr_v6;
760
761   g_mutex_lock (&priv->lock);
762   if (*addrp == NULL) {
763     GstRTSPAddressPoolResult res;
764
765     if (priv->pool == NULL)
766       goto no_pool;
767
768     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
769         port, n_ports, ttl, addrp);
770     if (res != GST_RTSP_ADDRESS_POOL_OK)
771       goto no_address;
772   } else {
773     if (strcmp ((*addrp)->address, address) ||
774         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
775         (*addrp)->ttl != ttl)
776       goto different_address;
777   }
778   result = gst_rtsp_address_copy (*addrp);
779   g_mutex_unlock (&priv->lock);
780
781   return result;
782
783   /* ERRORS */
784 no_pool:
785   {
786     GST_ERROR_OBJECT (stream, "no address pool specified");
787     g_mutex_unlock (&priv->lock);
788     return NULL;
789   }
790 no_address:
791   {
792     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
793         address);
794     g_mutex_unlock (&priv->lock);
795     return NULL;
796   }
797 different_address:
798   {
799     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
800         " reserved", address);
801     g_mutex_unlock (&priv->lock);
802     return NULL;
803   }
804 }
805
806 static gboolean
807 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
808     GSocketFamily family, GstElement * udpsrc_out[2],
809     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
810     GstRTSPAddress ** server_addr_out)
811 {
812   GstStateChangeReturn ret;
813   GstElement *udpsrc0, *udpsrc1;
814   GstElement *udpsink0, *udpsink1;
815   GSocket *rtp_socket = NULL;
816   GSocket *rtcp_socket;
817   gint tmp_rtp, tmp_rtcp;
818   guint count;
819   gint rtpport, rtcpport;
820   GList *rejected_addresses = NULL;
821   GstRTSPAddress *addr = NULL;
822   GInetAddress *inetaddr = NULL;
823   GSocketAddress *rtp_sockaddr = NULL;
824   GSocketAddress *rtcp_sockaddr = NULL;
825   const gchar *multisink_socket;
826
827   if (family == G_SOCKET_FAMILY_IPV6)
828     multisink_socket = "socket-v6";
829   else
830     multisink_socket = "socket";
831
832   udpsrc0 = NULL;
833   udpsrc1 = NULL;
834   udpsink0 = NULL;
835   udpsink1 = NULL;
836   count = 0;
837
838   /* Start with random port */
839   tmp_rtp = 0;
840
841   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
842       G_SOCKET_PROTOCOL_UDP, NULL);
843   if (!rtcp_socket)
844     goto no_udp_protocol;
845
846   if (*server_addr_out)
847     gst_rtsp_address_free (*server_addr_out);
848
849   /* try to allocate 2 UDP ports, the RTP port should be an even
850    * number and the RTCP port should be the next (uneven) port */
851 again:
852
853   if (rtp_socket == NULL) {
854     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
855         G_SOCKET_PROTOCOL_UDP, NULL);
856     if (!rtp_socket)
857       goto no_udp_protocol;
858   }
859
860   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
861     GstRTSPAddressFlags flags;
862
863     if (addr)
864       rejected_addresses = g_list_prepend (rejected_addresses, addr);
865
866     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
867     if (family == G_SOCKET_FAMILY_IPV6)
868       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
869     else
870       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
871
872     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
873
874     if (addr == NULL)
875       goto no_ports;
876
877     tmp_rtp = addr->port;
878
879     g_clear_object (&inetaddr);
880     inetaddr = g_inet_address_new_from_string (addr->address);
881   } else {
882     if (tmp_rtp != 0) {
883       tmp_rtp += 2;
884       if (++count > 20)
885         goto no_ports;
886     }
887
888     if (inetaddr == NULL)
889       inetaddr = g_inet_address_new_any (family);
890   }
891
892   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
893   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
894     g_object_unref (rtp_sockaddr);
895     goto again;
896   }
897   g_object_unref (rtp_sockaddr);
898
899   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
900   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
901     g_clear_object (&rtp_sockaddr);
902     goto socket_error;
903   }
904
905   tmp_rtp =
906       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
907   g_object_unref (rtp_sockaddr);
908
909   /* check if port is even */
910   if ((tmp_rtp & 1) != 0) {
911     /* port not even, close and allocate another */
912     tmp_rtp++;
913     g_clear_object (&rtp_socket);
914     goto again;
915   }
916
917   /* set port */
918   tmp_rtcp = tmp_rtp + 1;
919
920   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
921   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
922     g_object_unref (rtcp_sockaddr);
923     g_clear_object (&rtp_socket);
924     goto again;
925   }
926   g_object_unref (rtcp_sockaddr);
927
928   g_clear_object (&inetaddr);
929
930   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
931   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
932
933   if (udpsrc0 == NULL || udpsrc1 == NULL)
934     goto no_udp_protocol;
935
936   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
937   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
938
939   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
940   if (ret == GST_STATE_CHANGE_FAILURE)
941     goto element_error;
942   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
943   if (ret == GST_STATE_CHANGE_FAILURE)
944     goto element_error;
945
946   /* all fine, do port check */
947   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
948   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
949
950   /* this should not happen... */
951   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
952     goto port_error;
953
954   if (udpsink_out[0])
955     udpsink0 = udpsink_out[0];
956   else
957     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
958
959   if (!udpsink0)
960     goto no_udp_protocol;
961
962   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
963   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
964
965   if (udpsink_out[1])
966     udpsink1 = udpsink_out[1];
967   else
968     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
969
970   if (!udpsink1)
971     goto no_udp_protocol;
972
973   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
974   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
975   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
976
977   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
978   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
979   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
980   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
981   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
982   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
983   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
984   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
985
986   /* we keep these elements, we will further configure them when the
987    * client told us to really use the UDP ports. */
988   udpsrc_out[0] = udpsrc0;
989   udpsrc_out[1] = udpsrc1;
990   udpsink_out[0] = udpsink0;
991   udpsink_out[1] = udpsink1;
992   server_port_out->min = rtpport;
993   server_port_out->max = rtcpport;
994
995   *server_addr_out = addr;
996   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
997
998   g_object_unref (rtp_socket);
999   g_object_unref (rtcp_socket);
1000
1001   return TRUE;
1002
1003   /* ERRORS */
1004 no_udp_protocol:
1005   {
1006     goto cleanup;
1007   }
1008 no_ports:
1009   {
1010     goto cleanup;
1011   }
1012 port_error:
1013   {
1014     goto cleanup;
1015   }
1016 socket_error:
1017   {
1018     goto cleanup;
1019   }
1020 element_error:
1021   {
1022     goto cleanup;
1023   }
1024 cleanup:
1025   {
1026     if (udpsrc0) {
1027       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1028       gst_object_unref (udpsrc0);
1029     }
1030     if (udpsrc1) {
1031       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1032       gst_object_unref (udpsrc1);
1033     }
1034     if (udpsink0) {
1035       gst_element_set_state (udpsink0, GST_STATE_NULL);
1036       gst_object_unref (udpsink0);
1037     }
1038     if (inetaddr)
1039       g_object_unref (inetaddr);
1040     g_list_free_full (rejected_addresses,
1041         (GDestroyNotify) gst_rtsp_address_free);
1042     if (addr)
1043       gst_rtsp_address_free (addr);
1044     if (rtp_socket)
1045       g_object_unref (rtp_socket);
1046     if (rtcp_socket)
1047       g_object_unref (rtcp_socket);
1048     return FALSE;
1049   }
1050 }
1051
1052 /* must be called with lock */
1053 static gboolean
1054 alloc_ports (GstRTSPStream * stream)
1055 {
1056   GstRTSPStreamPrivate *priv = stream->priv;
1057
1058   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1059       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1060       &priv->server_port_v4, &priv->server_addr_v4);
1061
1062   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1063       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1064       &priv->server_port_v6, &priv->server_addr_v6);
1065
1066   return priv->have_ipv4 || priv->have_ipv6;
1067 }
1068
1069 /**
1070  * gst_rtsp_stream_get_server_port:
1071  * @stream: a #GstRTSPStream
1072  * @server_port: (out): result server port
1073  * @family: the port family to get
1074  *
1075  * Fill @server_port with the port pair used by the server. This function can
1076  * only be called when @stream has been joined.
1077  */
1078 void
1079 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1080     GstRTSPRange * server_port, GSocketFamily family)
1081 {
1082   GstRTSPStreamPrivate *priv;
1083
1084   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1085   priv = stream->priv;
1086   g_return_if_fail (priv->is_joined);
1087
1088   g_mutex_lock (&priv->lock);
1089   if (family == G_SOCKET_FAMILY_IPV4) {
1090     if (server_port)
1091       *server_port = priv->server_port_v4;
1092   } else {
1093     if (server_port)
1094       *server_port = priv->server_port_v6;
1095   }
1096   g_mutex_unlock (&priv->lock);
1097 }
1098
1099 /**
1100  * gst_rtsp_stream_get_rtpsession:
1101  * @stream: a #GstRTSPStream
1102  *
1103  * Get the RTP session of this stream.
1104  *
1105  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1106  */
1107 GObject *
1108 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1109 {
1110   GstRTSPStreamPrivate *priv;
1111   GObject *session;
1112
1113   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1114
1115   priv = stream->priv;
1116
1117   g_mutex_lock (&priv->lock);
1118   if ((session = priv->session))
1119     g_object_ref (session);
1120   g_mutex_unlock (&priv->lock);
1121
1122   return session;
1123 }
1124
1125 /**
1126  * gst_rtsp_stream_get_ssrc:
1127  * @stream: a #GstRTSPStream
1128  * @ssrc: (out): result ssrc
1129  *
1130  * Get the SSRC used by the RTP session of this stream. This function can only
1131  * be called when @stream has been joined.
1132  */
1133 void
1134 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1135 {
1136   GstRTSPStreamPrivate *priv;
1137
1138   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1139   priv = stream->priv;
1140   g_return_if_fail (priv->is_joined);
1141
1142   g_mutex_lock (&priv->lock);
1143   if (ssrc && priv->session)
1144     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1145   g_mutex_unlock (&priv->lock);
1146 }
1147
1148 /* executed from streaming thread */
1149 static void
1150 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1151 {
1152   GstRTSPStreamPrivate *priv = stream->priv;
1153   GstCaps *newcaps, *oldcaps;
1154
1155   newcaps = gst_pad_get_current_caps (pad);
1156
1157   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1158       newcaps);
1159
1160   g_mutex_lock (&priv->lock);
1161   oldcaps = priv->caps;
1162   priv->caps = newcaps;
1163   g_mutex_unlock (&priv->lock);
1164
1165   if (oldcaps)
1166     gst_caps_unref (oldcaps);
1167 }
1168
1169 static void
1170 dump_structure (const GstStructure * s)
1171 {
1172   gchar *sstr;
1173
1174   sstr = gst_structure_to_string (s);
1175   GST_INFO ("structure: %s", sstr);
1176   g_free (sstr);
1177 }
1178
1179 static GstRTSPStreamTransport *
1180 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1181 {
1182   GstRTSPStreamPrivate *priv = stream->priv;
1183   GList *walk;
1184   GstRTSPStreamTransport *result = NULL;
1185   const gchar *tmp;
1186   gchar *dest;
1187   guint port;
1188
1189   if (rtcp_from == NULL)
1190     return NULL;
1191
1192   tmp = g_strrstr (rtcp_from, ":");
1193   if (tmp == NULL)
1194     return NULL;
1195
1196   port = atoi (tmp + 1);
1197   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1198
1199   g_mutex_lock (&priv->lock);
1200   GST_INFO ("finding %s:%d in %d transports", dest, port,
1201       g_list_length (priv->transports));
1202
1203   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1204     GstRTSPStreamTransport *trans = walk->data;
1205     const GstRTSPTransport *tr;
1206     gint min, max;
1207
1208     tr = gst_rtsp_stream_transport_get_transport (trans);
1209
1210     min = tr->client_port.min;
1211     max = tr->client_port.max;
1212
1213     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1214       result = trans;
1215       break;
1216     }
1217   }
1218   if (result)
1219     g_object_ref (result);
1220   g_mutex_unlock (&priv->lock);
1221
1222   g_free (dest);
1223
1224   return result;
1225 }
1226
1227 static GstRTSPStreamTransport *
1228 check_transport (GObject * source, GstRTSPStream * stream)
1229 {
1230   GstStructure *stats;
1231   GstRTSPStreamTransport *trans;
1232
1233   /* see if we have a stream to match with the origin of the RTCP packet */
1234   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1235   if (trans == NULL) {
1236     g_object_get (source, "stats", &stats, NULL);
1237     if (stats) {
1238       const gchar *rtcp_from;
1239
1240       dump_structure (stats);
1241
1242       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1243       if ((trans = find_transport (stream, rtcp_from))) {
1244         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1245             source);
1246         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1247             g_object_unref);
1248       }
1249       gst_structure_free (stats);
1250     }
1251   }
1252   return trans;
1253 }
1254
1255
1256 static void
1257 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1258 {
1259   GstRTSPStreamTransport *trans;
1260
1261   GST_INFO ("%p: new source %p", stream, source);
1262
1263   trans = check_transport (source, stream);
1264
1265   if (trans)
1266     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1267 }
1268
1269 static void
1270 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1271 {
1272   GST_INFO ("%p: new SDES %p", stream, source);
1273 }
1274
1275 static void
1276 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1277 {
1278   GstRTSPStreamTransport *trans;
1279
1280   trans = check_transport (source, stream);
1281
1282   if (trans) {
1283     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1284     gst_rtsp_stream_transport_keep_alive (trans);
1285   }
1286 #ifdef DUMP_STATS
1287   {
1288     GstStructure *stats;
1289     g_object_get (source, "stats", &stats, NULL);
1290     if (stats) {
1291       dump_structure (stats);
1292       gst_structure_free (stats);
1293     }
1294   }
1295 #endif
1296 }
1297
1298 static void
1299 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1300 {
1301   GST_INFO ("%p: source %p bye", stream, source);
1302 }
1303
1304 static void
1305 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1306 {
1307   GstRTSPStreamTransport *trans;
1308
1309   GST_INFO ("%p: source %p bye timeout", stream, source);
1310
1311   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1312     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1313     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1314   }
1315 }
1316
1317 static void
1318 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1319 {
1320   GstRTSPStreamTransport *trans;
1321
1322   GST_INFO ("%p: source %p timeout", stream, source);
1323
1324   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1325     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1326     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1327   }
1328 }
1329
1330 static GstFlowReturn
1331 handle_new_sample (GstAppSink * sink, gpointer user_data)
1332 {
1333   GstRTSPStreamPrivate *priv;
1334   GList *walk;
1335   GstSample *sample;
1336   GstBuffer *buffer;
1337   GstRTSPStream *stream;
1338
1339   sample = gst_app_sink_pull_sample (sink);
1340   if (!sample)
1341     return GST_FLOW_OK;
1342
1343   stream = (GstRTSPStream *) user_data;
1344   priv = stream->priv;
1345   buffer = gst_sample_get_buffer (sample);
1346
1347   g_mutex_lock (&priv->lock);
1348   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1349     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1350
1351     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1352       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1353     } else {
1354       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1355     }
1356   }
1357   g_mutex_unlock (&priv->lock);
1358
1359   gst_sample_unref (sample);
1360
1361   return GST_FLOW_OK;
1362 }
1363
1364 static GstAppSinkCallbacks sink_cb = {
1365   NULL,                         /* not interested in EOS */
1366   NULL,                         /* not interested in preroll samples */
1367   handle_new_sample,
1368 };
1369
1370 /**
1371  * gst_rtsp_stream_join_bin:
1372  * @stream: a #GstRTSPStream
1373  * @bin: a #GstBin to join
1374  * @rtpbin: a rtpbin element in @bin
1375  * @state: the target state of the new elements
1376  *
1377  * Join the #GstBin @bin that contains the element @rtpbin.
1378  *
1379  * @stream will link to @rtpbin, which must be inside @bin. The elements
1380  * added to @bin will be set to the state given in @state.
1381  *
1382  * Returns: %TRUE on success.
1383  */
1384 gboolean
1385 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1386     GstElement * rtpbin, GstState state)
1387 {
1388   GstRTSPStreamPrivate *priv;
1389   gint i;
1390   guint idx;
1391   gchar *name;
1392   GstPad *pad, *sinkpad, *selpad;
1393   GstPadLinkReturn ret;
1394
1395   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1396   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1397   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1398
1399   priv = stream->priv;
1400
1401   g_mutex_lock (&priv->lock);
1402   if (priv->is_joined)
1403     goto was_joined;
1404
1405   /* create a session with the same index as the stream */
1406   idx = priv->idx;
1407
1408   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1409
1410   if (!alloc_ports (stream))
1411     goto no_ports;
1412
1413   /* update the dscp qos field in the sinks */
1414   update_dscp_qos (stream);
1415
1416   /* get a pad for sending RTP */
1417   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1418   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1419   g_free (name);
1420   /* link the RTP pad to the session manager, it should not really fail unless
1421    * this is not really an RTP pad */
1422   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1423   if (ret != GST_PAD_LINK_OK)
1424     goto link_failed;
1425
1426   /* get pads from the RTP session element for sending and receiving
1427    * RTP/RTCP*/
1428   name = g_strdup_printf ("send_rtp_src_%u", idx);
1429   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1430   g_free (name);
1431   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1432   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1433   g_free (name);
1434   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1435   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1436   g_free (name);
1437   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1438   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1439   g_free (name);
1440
1441   /* get the session */
1442   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1443
1444   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1445       stream);
1446   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1447       stream);
1448   g_signal_connect (priv->session, "on-ssrc-active",
1449       (GCallback) on_ssrc_active, stream);
1450   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1451       stream);
1452   g_signal_connect (priv->session, "on-bye-timeout",
1453       (GCallback) on_bye_timeout, stream);
1454   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1455       stream);
1456
1457   for (i = 0; i < 2; i++) {
1458     GstPad *teepad, *queuepad;
1459     /* For the sender we create this bit of pipeline for both
1460      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1461      * we need to add a queue before appsink to make the pipeline
1462      * not block. For the TCP case, we want to pump data to the
1463      * client as fast as possible anyway.
1464      *
1465      * .--------.      .-----.    .---------.
1466      * | rtpbin |      | tee |    | udpsink |
1467      * |       send->sink   src->sink       |
1468      * '--------'      |     |    '---------'
1469      *                 |     |    .---------.    .---------.
1470      *                 |     |    |  queue  |    | appsink |
1471      *                 |    src->sink      src->sink       |
1472      *                 '-----'    '---------'    '---------'
1473      *
1474      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1475      * udpsink directly to the session.
1476      */
1477     /* add udpsink */
1478     gst_bin_add (bin, priv->udpsink[i]);
1479     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1480
1481     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1482       /* make tee for RTP/RTCP */
1483       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1484       gst_bin_add (bin, priv->tee[i]);
1485
1486       /* and link to rtpbin send pad */
1487       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1488       gst_pad_link (priv->send_src[i], pad);
1489       gst_object_unref (pad);
1490
1491       /* link tee to udpsink */
1492       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1493       gst_pad_link (teepad, sinkpad);
1494       gst_object_unref (teepad);
1495
1496       /* make queue */
1497       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1498       gst_bin_add (bin, priv->appqueue[i]);
1499       /* and link to tee */
1500       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1501       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1502       gst_pad_link (teepad, pad);
1503       gst_object_unref (pad);
1504       gst_object_unref (teepad);
1505
1506       /* make appsink */
1507       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1508       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1509       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1510       gst_bin_add (bin, priv->appsink[i]);
1511       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1512           &sink_cb, stream, NULL);
1513       /* and link to queue */
1514       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1515       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1516       gst_pad_link (queuepad, pad);
1517       gst_object_unref (pad);
1518       gst_object_unref (queuepad);
1519     } else {
1520       /* else only udpsink needed, link it to the session */
1521       gst_pad_link (priv->send_src[i], sinkpad);
1522     }
1523     gst_object_unref (sinkpad);
1524
1525     /* For the receiver we create this bit of pipeline for both
1526      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1527      * and it is all funneled into the rtpbin receive pad.
1528      *
1529      * .--------.     .--------.    .--------.
1530      * | udpsrc |     | funnel |    | rtpbin |
1531      * |       src->sink      src->sink      |
1532      * '--------'     |        |    '--------'
1533      * .--------.     |        |
1534      * | appsrc |     |        |
1535      * |       src->sink       |
1536      * '--------'     '--------'
1537      */
1538     /* make funnel for the RTP/RTCP receivers */
1539     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1540     gst_bin_add (bin, priv->funnel[i]);
1541
1542     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1543     gst_pad_link (pad, priv->recv_sink[i]);
1544     gst_object_unref (pad);
1545
1546     if (priv->udpsrc_v4[i]) {
1547       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1548        * values */
1549       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1550       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1551       /* add udpsrc */
1552       gst_bin_add (bin, priv->udpsrc_v4[i]);
1553
1554       /* and link to the funnel v4 */
1555       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1556       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1557       gst_pad_link (pad, selpad);
1558       gst_object_unref (pad);
1559       gst_object_unref (selpad);
1560     }
1561
1562     if (priv->udpsrc_v6[i]) {
1563       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1564       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1565       gst_bin_add (bin, priv->udpsrc_v6[i]);
1566
1567       /* and link to the funnel v6 */
1568       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1569       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1570       gst_pad_link (pad, selpad);
1571       gst_object_unref (pad);
1572       gst_object_unref (selpad);
1573     }
1574
1575     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1576       /* make and add appsrc */
1577       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1578       gst_bin_add (bin, priv->appsrc[i]);
1579       /* and link to the funnel */
1580       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1581       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1582       gst_pad_link (pad, selpad);
1583       gst_object_unref (pad);
1584       gst_object_unref (selpad);
1585     }
1586
1587     /* check if we need to set to a special state */
1588     if (state != GST_STATE_NULL) {
1589       if (priv->udpsink[i])
1590         gst_element_set_state (priv->udpsink[i], state);
1591       if (priv->appsink[i])
1592         gst_element_set_state (priv->appsink[i], state);
1593       if (priv->appqueue[i])
1594         gst_element_set_state (priv->appqueue[i], state);
1595       if (priv->tee[i])
1596         gst_element_set_state (priv->tee[i], state);
1597       if (priv->funnel[i])
1598         gst_element_set_state (priv->funnel[i], state);
1599       if (priv->appsrc[i])
1600         gst_element_set_state (priv->appsrc[i], state);
1601     }
1602   }
1603
1604   /* be notified of caps changes */
1605   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1606       (GCallback) caps_notify, stream);
1607
1608   priv->is_joined = TRUE;
1609   g_mutex_unlock (&priv->lock);
1610
1611   return TRUE;
1612
1613   /* ERRORS */
1614 was_joined:
1615   {
1616     g_mutex_unlock (&priv->lock);
1617     return TRUE;
1618   }
1619 no_ports:
1620   {
1621     g_mutex_unlock (&priv->lock);
1622     GST_WARNING ("failed to allocate ports %u", idx);
1623     return FALSE;
1624   }
1625 link_failed:
1626   {
1627     GST_WARNING ("failed to link stream %u", idx);
1628     gst_object_unref (priv->send_rtp_sink);
1629     priv->send_rtp_sink = NULL;
1630     g_mutex_unlock (&priv->lock);
1631     return FALSE;
1632   }
1633 }
1634
1635 /**
1636  * gst_rtsp_stream_leave_bin:
1637  * @stream: a #GstRTSPStream
1638  * @bin: a #GstBin
1639  * @rtpbin: a rtpbin #GstElement
1640  *
1641  * Remove the elements of @stream from @bin.
1642  *
1643  * Return: %TRUE on success.
1644  */
1645 gboolean
1646 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1647     GstElement * rtpbin)
1648 {
1649   GstRTSPStreamPrivate *priv;
1650   gint i;
1651
1652   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1653   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1654   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1655
1656   priv = stream->priv;
1657
1658   g_mutex_lock (&priv->lock);
1659   if (!priv->is_joined)
1660     goto was_not_joined;
1661
1662   /* all transports must be removed by now */
1663   g_return_val_if_fail (priv->transports == NULL, FALSE);
1664
1665   GST_INFO ("stream %p leaving bin", stream);
1666
1667   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1668   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1669   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1670   gst_object_unref (priv->send_rtp_sink);
1671   priv->send_rtp_sink = NULL;
1672
1673   for (i = 0; i < 2; i++) {
1674     if (priv->udpsink[i])
1675       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1676     if (priv->appsink[i])
1677       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1678     if (priv->appqueue[i])
1679       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1680     if (priv->tee[i])
1681       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1682     if (priv->funnel[i])
1683       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1684     if (priv->appsrc[i])
1685       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1686     if (priv->udpsrc_v4[i]) {
1687       /* and set udpsrc to NULL now before removing */
1688       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1689       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1690       /* removing them should also nicely release the request
1691        * pads when they finalize */
1692       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1693     }
1694     if (priv->udpsrc_v6[i]) {
1695       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1696       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1697       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1698     }
1699     if (priv->udpsink[i])
1700       gst_bin_remove (bin, priv->udpsink[i]);
1701     if (priv->appsrc[i])
1702       gst_bin_remove (bin, priv->appsrc[i]);
1703     if (priv->appsink[i])
1704       gst_bin_remove (bin, priv->appsink[i]);
1705     if (priv->appqueue[i])
1706       gst_bin_remove (bin, priv->appqueue[i]);
1707     if (priv->tee[i])
1708       gst_bin_remove (bin, priv->tee[i]);
1709     if (priv->funnel[i])
1710       gst_bin_remove (bin, priv->funnel[i]);
1711
1712     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1713     gst_object_unref (priv->recv_sink[i]);
1714     priv->recv_sink[i] = NULL;
1715
1716     priv->udpsrc_v4[i] = NULL;
1717     priv->udpsrc_v6[i] = NULL;
1718     priv->udpsink[i] = NULL;
1719     priv->appsrc[i] = NULL;
1720     priv->appsink[i] = NULL;
1721     priv->appqueue[i] = NULL;
1722     priv->tee[i] = NULL;
1723     priv->funnel[i] = NULL;
1724   }
1725   gst_object_unref (priv->send_src[0]);
1726   priv->send_src[0] = NULL;
1727
1728   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1729   gst_object_unref (priv->send_src[1]);
1730   priv->send_src[1] = NULL;
1731
1732   g_object_unref (priv->session);
1733   priv->session = NULL;
1734   if (priv->caps)
1735     gst_caps_unref (priv->caps);
1736   priv->caps = NULL;
1737
1738   priv->is_joined = FALSE;
1739   g_mutex_unlock (&priv->lock);
1740
1741   return TRUE;
1742
1743 was_not_joined:
1744   {
1745     return TRUE;
1746   }
1747 }
1748
1749 /**
1750  * gst_rtsp_stream_get_rtpinfo:
1751  * @stream: a #GstRTSPStream
1752  * @rtptime: (allow-none): result RTP timestamp
1753  * @seq: (allow-none): result RTP seqnum
1754  * @running_time: (allow-none): result running-time
1755  *
1756  * Retrieve the current rtptime, seq and running-time. This is used to
1757  * construct a RTPInfo reply header.
1758  *
1759  * Returns: %TRUE when rtptime, seq and running-time could be determined.
1760  */
1761 gboolean
1762 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1763     guint * rtptime, guint * seq, GstClockTime * running_time)
1764 {
1765   GstRTSPStreamPrivate *priv;
1766   GObjectClass *payobjclass;
1767
1768   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1769
1770   priv = stream->priv;
1771
1772   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1773
1774   if (seq && g_object_class_find_property (payobjclass, "seqnum"))
1775     g_object_get (priv->payloader, "seqnum", seq, NULL);
1776
1777   if (rtptime && g_object_class_find_property (payobjclass, "timestamp"))
1778     g_object_get (priv->payloader, "timestamp", rtptime, NULL);
1779
1780   if (running_time
1781       && g_object_class_find_property (payobjclass, "running-time"))
1782     g_object_get (priv->payloader, "running-time", running_time, NULL);
1783
1784   return TRUE;
1785 }
1786
1787 /**
1788  * gst_rtsp_stream_get_caps:
1789  * @stream: a #GstRTSPStream
1790  *
1791  * Retrieve the current caps of @stream.
1792  *
1793  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1794  *    after usage.
1795  */
1796 GstCaps *
1797 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1798 {
1799   GstRTSPStreamPrivate *priv;
1800   GstCaps *result;
1801
1802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1803
1804   priv = stream->priv;
1805
1806   g_mutex_lock (&priv->lock);
1807   if ((result = priv->caps))
1808     gst_caps_ref (result);
1809   g_mutex_unlock (&priv->lock);
1810
1811   return result;
1812 }
1813
1814 /**
1815  * gst_rtsp_stream_recv_rtp:
1816  * @stream: a #GstRTSPStream
1817  * @buffer: (transfer full): a #GstBuffer
1818  *
1819  * Handle an RTP buffer for the stream. This method is usually called when a
1820  * message has been received from a client using the TCP transport.
1821  *
1822  * This function takes ownership of @buffer.
1823  *
1824  * Returns: a GstFlowReturn.
1825  */
1826 GstFlowReturn
1827 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1828 {
1829   GstRTSPStreamPrivate *priv;
1830   GstFlowReturn ret;
1831   GstElement *element;
1832
1833   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1834   priv = stream->priv;
1835   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1836   g_return_val_if_fail (priv->is_joined, FALSE);
1837
1838   g_mutex_lock (&priv->lock);
1839   if (priv->appsrc[0])
1840     element = gst_object_ref (priv->appsrc[0]);
1841   else
1842     element = NULL;
1843   g_mutex_unlock (&priv->lock);
1844
1845   if (element) {
1846     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1847     gst_object_unref (element);
1848   } else {
1849     ret = GST_FLOW_OK;
1850   }
1851   return ret;
1852 }
1853
1854 /**
1855  * gst_rtsp_stream_recv_rtcp:
1856  * @stream: a #GstRTSPStream
1857  * @buffer: (transfer full): a #GstBuffer
1858  *
1859  * Handle an RTCP buffer for the stream. This method is usually called when a
1860  * message has been received from a client using the TCP transport.
1861  *
1862  * This function takes ownership of @buffer.
1863  *
1864  * Returns: a GstFlowReturn.
1865  */
1866 GstFlowReturn
1867 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1868 {
1869   GstRTSPStreamPrivate *priv;
1870   GstFlowReturn ret;
1871   GstElement *element;
1872
1873   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1874   priv = stream->priv;
1875   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1876   g_return_val_if_fail (priv->is_joined, FALSE);
1877
1878   g_mutex_lock (&priv->lock);
1879   if (priv->appsrc[1])
1880     element = gst_object_ref (priv->appsrc[1]);
1881   else
1882     element = NULL;
1883   g_mutex_unlock (&priv->lock);
1884
1885   if (element) {
1886     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1887     gst_object_unref (element);
1888   } else {
1889     ret = GST_FLOW_OK;
1890   }
1891   return ret;
1892 }
1893
1894 /* must be called with lock */
1895 static gboolean
1896 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1897     gboolean add)
1898 {
1899   GstRTSPStreamPrivate *priv = stream->priv;
1900   const GstRTSPTransport *tr;
1901
1902   tr = gst_rtsp_stream_transport_get_transport (trans);
1903
1904   switch (tr->lower_transport) {
1905     case GST_RTSP_LOWER_TRANS_UDP:
1906     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1907     {
1908       gchar *dest;
1909       gint min, max;
1910       guint ttl = 0;
1911
1912       dest = tr->destination;
1913       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1914         min = tr->port.min;
1915         max = tr->port.max;
1916         ttl = tr->ttl;
1917       } else {
1918         min = tr->client_port.min;
1919         max = tr->client_port.max;
1920       }
1921
1922       if (add) {
1923         GST_INFO ("adding %s:%d-%d", dest, min, max);
1924         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1925         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1926         if (ttl > 0) {
1927           GST_INFO ("setting ttl-mc %d", ttl);
1928           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1929           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1930         }
1931         priv->transports = g_list_prepend (priv->transports, trans);
1932       } else {
1933         GST_INFO ("removing %s:%d-%d", dest, min, max);
1934         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1935         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1936         priv->transports = g_list_remove (priv->transports, trans);
1937       }
1938       break;
1939     }
1940     case GST_RTSP_LOWER_TRANS_TCP:
1941       if (add) {
1942         GST_INFO ("adding TCP %s", tr->destination);
1943         priv->transports = g_list_prepend (priv->transports, trans);
1944       } else {
1945         GST_INFO ("removing TCP %s", tr->destination);
1946         priv->transports = g_list_remove (priv->transports, trans);
1947       }
1948       break;
1949     default:
1950       goto unknown_transport;
1951   }
1952   return TRUE;
1953
1954   /* ERRORS */
1955 unknown_transport:
1956   {
1957     GST_INFO ("Unknown transport %d", tr->lower_transport);
1958     return FALSE;
1959   }
1960 }
1961
1962
1963 /**
1964  * gst_rtsp_stream_add_transport:
1965  * @stream: a #GstRTSPStream
1966  * @trans: a #GstRTSPStreamTransport
1967  *
1968  * Add the transport in @trans to @stream. The media of @stream will
1969  * then also be send to the values configured in @trans.
1970  *
1971  * @stream must be joined to a bin.
1972  *
1973  * @trans must contain a valid #GstRTSPTransport.
1974  *
1975  * Returns: %TRUE if @trans was added
1976  */
1977 gboolean
1978 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1979     GstRTSPStreamTransport * trans)
1980 {
1981   GstRTSPStreamPrivate *priv;
1982   gboolean res;
1983
1984   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1985   priv = stream->priv;
1986   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1987   g_return_val_if_fail (priv->is_joined, FALSE);
1988
1989   g_mutex_lock (&priv->lock);
1990   res = update_transport (stream, trans, TRUE);
1991   g_mutex_unlock (&priv->lock);
1992
1993   return res;
1994 }
1995
1996 /**
1997  * gst_rtsp_stream_remove_transport:
1998  * @stream: a #GstRTSPStream
1999  * @trans: a #GstRTSPStreamTransport
2000  *
2001  * Remove the transport in @trans from @stream. The media of @stream will
2002  * not be sent to the values configured in @trans.
2003  *
2004  * @stream must be joined to a bin.
2005  *
2006  * @trans must contain a valid #GstRTSPTransport.
2007  *
2008  * Returns: %TRUE if @trans was removed
2009  */
2010 gboolean
2011 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2012     GstRTSPStreamTransport * trans)
2013 {
2014   GstRTSPStreamPrivate *priv;
2015   gboolean res;
2016
2017   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2018   priv = stream->priv;
2019   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2020   g_return_val_if_fail (priv->is_joined, FALSE);
2021
2022   g_mutex_lock (&priv->lock);
2023   res = update_transport (stream, trans, FALSE);
2024   g_mutex_unlock (&priv->lock);
2025
2026   return res;
2027 }
2028
2029 /**
2030  * gst_rtsp_stream_get_rtp_socket:
2031  * @stream: a #GstRTSPStream
2032  * @family: the socket family
2033  *
2034  * Get the RTP socket from @stream for a @family.
2035  *
2036  * @stream must be joined to a bin.
2037  *
2038  * Returns: the RTP socket or %NULL if no socket could be allocated for @family.
2039  *     Unref after usage
2040  */
2041 GSocket *
2042 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2043 {
2044   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2045   GSocket *socket;
2046   gchar *name;
2047
2048   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2049   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2050       family == G_SOCKET_FAMILY_IPV6, NULL);
2051   g_return_val_if_fail (priv->udpsink[0], NULL);
2052
2053   if (family == G_SOCKET_FAMILY_IPV6)
2054     name = "socket-v6";
2055   else
2056     name = "socket";
2057
2058   g_object_get (priv->udpsink[0], name, &socket, NULL);
2059
2060   return socket;
2061 }
2062
2063 /**
2064  * gst_rtsp_stream_get_rtcp_socket:
2065  * @stream: a #GstRTSPStream
2066  * @family: the socket family
2067  *
2068  * Get the RTCP socket from @stream for a @family.
2069  *
2070  * @stream must be joined to a bin.
2071  *
2072  * Returns: the RTCP socket or %NULL if no socket could be allocated for
2073  *     @family. Unref after usage
2074  */
2075 GSocket *
2076 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2077 {
2078   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2079   GSocket *socket;
2080   gchar *name;
2081
2082   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2083   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2084       family == G_SOCKET_FAMILY_IPV6, NULL);
2085   g_return_val_if_fail (priv->udpsink[1], NULL);
2086
2087   if (family == G_SOCKET_FAMILY_IPV6)
2088     name = "socket-v6";
2089   else
2090     name = "socket";
2091
2092   g_object_get (priv->udpsink[1], name, &socket, NULL);
2093
2094   return socket;
2095 }
2096
2097 /**
2098  * gst_rtsp_stream_transport_filter:
2099  * @stream: a #GstRTSPStream
2100  * @func: (scope call) (allow-none): a callback
2101  * @user_data: user data passed to @func
2102  *
2103  * Call @func for each transport managed by @stream. The result value of @func
2104  * determines what happens to the transport. @func will be called with @stream
2105  * locked so no further actions on @stream can be performed from @func.
2106  *
2107  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2108  * @stream.
2109  *
2110  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2111  *
2112  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2113  * will also be added with an additional ref to the result #GList of this
2114  * function..
2115  *
2116  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2117  *
2118  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2119  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2120  * element in the #GList should be unreffed before the list is freed.
2121  */
2122 GList *
2123 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2124     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2125 {
2126   GstRTSPStreamPrivate *priv;
2127   GList *result, *walk, *next;
2128
2129   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2130
2131   priv = stream->priv;
2132
2133   result = NULL;
2134
2135   g_mutex_lock (&priv->lock);
2136   for (walk = priv->transports; walk; walk = next) {
2137     GstRTSPStreamTransport *trans = walk->data;
2138     GstRTSPFilterResult res;
2139
2140     next = g_list_next (walk);
2141
2142     if (func)
2143       res = func (stream, trans, user_data);
2144     else
2145       res = GST_RTSP_FILTER_REF;
2146
2147     switch (res) {
2148       case GST_RTSP_FILTER_REMOVE:
2149         update_transport (stream, trans, FALSE);
2150         break;
2151       case GST_RTSP_FILTER_REF:
2152         result = g_list_prepend (result, g_object_ref (trans));
2153         break;
2154       case GST_RTSP_FILTER_KEEP:
2155       default:
2156         break;
2157     }
2158   }
2159   g_mutex_unlock (&priv->lock);
2160
2161   return result;
2162 }
2163
2164 static GstPadProbeReturn
2165 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2166 {
2167   GstRTSPStreamPrivate *priv;
2168   GstRTSPStream *stream;
2169
2170   stream = user_data;
2171   priv = stream->priv;
2172
2173   GST_DEBUG_OBJECT (pad, "now blocking");
2174
2175   g_mutex_lock (&priv->lock);
2176   priv->blocking = TRUE;
2177   g_mutex_unlock (&priv->lock);
2178
2179   gst_element_post_message (priv->payloader,
2180       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2181           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2182
2183   return GST_PAD_PROBE_OK;
2184 }
2185
2186 /**
2187  * gst_rtsp_stream_set_blocked:
2188  * @stream: a #GstRTSPStream
2189  * @blocked: boolean indicating we should block or unblock
2190  *
2191  * Blocks or unblocks the dataflow on @stream.
2192  *
2193  * Returns: %TRUE on success
2194  */
2195 gboolean
2196 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2197 {
2198   GstRTSPStreamPrivate *priv;
2199
2200   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2201
2202   priv = stream->priv;
2203
2204   g_mutex_lock (&priv->lock);
2205   if (blocked) {
2206     priv->blocking = FALSE;
2207     if (priv->blocked_id == 0) {
2208       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2209           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2210           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2211           g_object_ref (stream), g_object_unref);
2212     }
2213   } else {
2214     if (priv->blocked_id != 0) {
2215       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2216       priv->blocked_id = 0;
2217       priv->blocking = FALSE;
2218     }
2219   }
2220   g_mutex_unlock (&priv->lock);
2221
2222   return TRUE;
2223 }
2224
2225 /**
2226  * gst_rtsp_stream_is_blocking:
2227  * @stream: a #GstRTSPStream
2228  *
2229  * Check if @stream is blocking on a #GstBuffer.
2230  *
2231  * Returns: %TRUE if @stream is blocking
2232  */
2233 gboolean
2234 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2235 {
2236   GstRTSPStreamPrivate *priv;
2237   gboolean result;
2238
2239   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2240
2241   priv = stream->priv;
2242
2243   g_mutex_lock (&priv->lock);
2244   result = priv->blocking;
2245   g_mutex_unlock (&priv->lock);
2246
2247   return result;
2248 }