ce2f5a4dd70ab66ed96a404a4851c26bdddbd52e
[platform/upstream/gst-rtsp-server.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83   gchar *control;
84
85   GstRTSPProfile profiles;
86   GstRTSPLowerTrans protocols;
87
88   /* pads on the rtpbin */
89   GstPad *send_rtp_sink;
90   GstPad *recv_rtp_src;
91   GstPad *recv_sink[2];
92   GstPad *send_src[2];
93
94   /* the RTPSession object */
95   GObject *session;
96
97   /* SRTP encoder/decoder */
98   GstElement *srtpenc;
99   GstElement *srtpdec;
100   GHashTable *keys;
101
102   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
103    * sockets */
104   GstElement *udpsrc_v4[2];
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
107    * sockets */
108   GstElement *udpsrc_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141
142   /* the caps of the stream */
143   gulong caps_sig;
144   GstCaps *caps;
145
146   /* transports we stream to */
147   guint n_active;
148   GList *transports;
149   guint transports_cookie;
150   GList *tr_cache_rtp;
151   GList *tr_cache_rtcp;
152   guint tr_cache_cookie_rtp;
153   guint tr_cache_cookie_rtcp;
154
155
156   /* UDP sources for UDP multicast transports */
157   GList *transport_sources;
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167 };
168
169 #define DEFAULT_CONTROL         NULL
170 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
171 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
172                                         GST_RTSP_LOWER_TRANS_TCP
173
174 enum
175 {
176   PROP_0,
177   PROP_CONTROL,
178   PROP_PROFILES,
179   PROP_PROTOCOLS,
180   PROP_LAST
181 };
182
183 enum
184 {
185   SIGNAL_NEW_RTP_ENCODER,
186   SIGNAL_NEW_RTCP_ENCODER,
187   SIGNAL_RTCP_STATS,
188   SIGNAL_LAST
189 };
190
191 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
192 #define GST_CAT_DEFAULT rtsp_stream_debug
193
194 static GQuark ssrc_stream_map_key;
195
196 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
197     GValue * value, GParamSpec * pspec);
198 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
199     const GValue * value, GParamSpec * pspec);
200
201 static void gst_rtsp_stream_finalize (GObject * obj);
202
203 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
204
205 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
206
207 static void
208 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
209 {
210   GObjectClass *gobject_class;
211
212   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
213
214   gobject_class = G_OBJECT_CLASS (klass);
215
216   gobject_class->get_property = gst_rtsp_stream_get_property;
217   gobject_class->set_property = gst_rtsp_stream_set_property;
218   gobject_class->finalize = gst_rtsp_stream_finalize;
219
220   g_object_class_install_property (gobject_class, PROP_CONTROL,
221       g_param_spec_string ("control", "Control",
222           "The control string for this stream", DEFAULT_CONTROL,
223           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
224
225   g_object_class_install_property (gobject_class, PROP_PROFILES,
226       g_param_spec_flags ("profiles", "Profiles",
227           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
228           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229
230   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
231       g_param_spec_flags ("protocols", "Protocols",
232           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
233           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234
235   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
236       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
237       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
238       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
239
240   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
241       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
242       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
243       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
244
245   gst_rtsp_stream_signals[SIGNAL_RTCP_STATS] =
246       g_signal_new ("rtcp-statistics", G_TYPE_FROM_CLASS (klass),
247       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
248       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE);
249
250   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
251
252   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
253 }
254
255 static void
256 gst_rtsp_stream_init (GstRTSPStream * stream)
257 {
258   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
259
260   GST_DEBUG ("new stream %p", stream);
261
262   stream->priv = priv;
263
264   priv->dscp_qos = -1;
265   priv->control = g_strdup (DEFAULT_CONTROL);
266   priv->profiles = DEFAULT_PROFILES;
267   priv->protocols = DEFAULT_PROTOCOLS;
268
269   g_mutex_init (&priv->lock);
270
271   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
272       NULL, (GDestroyNotify) gst_caps_unref);
273   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
274       (GDestroyNotify) gst_caps_unref);
275 }
276
277 static void
278 gst_rtsp_stream_finalize (GObject * obj)
279 {
280   GstRTSPStream *stream;
281   GstRTSPStreamPrivate *priv;
282
283   stream = GST_RTSP_STREAM (obj);
284   priv = stream->priv;
285
286   GST_DEBUG ("finalize stream %p", stream);
287
288   /* we really need to be unjoined now */
289   g_return_if_fail (!priv->is_joined);
290
291   if (priv->addr_v4)
292     gst_rtsp_address_free (priv->addr_v4);
293   if (priv->addr_v6)
294     gst_rtsp_address_free (priv->addr_v6);
295   if (priv->server_addr_v4)
296     gst_rtsp_address_free (priv->server_addr_v4);
297   if (priv->server_addr_v6)
298     gst_rtsp_address_free (priv->server_addr_v6);
299   if (priv->pool)
300     g_object_unref (priv->pool);
301   if (priv->rtxsend)
302     g_object_unref (priv->rtxsend);
303
304   gst_object_unref (priv->payloader);
305   if (priv->srcpad)
306     gst_object_unref (priv->srcpad);
307   if (priv->sinkpad)
308     gst_object_unref (priv->sinkpad);
309   g_free (priv->control);
310   g_mutex_clear (&priv->lock);
311
312   g_hash_table_unref (priv->keys);
313   g_hash_table_destroy (priv->ptmap);
314
315   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
316 }
317
318 static void
319 gst_rtsp_stream_get_property (GObject * object, guint propid,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstRTSPStream *stream = GST_RTSP_STREAM (object);
323
324   switch (propid) {
325     case PROP_CONTROL:
326       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
327       break;
328     case PROP_PROFILES:
329       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
330       break;
331     case PROP_PROTOCOLS:
332       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
336   }
337 }
338
339 static void
340 gst_rtsp_stream_set_property (GObject * object, guint propid,
341     const GValue * value, GParamSpec * pspec)
342 {
343   GstRTSPStream *stream = GST_RTSP_STREAM (object);
344
345   switch (propid) {
346     case PROP_CONTROL:
347       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
348       break;
349     case PROP_PROFILES:
350       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
351       break;
352     case PROP_PROTOCOLS:
353       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
354       break;
355     default:
356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
357   }
358 }
359
360 /**
361  * gst_rtsp_stream_new:
362  * @idx: an index
363  * @pad: a #GstPad
364  * @payloader: a #GstElement
365  *
366  * Create a new media stream with index @idx that handles RTP data on
367  * @pad and has a payloader element @payloader if @pad is a source pad
368  * or a depayloader element @payloader if @pad is a sink pad.
369  *
370  * Returns: (transfer full): a new #GstRTSPStream
371  */
372 GstRTSPStream *
373 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
374 {
375   GstRTSPStreamPrivate *priv;
376   GstRTSPStream *stream;
377
378   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
379   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
380
381   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
382   priv = stream->priv;
383   priv->idx = idx;
384   priv->payloader = gst_object_ref (payloader);
385   if (GST_PAD_IS_SRC (pad))
386     priv->srcpad = gst_object_ref (pad);
387   else
388     priv->sinkpad = gst_object_ref (pad);
389
390   return stream;
391 }
392
393 /**
394  * gst_rtsp_stream_get_index:
395  * @stream: a #GstRTSPStream
396  *
397  * Get the stream index.
398  *
399  * Return: the stream index.
400  */
401 guint
402 gst_rtsp_stream_get_index (GstRTSPStream * stream)
403 {
404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
405
406   return stream->priv->idx;
407 }
408
409 /**
410  * gst_rtsp_stream_get_pt:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the stream payload type.
414  *
415  * Return: the stream payload type.
416  */
417 guint
418 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint pt;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
428
429   return pt;
430 }
431
432 /**
433  * gst_rtsp_stream_get_srcpad:
434  * @stream: a #GstRTSPStream
435  *
436  * Get the srcpad associated with @stream.
437  *
438  * Returns: (transfer full): the srcpad. Unref after usage.
439  */
440 GstPad *
441 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
442 {
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
444
445   if (!stream->priv->srcpad)
446     return NULL;
447
448   return gst_object_ref (stream->priv->srcpad);
449 }
450
451 /**
452  * gst_rtsp_stream_get_sinkpad:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the sinkpad associated with @stream.
456  *
457  * Returns: (transfer full): the sinkpad. Unref after usage.
458  */
459 GstPad *
460 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
463
464   if (!stream->priv->sinkpad)
465     return NULL;
466
467   return gst_object_ref (stream->priv->sinkpad);
468 }
469
470 /**
471  * gst_rtsp_stream_get_control:
472  * @stream: a #GstRTSPStream
473  *
474  * Get the control string to identify this stream.
475  *
476  * Returns: (transfer full): the control string. g_free() after usage.
477  */
478 gchar *
479 gst_rtsp_stream_get_control (GstRTSPStream * stream)
480 {
481   GstRTSPStreamPrivate *priv;
482   gchar *result;
483
484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
485
486   priv = stream->priv;
487
488   g_mutex_lock (&priv->lock);
489   if ((result = g_strdup (priv->control)) == NULL)
490     result = g_strdup_printf ("stream=%u", priv->idx);
491   g_mutex_unlock (&priv->lock);
492
493   return result;
494 }
495
496 /**
497  * gst_rtsp_stream_set_control:
498  * @stream: a #GstRTSPStream
499  * @control: a control string
500  *
501  * Set the control string in @stream.
502  */
503 void
504 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
505 {
506   GstRTSPStreamPrivate *priv;
507
508   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
509
510   priv = stream->priv;
511
512   g_mutex_lock (&priv->lock);
513   g_free (priv->control);
514   priv->control = g_strdup (control);
515   g_mutex_unlock (&priv->lock);
516 }
517
518 /**
519  * gst_rtsp_stream_has_control:
520  * @stream: a #GstRTSPStream
521  * @control: a control string
522  *
523  * Check if @stream has the control string @control.
524  *
525  * Returns: %TRUE is @stream has @control as the control string
526  */
527 gboolean
528 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
529 {
530   GstRTSPStreamPrivate *priv;
531   gboolean res;
532
533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
534
535   priv = stream->priv;
536
537   g_mutex_lock (&priv->lock);
538   if (priv->control)
539     res = (g_strcmp0 (priv->control, control) == 0);
540   else {
541     guint streamid;
542
543     if (sscanf (control, "stream=%u", &streamid) > 0)
544       res = (streamid == priv->idx);
545     else
546       res = FALSE;
547   }
548   g_mutex_unlock (&priv->lock);
549
550   return res;
551 }
552
553 /**
554  * gst_rtsp_stream_set_mtu:
555  * @stream: a #GstRTSPStream
556  * @mtu: a new MTU
557  *
558  * Configure the mtu in the payloader of @stream to @mtu.
559  */
560 void
561 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
562 {
563   GstRTSPStreamPrivate *priv;
564
565   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
566
567   priv = stream->priv;
568
569   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
570
571   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
572 }
573
574 /**
575  * gst_rtsp_stream_get_mtu:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured MTU in the payloader of @stream.
579  *
580  * Returns: the MTU of the payloader.
581  */
582 guint
583 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586   guint mtu;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
589
590   priv = stream->priv;
591
592   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
593
594   return mtu;
595 }
596
597 /* Update the dscp qos property on the udp sinks */
598 static void
599 update_dscp_qos (GstRTSPStream * stream)
600 {
601   GstRTSPStreamPrivate *priv;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   if (priv->udpsink[0]) {
608     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
609         NULL);
610   }
611
612   if (priv->udpsink[1]) {
613     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
614         NULL);
615   }
616 }
617
618 /**
619  * gst_rtsp_stream_set_dscp_qos:
620  * @stream: a #GstRTSPStream
621  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
622  *
623  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
624  */
625 void
626 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
631
632   priv = stream->priv;
633
634   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
635
636   if (dscp_qos < -1 || dscp_qos > 63) {
637     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
638     return;
639   }
640
641   priv->dscp_qos = dscp_qos;
642
643   update_dscp_qos (stream);
644 }
645
646 /**
647  * gst_rtsp_stream_get_dscp_qos:
648  * @stream: a #GstRTSPStream
649  *
650  * Get the configured DSCP QoS in of the outgoing sockets.
651  *
652  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
653  */
654 gint
655 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
656 {
657   GstRTSPStreamPrivate *priv;
658
659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
660
661   priv = stream->priv;
662
663   return priv->dscp_qos;
664 }
665
666 /**
667  * gst_rtsp_stream_is_transport_supported:
668  * @stream: a #GstRTSPStream
669  * @transport: (transfer none): a #GstRTSPTransport
670  *
671  * Check if @transport can be handled by stream
672  *
673  * Returns: %TRUE if @transport can be handled by @stream.
674  */
675 gboolean
676 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
677     GstRTSPTransport * transport)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   if (transport->trans != GST_RTSP_TRANS_RTP)
687     goto unsupported_transmode;
688
689   if (!(transport->profile & priv->profiles))
690     goto unsupported_profile;
691
692   if (!(transport->lower_transport & priv->protocols))
693     goto unsupported_ltrans;
694
695   g_mutex_unlock (&priv->lock);
696
697   return TRUE;
698
699   /* ERRORS */
700 unsupported_transmode:
701   {
702     GST_DEBUG ("unsupported transport mode %d", transport->trans);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_profile:
707   {
708     GST_DEBUG ("unsupported profile %d", transport->profile);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 unsupported_ltrans:
713   {
714     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 }
719
720 /**
721  * gst_rtsp_stream_set_profiles:
722  * @stream: a #GstRTSPStream
723  * @profiles: the new profiles
724  *
725  * Configure the allowed profiles for @stream.
726  */
727 void
728 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
729 {
730   GstRTSPStreamPrivate *priv;
731
732   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   priv->profiles = profiles;
738   g_mutex_unlock (&priv->lock);
739 }
740
741 /**
742  * gst_rtsp_stream_get_profiles:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the allowed profiles of @stream.
746  *
747  * Returns: a #GstRTSPProfile
748  */
749 GstRTSPProfile
750 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
751 {
752   GstRTSPStreamPrivate *priv;
753   GstRTSPProfile res;
754
755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->profiles;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_protocols:
768  * @stream: a #GstRTSPStream
769  * @protocols: the new flags
770  *
771  * Configure the allowed lower transport for @stream.
772  */
773 void
774 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
775     GstRTSPLowerTrans protocols)
776 {
777   GstRTSPStreamPrivate *priv;
778
779   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
780
781   priv = stream->priv;
782
783   g_mutex_lock (&priv->lock);
784   priv->protocols = protocols;
785   g_mutex_unlock (&priv->lock);
786 }
787
788 /**
789  * gst_rtsp_stream_get_protocols:
790  * @stream: a #GstRTSPStream
791  *
792  * Get the allowed protocols of @stream.
793  *
794  * Returns: a #GstRTSPLowerTrans
795  */
796 GstRTSPLowerTrans
797 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
798 {
799   GstRTSPStreamPrivate *priv;
800   GstRTSPLowerTrans res;
801
802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
803       GST_RTSP_LOWER_TRANS_UNKNOWN);
804
805   priv = stream->priv;
806
807   g_mutex_lock (&priv->lock);
808   res = priv->protocols;
809   g_mutex_unlock (&priv->lock);
810
811   return res;
812 }
813
814 /**
815  * gst_rtsp_stream_set_address_pool:
816  * @stream: a #GstRTSPStream
817  * @pool: (transfer none): a #GstRTSPAddressPool
818  *
819  * configure @pool to be used as the address pool of @stream.
820  */
821 void
822 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
823     GstRTSPAddressPool * pool)
824 {
825   GstRTSPStreamPrivate *priv;
826   GstRTSPAddressPool *old;
827
828   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
829
830   priv = stream->priv;
831
832   GST_LOG_OBJECT (stream, "set address pool %p", pool);
833
834   g_mutex_lock (&priv->lock);
835   if ((old = priv->pool) != pool)
836     priv->pool = pool ? g_object_ref (pool) : NULL;
837   else
838     old = NULL;
839   g_mutex_unlock (&priv->lock);
840
841   if (old)
842     g_object_unref (old);
843 }
844
845 /**
846  * gst_rtsp_stream_get_address_pool:
847  * @stream: a #GstRTSPStream
848  *
849  * Get the #GstRTSPAddressPool used as the address pool of @stream.
850  *
851  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
852  * usage.
853  */
854 GstRTSPAddressPool *
855 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
856 {
857   GstRTSPStreamPrivate *priv;
858   GstRTSPAddressPool *result;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861
862   priv = stream->priv;
863
864   g_mutex_lock (&priv->lock);
865   if ((result = priv->pool))
866     g_object_ref (result);
867   g_mutex_unlock (&priv->lock);
868
869   return result;
870 }
871
872 /**
873  * gst_rtsp_stream_get_multicast_address:
874  * @stream: a #GstRTSPStream
875  * @family: the #GSocketFamily
876  *
877  * Get the multicast address of @stream for @family.
878  *
879  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
880  * or %NULL when no address could be allocated. gst_rtsp_address_free()
881  * after usage.
882  */
883 GstRTSPAddress *
884 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
885     GSocketFamily family)
886 {
887   GstRTSPStreamPrivate *priv;
888   GstRTSPAddress *result;
889   GstRTSPAddress **addrp;
890   GstRTSPAddressFlags flags;
891
892   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
893
894   priv = stream->priv;
895
896   if (family == G_SOCKET_FAMILY_IPV6) {
897     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
898     addrp = &priv->addr_v6;
899   } else {
900     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
901     addrp = &priv->addr_v4;
902   }
903
904   g_mutex_lock (&priv->lock);
905   if (*addrp == NULL) {
906     if (priv->pool == NULL)
907       goto no_pool;
908
909     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
910
911     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
912     if (*addrp == NULL)
913       goto no_address;
914   }
915   result = gst_rtsp_address_copy (*addrp);
916   g_mutex_unlock (&priv->lock);
917
918   return result;
919
920   /* ERRORS */
921 no_pool:
922   {
923     GST_ERROR_OBJECT (stream, "no address pool specified");
924     g_mutex_unlock (&priv->lock);
925     return NULL;
926   }
927 no_address:
928   {
929     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
930     g_mutex_unlock (&priv->lock);
931     return NULL;
932   }
933 }
934
935 /**
936  * gst_rtsp_stream_reserve_address:
937  * @stream: a #GstRTSPStream
938  * @address: an address
939  * @port: a port
940  * @n_ports: n_ports
941  * @ttl: a TTL
942  *
943  * Reserve @address and @port as the address and port of @stream.
944  *
945  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
946  * the address could be reserved. gst_rtsp_address_free() after usage.
947  */
948 GstRTSPAddress *
949 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
950     const gchar * address, guint port, guint n_ports, guint ttl)
951 {
952   GstRTSPStreamPrivate *priv;
953   GstRTSPAddress *result;
954   GInetAddress *addr;
955   GSocketFamily family;
956   GstRTSPAddress **addrp;
957
958   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
959   g_return_val_if_fail (address != NULL, NULL);
960   g_return_val_if_fail (port > 0, NULL);
961   g_return_val_if_fail (n_ports > 0, NULL);
962   g_return_val_if_fail (ttl > 0, NULL);
963
964   priv = stream->priv;
965
966   addr = g_inet_address_new_from_string (address);
967   if (!addr) {
968     GST_ERROR ("failed to get inet addr from %s", address);
969     family = G_SOCKET_FAMILY_IPV4;
970   } else {
971     family = g_inet_address_get_family (addr);
972     g_object_unref (addr);
973   }
974
975   if (family == G_SOCKET_FAMILY_IPV6)
976     addrp = &priv->addr_v6;
977   else
978     addrp = &priv->addr_v4;
979
980   g_mutex_lock (&priv->lock);
981   if (*addrp == NULL) {
982     GstRTSPAddressPoolResult res;
983
984     if (priv->pool == NULL)
985       goto no_pool;
986
987     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
988         port, n_ports, ttl, addrp);
989     if (res != GST_RTSP_ADDRESS_POOL_OK)
990       goto no_address;
991   } else {
992     if (strcmp ((*addrp)->address, address) ||
993         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
994         (*addrp)->ttl != ttl)
995       goto different_address;
996   }
997   result = gst_rtsp_address_copy (*addrp);
998   g_mutex_unlock (&priv->lock);
999
1000   return result;
1001
1002   /* ERRORS */
1003 no_pool:
1004   {
1005     GST_ERROR_OBJECT (stream, "no address pool specified");
1006     g_mutex_unlock (&priv->lock);
1007     return NULL;
1008   }
1009 no_address:
1010   {
1011     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1012         address);
1013     g_mutex_unlock (&priv->lock);
1014     return NULL;
1015   }
1016 different_address:
1017   {
1018     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1019         " reserved", address);
1020     g_mutex_unlock (&priv->lock);
1021     return NULL;
1022   }
1023 }
1024
1025 static gboolean
1026 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1027     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1028     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1029     GstRTSPAddress ** server_addr_out)
1030 {
1031   GstRTSPStreamPrivate *priv = stream->priv;
1032   GstStateChangeReturn ret;
1033   GstElement *udpsrc0, *udpsrc1;
1034   GstElement *udpsink0, *udpsink1;
1035   GSocket *rtp_socket = NULL;
1036   GSocket *rtcp_socket;
1037   gint tmp_rtp, tmp_rtcp;
1038   guint count;
1039   gint rtpport, rtcpport;
1040   GList *rejected_addresses = NULL;
1041   GstRTSPAddress *addr = NULL;
1042   GInetAddress *inetaddr = NULL;
1043   GSocketAddress *rtp_sockaddr = NULL;
1044   GSocketAddress *rtcp_sockaddr = NULL;
1045   const gchar *multisink_socket;
1046
1047   if (family == G_SOCKET_FAMILY_IPV6)
1048     multisink_socket = "socket-v6";
1049   else
1050     multisink_socket = "socket";
1051
1052   udpsrc0 = NULL;
1053   udpsrc1 = NULL;
1054   udpsink0 = NULL;
1055   udpsink1 = NULL;
1056   count = 0;
1057
1058   /* Start with random port */
1059   tmp_rtp = 0;
1060
1061   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1062       G_SOCKET_PROTOCOL_UDP, NULL);
1063   if (!rtcp_socket)
1064     goto no_udp_protocol;
1065
1066   if (*server_addr_out)
1067     gst_rtsp_address_free (*server_addr_out);
1068
1069   /* try to allocate 2 UDP ports, the RTP port should be an even
1070    * number and the RTCP port should be the next (uneven) port */
1071 again:
1072
1073   if (rtp_socket == NULL) {
1074     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1075         G_SOCKET_PROTOCOL_UDP, NULL);
1076     if (!rtp_socket)
1077       goto no_udp_protocol;
1078   }
1079
1080   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1081     GstRTSPAddressFlags flags;
1082
1083     if (addr)
1084       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1085
1086     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1087     if (family == G_SOCKET_FAMILY_IPV6)
1088       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1089     else
1090       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1091
1092     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1093
1094     if (addr == NULL)
1095       goto no_ports;
1096
1097     tmp_rtp = addr->port;
1098
1099     g_clear_object (&inetaddr);
1100     inetaddr = g_inet_address_new_from_string (addr->address);
1101   } else {
1102     if (tmp_rtp != 0) {
1103       tmp_rtp += 2;
1104       if (++count > 20)
1105         goto no_ports;
1106     }
1107
1108     if (inetaddr == NULL)
1109       inetaddr = g_inet_address_new_any (family);
1110   }
1111
1112   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1113   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1114     g_object_unref (rtp_sockaddr);
1115     goto again;
1116   }
1117   g_object_unref (rtp_sockaddr);
1118
1119   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1120   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1121     g_clear_object (&rtp_sockaddr);
1122     goto socket_error;
1123   }
1124
1125   tmp_rtp =
1126       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1127   g_object_unref (rtp_sockaddr);
1128
1129   /* check if port is even */
1130   if ((tmp_rtp & 1) != 0) {
1131     /* port not even, close and allocate another */
1132     tmp_rtp++;
1133     g_clear_object (&rtp_socket);
1134     goto again;
1135   }
1136
1137   /* set port */
1138   tmp_rtcp = tmp_rtp + 1;
1139
1140   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1141   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1142     g_object_unref (rtcp_sockaddr);
1143     g_clear_object (&rtp_socket);
1144     goto again;
1145   }
1146   g_object_unref (rtcp_sockaddr);
1147
1148   g_clear_object (&inetaddr);
1149
1150   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1151   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1152
1153   if (udpsrc0 == NULL || udpsrc1 == NULL)
1154     goto no_udp_protocol;
1155
1156   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1157   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1158
1159   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1160   if (ret == GST_STATE_CHANGE_FAILURE)
1161     goto element_error;
1162   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1163   if (ret == GST_STATE_CHANGE_FAILURE)
1164     goto element_error;
1165
1166   /* all fine, do port check */
1167   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1168   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1169
1170   /* this should not happen... */
1171   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1172     goto port_error;
1173
1174   if (udpsink_out[0])
1175     udpsink0 = udpsink_out[0];
1176   else
1177     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1178
1179   if (!udpsink0)
1180     goto no_udp_protocol;
1181
1182   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1183   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1184
1185   if (udpsink_out[1])
1186     udpsink1 = udpsink_out[1];
1187   else
1188     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1189
1190   if (!udpsink1)
1191     goto no_udp_protocol;
1192
1193   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1194   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1195   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1196
1197   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1198   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1199   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1200   /* Needs to be async for RECORD streams, otherwise we will never go to
1201    * PLAYING because the sinks will wait for data while the udpsrc can't
1202    * provide data with timestamps in PAUSED. */
1203   if (priv->sinkpad)
1204     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1205   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1206   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1207   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1208   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1209   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1210
1211   /* we keep these elements, we will further configure them when the
1212    * client told us to really use the UDP ports. */
1213   udpsrc_out[0] = udpsrc0;
1214   udpsrc_out[1] = udpsrc1;
1215   udpsink_out[0] = udpsink0;
1216   udpsink_out[1] = udpsink1;
1217
1218   server_port_out->min = rtpport;
1219   server_port_out->max = rtcpport;
1220
1221   *server_addr_out = addr;
1222   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1223
1224   g_object_unref (rtp_socket);
1225   g_object_unref (rtcp_socket);
1226
1227   return TRUE;
1228
1229   /* ERRORS */
1230 no_udp_protocol:
1231   {
1232     goto cleanup;
1233   }
1234 no_ports:
1235   {
1236     goto cleanup;
1237   }
1238 port_error:
1239   {
1240     goto cleanup;
1241   }
1242 socket_error:
1243   {
1244     goto cleanup;
1245   }
1246 element_error:
1247   {
1248     goto cleanup;
1249   }
1250 cleanup:
1251   {
1252     if (udpsrc0) {
1253       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1254       gst_object_unref (udpsrc0);
1255     }
1256     if (udpsrc1) {
1257       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1258       gst_object_unref (udpsrc1);
1259     }
1260     if (udpsink0) {
1261       gst_element_set_state (udpsink0, GST_STATE_NULL);
1262       gst_object_unref (udpsink0);
1263     }
1264     if (inetaddr)
1265       g_object_unref (inetaddr);
1266     g_list_free_full (rejected_addresses,
1267         (GDestroyNotify) gst_rtsp_address_free);
1268     if (addr)
1269       gst_rtsp_address_free (addr);
1270     if (rtp_socket)
1271       g_object_unref (rtp_socket);
1272     if (rtcp_socket)
1273       g_object_unref (rtcp_socket);
1274     return FALSE;
1275   }
1276 }
1277
1278 /* must be called with lock */
1279 static gboolean
1280 alloc_ports (GstRTSPStream * stream)
1281 {
1282   GstRTSPStreamPrivate *priv = stream->priv;
1283
1284   priv->have_ipv4 =
1285       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1286       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1287       &priv->server_port_v4, &priv->server_addr_v4);
1288
1289   priv->have_ipv6 =
1290       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1291       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1292       &priv->server_port_v6, &priv->server_addr_v6);
1293
1294   return priv->have_ipv4 || priv->have_ipv6;
1295 }
1296
1297 /**
1298  * gst_rtsp_stream_get_server_port:
1299  * @stream: a #GstRTSPStream
1300  * @server_port: (out): result server port
1301  * @family: the port family to get
1302  *
1303  * Fill @server_port with the port pair used by the server. This function can
1304  * only be called when @stream has been joined.
1305  */
1306 void
1307 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1308     GstRTSPRange * server_port, GSocketFamily family)
1309 {
1310   GstRTSPStreamPrivate *priv;
1311
1312   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1313   priv = stream->priv;
1314   g_return_if_fail (priv->is_joined);
1315
1316   g_mutex_lock (&priv->lock);
1317   if (family == G_SOCKET_FAMILY_IPV4) {
1318     if (server_port)
1319       *server_port = priv->server_port_v4;
1320   } else {
1321     if (server_port)
1322       *server_port = priv->server_port_v6;
1323   }
1324   g_mutex_unlock (&priv->lock);
1325 }
1326
1327 /**
1328  * gst_rtsp_stream_get_rtpsession:
1329  * @stream: a #GstRTSPStream
1330  *
1331  * Get the RTP session of this stream.
1332  *
1333  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1334  */
1335 GObject *
1336 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1337 {
1338   GstRTSPStreamPrivate *priv;
1339   GObject *session;
1340
1341   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1342
1343   priv = stream->priv;
1344
1345   g_mutex_lock (&priv->lock);
1346   if ((session = priv->session))
1347     g_object_ref (session);
1348   g_mutex_unlock (&priv->lock);
1349
1350   return session;
1351 }
1352
1353 /**
1354  * gst_rtsp_stream_get_ssrc:
1355  * @stream: a #GstRTSPStream
1356  * @ssrc: (out): result ssrc
1357  *
1358  * Get the SSRC used by the RTP session of this stream. This function can only
1359  * be called when @stream has been joined.
1360  */
1361 void
1362 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1363 {
1364   GstRTSPStreamPrivate *priv;
1365
1366   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1367   priv = stream->priv;
1368   g_return_if_fail (priv->is_joined);
1369
1370   g_mutex_lock (&priv->lock);
1371   if (ssrc && priv->session)
1372     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1373   g_mutex_unlock (&priv->lock);
1374 }
1375
1376 /**
1377  * gst_rtsp_stream_set_retransmission_time:
1378  * @stream: a #GstRTSPStream
1379  * @time: a #GstClockTime
1380  *
1381  * Set the amount of time to store retransmission packets.
1382  */
1383 void
1384 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1385     GstClockTime time)
1386 {
1387   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1388
1389   g_mutex_lock (&stream->priv->lock);
1390   stream->priv->rtx_time = time;
1391   if (stream->priv->rtxsend)
1392     g_object_set (stream->priv->rtxsend, "max-size-time",
1393         GST_TIME_AS_MSECONDS (time), NULL);
1394   g_mutex_unlock (&stream->priv->lock);
1395 }
1396
1397 /**
1398  * gst_rtsp_stream_get_retransmission_time:
1399  * @stream: a #GstRTSPStream
1400  *
1401  * Get the amount of time to store retransmission data.
1402  *
1403  * Returns: the amount of time to store retransmission data.
1404  */
1405 GstClockTime
1406 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1407 {
1408   GstClockTime ret;
1409
1410   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1411
1412   g_mutex_lock (&stream->priv->lock);
1413   ret = stream->priv->rtx_time;
1414   g_mutex_unlock (&stream->priv->lock);
1415
1416   return ret;
1417 }
1418
1419 /**
1420  * gst_rtsp_stream_set_retransmission_pt:
1421  * @stream: a #GstRTSPStream
1422  * @rtx_pt: a #guint
1423  *
1424  * Set the payload type (pt) for retransmission of this stream.
1425  */
1426 void
1427 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1428 {
1429   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1430
1431   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1432
1433   g_mutex_lock (&stream->priv->lock);
1434   stream->priv->rtx_pt = rtx_pt;
1435   if (stream->priv->rtxsend) {
1436     guint pt = gst_rtsp_stream_get_pt (stream);
1437     gchar *pt_s = g_strdup_printf ("%d", pt);
1438     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1439         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1440     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1441     g_free (pt_s);
1442     gst_structure_free (rtx_pt_map);
1443   }
1444   g_mutex_unlock (&stream->priv->lock);
1445 }
1446
1447 /**
1448  * gst_rtsp_stream_get_retransmission_pt:
1449  * @stream: a #GstRTSPStream
1450  *
1451  * Get the payload-type used for retransmission of this stream
1452  *
1453  * Returns: The retransmission PT.
1454  */
1455 guint
1456 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1457 {
1458   guint rtx_pt;
1459
1460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1461
1462   g_mutex_lock (&stream->priv->lock);
1463   rtx_pt = stream->priv->rtx_pt;
1464   g_mutex_unlock (&stream->priv->lock);
1465
1466   return rtx_pt;
1467 }
1468
1469 /**
1470  * gst_rtsp_stream_set_buffer_size:
1471  * @stream: a #GstRTSPStream
1472  * @size: the buffer size
1473  *
1474  * Set the size of the UDP transmission buffer (in bytes)
1475  * Needs to be set before the stream is joined to a bin.
1476  *
1477  * Since: 1.6
1478  */
1479 void
1480 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1481 {
1482   g_mutex_lock (&stream->priv->lock);
1483   stream->priv->buffer_size = size;
1484   g_mutex_unlock (&stream->priv->lock);
1485 }
1486
1487 /**
1488  * gst_rtsp_stream_get_buffer_size:
1489  * @stream: a #GstRTSPStream
1490  *
1491  * Get the size of the UDP transmission buffer (in bytes)
1492  *
1493  * Returns: the size of the UDP TX buffer
1494  *
1495  * Since: 1.6
1496  */
1497 guint
1498 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1499 {
1500   guint buffer_size;
1501
1502   g_mutex_lock (&stream->priv->lock);
1503   buffer_size = stream->priv->buffer_size;
1504   g_mutex_unlock (&stream->priv->lock);
1505
1506   return buffer_size;
1507 }
1508
1509 /* executed from streaming thread */
1510 static void
1511 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1512 {
1513   GstRTSPStreamPrivate *priv = stream->priv;
1514   GstCaps *newcaps, *oldcaps;
1515
1516   newcaps = gst_pad_get_current_caps (pad);
1517
1518   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1519       newcaps);
1520
1521   g_mutex_lock (&priv->lock);
1522   oldcaps = priv->caps;
1523   priv->caps = newcaps;
1524   g_mutex_unlock (&priv->lock);
1525
1526   if (oldcaps)
1527     gst_caps_unref (oldcaps);
1528 }
1529
1530 static void
1531 dump_structure (const GstStructure * s)
1532 {
1533   gchar *sstr;
1534
1535   sstr = gst_structure_to_string (s);
1536   GST_INFO ("structure: %s", sstr);
1537   g_free (sstr);
1538 }
1539
1540 static GstRTSPStreamTransport *
1541 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1542 {
1543   GstRTSPStreamPrivate *priv = stream->priv;
1544   GList *walk;
1545   GstRTSPStreamTransport *result = NULL;
1546   const gchar *tmp;
1547   gchar *dest;
1548   guint port;
1549
1550   if (rtcp_from == NULL)
1551     return NULL;
1552
1553   tmp = g_strrstr (rtcp_from, ":");
1554   if (tmp == NULL)
1555     return NULL;
1556
1557   port = atoi (tmp + 1);
1558   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1559
1560   g_mutex_lock (&priv->lock);
1561   GST_INFO ("finding %s:%d in %d transports", dest, port,
1562       g_list_length (priv->transports));
1563
1564   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1565     GstRTSPStreamTransport *trans = walk->data;
1566     const GstRTSPTransport *tr;
1567     gint min, max;
1568
1569     tr = gst_rtsp_stream_transport_get_transport (trans);
1570
1571     min = tr->client_port.min;
1572     max = tr->client_port.max;
1573
1574     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1575       result = trans;
1576       break;
1577     }
1578   }
1579   if (result)
1580     g_object_ref (result);
1581   g_mutex_unlock (&priv->lock);
1582
1583   g_free (dest);
1584
1585   return result;
1586 }
1587
1588 static GstRTSPStreamTransport *
1589 check_transport (GObject * source, GstRTSPStream * stream)
1590 {
1591   GstStructure *stats;
1592   GstRTSPStreamTransport *trans;
1593
1594   /* see if we have a stream to match with the origin of the RTCP packet */
1595   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1596   if (trans == NULL) {
1597     g_object_get (source, "stats", &stats, NULL);
1598     if (stats) {
1599       const gchar *rtcp_from;
1600
1601       dump_structure (stats);
1602
1603       g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0, stats);
1604
1605       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1606       if ((trans = find_transport (stream, rtcp_from))) {
1607         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1608             source);
1609         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1610             g_object_unref);
1611       }
1612       gst_structure_free (stats);
1613     }
1614   }
1615   return trans;
1616 }
1617
1618
1619 static void
1620 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1621 {
1622   GstRTSPStreamTransport *trans;
1623
1624   GST_INFO ("%p: new source %p", stream, source);
1625
1626   trans = check_transport (source, stream);
1627
1628   if (trans)
1629     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1630 }
1631
1632 static void
1633 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1634 {
1635   GST_INFO ("%p: new SDES %p", stream, source);
1636 }
1637
1638 static void
1639 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1640 {
1641   GstRTSPStreamTransport *trans;
1642
1643   trans = check_transport (source, stream);
1644
1645   if (trans) {
1646     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1647     gst_rtsp_stream_transport_keep_alive (trans);
1648   }
1649 #ifdef DUMP_STATS
1650   {
1651     GstStructure *stats;
1652     g_object_get (source, "stats", &stats, NULL);
1653     if (stats) {
1654       g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0, stats);
1655
1656       dump_structure (stats);
1657       gst_structure_free (stats);
1658     }
1659   }
1660 #endif
1661 }
1662
1663 static void
1664 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1665 {
1666   GST_INFO ("%p: source %p bye", stream, source);
1667 }
1668
1669 static void
1670 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1671 {
1672   GstRTSPStreamTransport *trans;
1673
1674   GST_INFO ("%p: source %p bye timeout", stream, source);
1675
1676   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1677     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1678     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1679   }
1680 }
1681
1682 static void
1683 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1684 {
1685   GstRTSPStreamTransport *trans;
1686
1687   GST_INFO ("%p: source %p timeout", stream, source);
1688
1689   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1690     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1691     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1692   }
1693 }
1694
1695 static void
1696 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1697 {
1698   if (is_rtp) {
1699     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1700     g_list_free (priv->tr_cache_rtp);
1701     priv->tr_cache_rtp = NULL;
1702   } else {
1703     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1704     g_list_free (priv->tr_cache_rtcp);
1705     priv->tr_cache_rtcp = NULL;
1706   }
1707 }
1708
1709 static GstFlowReturn
1710 handle_new_sample (GstAppSink * sink, gpointer user_data)
1711 {
1712   GstRTSPStreamPrivate *priv;
1713   GList *walk;
1714   GstSample *sample;
1715   GstBuffer *buffer;
1716   GstRTSPStream *stream;
1717   gboolean is_rtp;
1718
1719   sample = gst_app_sink_pull_sample (sink);
1720   if (!sample)
1721     return GST_FLOW_OK;
1722
1723   stream = (GstRTSPStream *) user_data;
1724   priv = stream->priv;
1725   buffer = gst_sample_get_buffer (sample);
1726
1727   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1728
1729   g_mutex_lock (&priv->lock);
1730   if (is_rtp) {
1731     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1732       clear_tr_cache (priv, is_rtp);
1733       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1734         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1735         priv->tr_cache_rtp =
1736             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1737       }
1738       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1739     }
1740   } else {
1741     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1742       clear_tr_cache (priv, is_rtp);
1743       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1744         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1745         priv->tr_cache_rtcp =
1746             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1747       }
1748       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1749     }
1750   }
1751   g_mutex_unlock (&priv->lock);
1752
1753   if (is_rtp) {
1754     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1755       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1756       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1757     }
1758   } else {
1759     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1760       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1761       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1762     }
1763   }
1764   gst_sample_unref (sample);
1765
1766   return GST_FLOW_OK;
1767 }
1768
1769 static GstAppSinkCallbacks sink_cb = {
1770   NULL,                         /* not interested in EOS */
1771   NULL,                         /* not interested in preroll samples */
1772   handle_new_sample,
1773 };
1774
1775 static GstElement *
1776 get_rtp_encoder (GstRTSPStream * stream, guint session)
1777 {
1778   GstRTSPStreamPrivate *priv = stream->priv;
1779
1780   if (priv->srtpenc == NULL) {
1781     gchar *name;
1782
1783     name = g_strdup_printf ("srtpenc_%u", session);
1784     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1785     g_free (name);
1786
1787     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1788   }
1789   return gst_object_ref (priv->srtpenc);
1790 }
1791
1792 static GstElement *
1793 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1794 {
1795   GstRTSPStreamPrivate *priv = stream->priv;
1796   GstElement *oldenc, *enc;
1797   GstPad *pad;
1798   gchar *name;
1799
1800   if (priv->idx != session)
1801     return NULL;
1802
1803   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1804
1805   oldenc = priv->srtpenc;
1806   enc = get_rtp_encoder (stream, session);
1807   name = g_strdup_printf ("rtp_sink_%d", session);
1808   pad = gst_element_get_request_pad (enc, name);
1809   g_free (name);
1810   gst_object_unref (pad);
1811
1812   if (oldenc == NULL)
1813     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1814         enc);
1815
1816   return enc;
1817 }
1818
1819 static GstElement *
1820 request_rtcp_encoder (GstElement * rtpbin, guint session,
1821     GstRTSPStream * stream)
1822 {
1823   GstRTSPStreamPrivate *priv = stream->priv;
1824   GstElement *oldenc, *enc;
1825   GstPad *pad;
1826   gchar *name;
1827
1828   if (priv->idx != session)
1829     return NULL;
1830
1831   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1832
1833   oldenc = priv->srtpenc;
1834   enc = get_rtp_encoder (stream, session);
1835   name = g_strdup_printf ("rtcp_sink_%d", session);
1836   pad = gst_element_get_request_pad (enc, name);
1837   g_free (name);
1838   gst_object_unref (pad);
1839
1840   if (oldenc == NULL)
1841     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1842         enc);
1843
1844   return enc;
1845 }
1846
1847 static GstCaps *
1848 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1849 {
1850   GstRTSPStreamPrivate *priv = stream->priv;
1851   GstCaps *caps;
1852
1853   GST_DEBUG ("request key %08x", ssrc);
1854
1855   g_mutex_lock (&priv->lock);
1856   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1857     gst_caps_ref (caps);
1858   g_mutex_unlock (&priv->lock);
1859
1860   return caps;
1861 }
1862
1863 static GstElement *
1864 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1865     GstRTSPStream * stream)
1866 {
1867   GstRTSPStreamPrivate *priv = stream->priv;
1868
1869   if (priv->idx != session)
1870     return NULL;
1871
1872   if (priv->srtpdec == NULL) {
1873     gchar *name;
1874
1875     name = g_strdup_printf ("srtpdec_%u", session);
1876     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1877     g_free (name);
1878
1879     g_signal_connect (priv->srtpdec, "request-key",
1880         (GCallback) request_key, stream);
1881   }
1882   return gst_object_ref (priv->srtpdec);
1883 }
1884
1885 /**
1886  * gst_rtsp_stream_request_aux_sender:
1887  * @stream: a #GstRTSPStream
1888  * @sessid: the session id
1889  *
1890  * Creating a rtxsend bin
1891  *
1892  * Returns: (transfer full): a #GstElement.
1893  *
1894  * Since: 1.6
1895  */
1896 GstElement *
1897 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
1898 {
1899   GstElement *bin;
1900   GstPad *pad;
1901   GstStructure *pt_map;
1902   gchar *name;
1903   guint pt, rtx_pt;
1904   gchar *pt_s;
1905
1906   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1907
1908   pt = gst_rtsp_stream_get_pt (stream);
1909   pt_s = g_strdup_printf ("%u", pt);
1910   rtx_pt = stream->priv->rtx_pt;
1911
1912   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1913
1914   bin = gst_bin_new (NULL);
1915   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1916   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1917       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1918   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1919       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1920   g_free (pt_s);
1921   gst_structure_free (pt_map);
1922   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1923
1924   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1925   name = g_strdup_printf ("src_%u", sessid);
1926   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1927   g_free (name);
1928   gst_object_unref (pad);
1929
1930   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1931   name = g_strdup_printf ("sink_%u", sessid);
1932   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1933   g_free (name);
1934   gst_object_unref (pad);
1935
1936   return bin;
1937 }
1938
1939 /**
1940  * gst_rtsp_stream_set_pt_map:
1941  * @stream: a #GstRTSPStream
1942  * @pt: the pt
1943  * @caps: a #GstCaps
1944  *
1945  * Configure a pt map between @pt and @caps.
1946  */
1947 void
1948 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1949 {
1950   GstRTSPStreamPrivate *priv = stream->priv;
1951
1952   g_mutex_lock (&priv->lock);
1953   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1954   g_mutex_unlock (&priv->lock);
1955 }
1956
1957 static GstCaps *
1958 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1959     GstRTSPStream * stream)
1960 {
1961   GstRTSPStreamPrivate *priv = stream->priv;
1962   GstCaps *caps = NULL;
1963
1964   g_mutex_lock (&priv->lock);
1965
1966   if (priv->idx == session) {
1967     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1968     if (caps) {
1969       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1970       gst_caps_ref (caps);
1971     } else {
1972       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1973     }
1974   }
1975
1976   g_mutex_unlock (&priv->lock);
1977
1978   return caps;
1979 }
1980
1981 static void
1982 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
1983 {
1984   GstRTSPStreamPrivate *priv = stream->priv;
1985   gchar *name;
1986   GstPadLinkReturn ret;
1987   guint sessid;
1988
1989   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
1990       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1991
1992   name = gst_pad_get_name (pad);
1993   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
1994     g_free (name);
1995     return;
1996   }
1997   g_free (name);
1998
1999   if (priv->idx != sessid)
2000     return;
2001
2002   if (gst_pad_is_linked (priv->sinkpad)) {
2003     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2004         GST_DEBUG_PAD_NAME (priv->sinkpad));
2005     return;
2006   }
2007
2008   /* link the RTP pad to the session manager, it should not really fail unless
2009    * this is not really an RTP pad */
2010   ret = gst_pad_link (pad, priv->sinkpad);
2011   if (ret != GST_PAD_LINK_OK)
2012     goto link_failed;
2013   priv->recv_rtp_src = gst_object_ref (pad);
2014
2015   return;
2016
2017 /* ERRORS */
2018 link_failed:
2019   {
2020     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2021         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2022   }
2023 }
2024
2025 static void
2026 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2027     GstRTSPStream * stream)
2028 {
2029   /* TODO: What to do here other than this? */
2030   GST_DEBUG ("Stream %p: Got EOS", stream);
2031   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2032 }
2033
2034 /**
2035  * gst_rtsp_stream_join_bin:
2036  * @stream: a #GstRTSPStream
2037  * @bin: (transfer none): a #GstBin to join
2038  * @rtpbin: (transfer none): a rtpbin element in @bin
2039  * @state: the target state of the new elements
2040  *
2041  * Join the #GstBin @bin that contains the element @rtpbin.
2042  *
2043  * @stream will link to @rtpbin, which must be inside @bin. The elements
2044  * added to @bin will be set to the state given in @state.
2045  *
2046  * Returns: %TRUE on success.
2047  */
2048 gboolean
2049 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2050     GstElement * rtpbin, GstState state)
2051 {
2052   GstRTSPStreamPrivate *priv;
2053   gint i;
2054   guint idx;
2055   gchar *name;
2056   GstPad *pad, *sinkpad, *selpad;
2057   GstPadLinkReturn ret;
2058
2059   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2060   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2061   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2062
2063   priv = stream->priv;
2064
2065   g_mutex_lock (&priv->lock);
2066   if (priv->is_joined)
2067     goto was_joined;
2068
2069   /* create a session with the same index as the stream */
2070   idx = priv->idx;
2071
2072   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2073
2074   if (!alloc_ports (stream))
2075     goto no_ports;
2076
2077   /* update the dscp qos field in the sinks */
2078   update_dscp_qos (stream);
2079
2080   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2081       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2082     /* For SRTP */
2083     g_signal_connect (rtpbin, "request-rtp-encoder",
2084         (GCallback) request_rtp_encoder, stream);
2085     g_signal_connect (rtpbin, "request-rtcp-encoder",
2086         (GCallback) request_rtcp_encoder, stream);
2087     g_signal_connect (rtpbin, "request-rtp-decoder",
2088         (GCallback) request_rtp_rtcp_decoder, stream);
2089     g_signal_connect (rtpbin, "request-rtcp-decoder",
2090         (GCallback) request_rtp_rtcp_decoder, stream);
2091   }
2092
2093   if (priv->sinkpad) {
2094     g_signal_connect (rtpbin, "request-pt-map",
2095         (GCallback) request_pt_map, stream);
2096   }
2097
2098   /* get a pad for sending RTP */
2099   name = g_strdup_printf ("send_rtp_sink_%u", idx);
2100   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2101   g_free (name);
2102
2103   if (priv->srcpad) {
2104     /* link the RTP pad to the session manager, it should not really fail unless
2105      * this is not really an RTP pad */
2106     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2107     if (ret != GST_PAD_LINK_OK)
2108       goto link_failed;
2109   } else {
2110     /* Need to connect our sinkpad from here */
2111     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2112     /* EOS */
2113     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2114   }
2115
2116   /* get pads from the RTP session element for sending and receiving
2117    * RTP/RTCP*/
2118   name = g_strdup_printf ("send_rtp_src_%u", idx);
2119   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2120   g_free (name);
2121   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2122   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2123   g_free (name);
2124
2125   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2126   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2127   g_free (name);
2128   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2129   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2130   g_free (name);
2131
2132   /* get the session */
2133   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2134
2135   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2136       stream);
2137   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2138       stream);
2139   g_signal_connect (priv->session, "on-ssrc-active",
2140       (GCallback) on_ssrc_active, stream);
2141   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2142       stream);
2143   g_signal_connect (priv->session, "on-bye-timeout",
2144       (GCallback) on_bye_timeout, stream);
2145   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2146       stream);
2147
2148   for (i = 0; i < 2; i++) {
2149     GstPad *teepad, *queuepad;
2150     /* For the sender we create this bit of pipeline for both
2151      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2152      * we need to add a queue before appsink and udpsink to make
2153      * the pipeline not block. For the TCP case, we want to pump
2154      * data to the client as fast as possible.
2155      *
2156      * .--------.      .-----.    .---------.    .---------.
2157      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2158      * |       send->sink   src->sink      src->sink       |
2159      * '--------'      |     |    '---------'    '---------'
2160      *                 |     |    .---------.    .---------.
2161      *                 |     |    |  queue  |    | appsink |
2162      *                 |    src->sink      src->sink       |
2163      *                 '-----'    '---------'    '---------'
2164      *
2165      * When only UDP is allowed, we skip the tee, queue and appsink and link the
2166      * udpsink directly to the session.
2167      */
2168     /* add udpsink */
2169     gst_bin_add (bin, priv->udpsink[i]);
2170     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2171
2172     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2173       /* make tee for RTP/RTCP */
2174       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2175       gst_bin_add (bin, priv->tee[i]);
2176
2177       /* and link to rtpbin send pad */
2178       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2179       gst_pad_link (priv->send_src[i], pad);
2180       gst_object_unref (pad);
2181
2182       priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2183       g_object_set (priv->udpqueue[i], "max-size-buffers",
2184           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2185       gst_bin_add (bin, priv->udpqueue[i]);
2186       /* link tee to udpqueue */
2187       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2188       pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2189       gst_pad_link (teepad, pad);
2190       gst_object_unref (pad);
2191       gst_object_unref (teepad);
2192
2193       /* link udpqueue to udpsink */
2194       queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2195       gst_pad_link (queuepad, sinkpad);
2196       gst_object_unref (queuepad);
2197
2198       /* make queue */
2199       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2200       g_object_set (priv->appqueue[i], "max-size-buffers",
2201           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2202       gst_bin_add (bin, priv->appqueue[i]);
2203       /* and link to tee */
2204       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2205       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2206       gst_pad_link (teepad, pad);
2207       gst_object_unref (pad);
2208       gst_object_unref (teepad);
2209
2210       /* make appsink */
2211       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2212       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2213       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2214       gst_bin_add (bin, priv->appsink[i]);
2215       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2216           &sink_cb, stream, NULL);
2217       /* and link to queue */
2218       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2219       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2220       gst_pad_link (queuepad, pad);
2221       gst_object_unref (pad);
2222       gst_object_unref (queuepad);
2223     } else {
2224       /* else only udpsink needed, link it to the session */
2225       gst_pad_link (priv->send_src[i], sinkpad);
2226     }
2227     gst_object_unref (sinkpad);
2228
2229     /* For the receiver we create this bit of pipeline for both
2230      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2231      * and it is all funneled into the rtpbin receive pad.
2232      *
2233      * .--------.     .--------.    .--------.
2234      * | udpsrc |     | funnel |    | rtpbin |
2235      * |       src->sink      src->sink      |
2236      * '--------'     |        |    '--------'
2237      * .--------.     |        |
2238      * | appsrc |     |        |
2239      * |       src->sink       |
2240      * '--------'     '--------'
2241      */
2242     /* make funnel for the RTP/RTCP receivers */
2243     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2244     gst_bin_add (bin, priv->funnel[i]);
2245
2246     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2247     gst_pad_link (pad, priv->recv_sink[i]);
2248     gst_object_unref (pad);
2249
2250     if (priv->udpsrc_v4[i]) {
2251       if (priv->srcpad) {
2252         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2253          * values. This is only relevant for PLAY pipelines */
2254         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2255         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2256       }
2257       /* add udpsrc */
2258       gst_bin_add (bin, priv->udpsrc_v4[i]);
2259
2260       /* and link to the funnel v4 */
2261       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2262       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2263       gst_pad_link (pad, selpad);
2264       gst_object_unref (pad);
2265       gst_object_unref (selpad);
2266     }
2267
2268     if (priv->udpsrc_v6[i]) {
2269       if (priv->srcpad) {
2270         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2271         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2272       }
2273       gst_bin_add (bin, priv->udpsrc_v6[i]);
2274
2275       /* and link to the funnel v6 */
2276       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2277       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2278       gst_pad_link (pad, selpad);
2279       gst_object_unref (pad);
2280       gst_object_unref (selpad);
2281     }
2282
2283     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2284       /* make and add appsrc */
2285       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2286       priv->appsrc_base_time[i] = -1;
2287       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2288       gst_bin_add (bin, priv->appsrc[i]);
2289       /* and link to the funnel */
2290       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2291       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2292       gst_pad_link (pad, selpad);
2293       gst_object_unref (pad);
2294       gst_object_unref (selpad);
2295     }
2296
2297     /* check if we need to set to a special state */
2298     if (state != GST_STATE_NULL) {
2299       if (priv->udpsink[i])
2300         gst_element_set_state (priv->udpsink[i], state);
2301       if (priv->appsink[i])
2302         gst_element_set_state (priv->appsink[i], state);
2303       if (priv->appqueue[i])
2304         gst_element_set_state (priv->appqueue[i], state);
2305       if (priv->udpqueue[i])
2306         gst_element_set_state (priv->udpqueue[i], state);
2307       if (priv->tee[i])
2308         gst_element_set_state (priv->tee[i], state);
2309       if (priv->funnel[i])
2310         gst_element_set_state (priv->funnel[i], state);
2311       if (priv->appsrc[i])
2312         gst_element_set_state (priv->appsrc[i], state);
2313     }
2314   }
2315
2316   /* be notified of caps changes */
2317   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2318       (GCallback) caps_notify, stream);
2319
2320   priv->is_joined = TRUE;
2321   g_mutex_unlock (&priv->lock);
2322
2323   return TRUE;
2324
2325   /* ERRORS */
2326 was_joined:
2327   {
2328     g_mutex_unlock (&priv->lock);
2329     return TRUE;
2330   }
2331 no_ports:
2332   {
2333     g_mutex_unlock (&priv->lock);
2334     GST_WARNING ("failed to allocate ports %u", idx);
2335     return FALSE;
2336   }
2337 link_failed:
2338   {
2339     GST_WARNING ("failed to link stream %u", idx);
2340     gst_object_unref (priv->send_rtp_sink);
2341     priv->send_rtp_sink = NULL;
2342     g_mutex_unlock (&priv->lock);
2343     return FALSE;
2344   }
2345 }
2346
2347 /**
2348  * gst_rtsp_stream_leave_bin:
2349  * @stream: a #GstRTSPStream
2350  * @bin: (transfer none): a #GstBin
2351  * @rtpbin: (transfer none): a rtpbin #GstElement
2352  *
2353  * Remove the elements of @stream from @bin.
2354  *
2355  * Return: %TRUE on success.
2356  */
2357 gboolean
2358 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2359     GstElement * rtpbin)
2360 {
2361   GstRTSPStreamPrivate *priv;
2362   gint i;
2363   GList *l;
2364
2365   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2366   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2367   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2368
2369   priv = stream->priv;
2370
2371   g_mutex_lock (&priv->lock);
2372   if (!priv->is_joined)
2373     goto was_not_joined;
2374
2375   /* all transports must be removed by now */
2376   if (priv->transports != NULL)
2377     goto transports_not_removed;
2378
2379   clear_tr_cache (priv, TRUE);
2380   clear_tr_cache (priv, FALSE);
2381
2382   GST_INFO ("stream %p leaving bin", stream);
2383
2384   if (priv->srcpad) {
2385     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2386   } else if (priv->recv_rtp_src) {
2387     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2388     gst_object_unref (priv->recv_rtp_src);
2389     priv->recv_rtp_src = NULL;
2390   }
2391   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2392   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2393   gst_object_unref (priv->send_rtp_sink);
2394   priv->send_rtp_sink = NULL;
2395
2396   for (i = 0; i < 2; i++) {
2397     if (priv->udpsink[i])
2398       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2399     if (priv->appsink[i])
2400       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2401     if (priv->appqueue[i])
2402       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2403     if (priv->udpqueue[i])
2404       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2405     if (priv->tee[i])
2406       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2407     if (priv->funnel[i])
2408       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2409     if (priv->appsrc[i])
2410       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2411     if (priv->udpsrc_v4[i]) {
2412       /* and set udpsrc to NULL now before removing */
2413       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2414       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2415       /* removing them should also nicely release the request
2416        * pads when they finalize */
2417       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2418     }
2419     if (priv->udpsrc_v6[i]) {
2420       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2421       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2422       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2423     }
2424
2425     for (l = priv->transport_sources; l; l = l->next) {
2426       GstRTSPMulticastTransportSource *s = l->data;
2427
2428       if (!s->udpsrc[i])
2429         continue;
2430
2431       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2432       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2433       gst_bin_remove (bin, s->udpsrc[i]);
2434     }
2435
2436     if (priv->udpsink[i])
2437       gst_bin_remove (bin, priv->udpsink[i]);
2438     if (priv->appsrc[i])
2439       gst_bin_remove (bin, priv->appsrc[i]);
2440     if (priv->appsink[i])
2441       gst_bin_remove (bin, priv->appsink[i]);
2442     if (priv->appqueue[i])
2443       gst_bin_remove (bin, priv->appqueue[i]);
2444     if (priv->udpqueue[i])
2445       gst_bin_remove (bin, priv->udpqueue[i]);
2446     if (priv->tee[i])
2447       gst_bin_remove (bin, priv->tee[i]);
2448     if (priv->funnel[i])
2449       gst_bin_remove (bin, priv->funnel[i]);
2450
2451     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2452     gst_object_unref (priv->recv_sink[i]);
2453     priv->recv_sink[i] = NULL;
2454
2455     priv->udpsrc_v4[i] = NULL;
2456     priv->udpsrc_v6[i] = NULL;
2457     priv->udpsink[i] = NULL;
2458     priv->appsrc[i] = NULL;
2459     priv->appsink[i] = NULL;
2460     priv->appqueue[i] = NULL;
2461     priv->udpqueue[i] = NULL;
2462     priv->tee[i] = NULL;
2463     priv->funnel[i] = NULL;
2464   }
2465
2466   for (l = priv->transport_sources; l; l = l->next) {
2467     GstRTSPMulticastTransportSource *s = l->data;
2468     g_slice_free (GstRTSPMulticastTransportSource, s);
2469   }
2470   g_list_free (priv->transport_sources);
2471   priv->transport_sources = NULL;
2472
2473   gst_object_unref (priv->send_src[0]);
2474   priv->send_src[0] = NULL;
2475
2476   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2477   gst_object_unref (priv->send_src[1]);
2478   priv->send_src[1] = NULL;
2479
2480   g_object_unref (priv->session);
2481   priv->session = NULL;
2482   if (priv->caps)
2483     gst_caps_unref (priv->caps);
2484   priv->caps = NULL;
2485
2486   if (priv->srtpenc)
2487     gst_object_unref (priv->srtpenc);
2488   if (priv->srtpdec)
2489     gst_object_unref (priv->srtpdec);
2490
2491   priv->is_joined = FALSE;
2492   g_mutex_unlock (&priv->lock);
2493
2494   return TRUE;
2495
2496 was_not_joined:
2497   {
2498     g_mutex_unlock (&priv->lock);
2499     return TRUE;
2500   }
2501 transports_not_removed:
2502   {
2503     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2504     g_mutex_unlock (&priv->lock);
2505     return FALSE;
2506   }
2507 }
2508
2509 /**
2510  * gst_rtsp_stream_get_rtpinfo:
2511  * @stream: a #GstRTSPStream
2512  * @rtptime: (allow-none): result RTP timestamp
2513  * @seq: (allow-none): result RTP seqnum
2514  * @clock_rate: (allow-none): the clock rate
2515  * @running_time: (allow-none): result running-time
2516  *
2517  * Retrieve the current rtptime, seq and running-time. This is used to
2518  * construct a RTPInfo reply header.
2519  *
2520  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2521  */
2522 gboolean
2523 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2524     guint * rtptime, guint * seq, guint * clock_rate,
2525     GstClockTime * running_time)
2526 {
2527   GstRTSPStreamPrivate *priv;
2528   GstStructure *stats;
2529   GObjectClass *payobjclass;
2530
2531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2532
2533   priv = stream->priv;
2534
2535   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2536
2537   g_mutex_lock (&priv->lock);
2538
2539   /* First try to extract the information from the last buffer on the sinks.
2540    * This will have a more accurate sequence number and timestamp, as between
2541    * the payloader and the sink there can be some queues
2542    */
2543   if (priv->udpsink[0] || priv->appsink[0]) {
2544     GstSample *last_sample;
2545
2546     if (priv->udpsink[0])
2547       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2548     else
2549       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2550
2551     if (last_sample) {
2552       GstCaps *caps;
2553       GstBuffer *buffer;
2554       GstSegment *segment;
2555       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2556
2557       caps = gst_sample_get_caps (last_sample);
2558       buffer = gst_sample_get_buffer (last_sample);
2559       segment = gst_sample_get_segment (last_sample);
2560
2561       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2562         if (seq) {
2563           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2564         }
2565
2566         if (rtptime) {
2567           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2568         }
2569
2570         gst_rtp_buffer_unmap (&rtp_buffer);
2571
2572         if (running_time) {
2573           *running_time =
2574               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2575               GST_BUFFER_TIMESTAMP (buffer));
2576         }
2577
2578         if (clock_rate) {
2579           GstStructure *s = gst_caps_get_structure (caps, 0);
2580
2581           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2582
2583           if (*clock_rate == 0 && running_time)
2584             *running_time = GST_CLOCK_TIME_NONE;
2585         }
2586         gst_sample_unref (last_sample);
2587
2588         goto done;
2589       } else {
2590         gst_sample_unref (last_sample);
2591       }
2592     }
2593   }
2594
2595   if (g_object_class_find_property (payobjclass, "stats")) {
2596     g_object_get (priv->payloader, "stats", &stats, NULL);
2597     if (stats == NULL)
2598       goto no_stats;
2599
2600     if (seq)
2601       gst_structure_get_uint (stats, "seqnum", seq);
2602
2603     if (rtptime)
2604       gst_structure_get_uint (stats, "timestamp", rtptime);
2605
2606     if (running_time)
2607       gst_structure_get_clock_time (stats, "running-time", running_time);
2608
2609     if (clock_rate) {
2610       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2611       if (*clock_rate == 0 && running_time)
2612         *running_time = GST_CLOCK_TIME_NONE;
2613     }
2614     gst_structure_free (stats);
2615   } else {
2616     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2617         !g_object_class_find_property (payobjclass, "timestamp"))
2618       goto no_stats;
2619
2620     if (seq)
2621       g_object_get (priv->payloader, "seqnum", seq, NULL);
2622
2623     if (rtptime)
2624       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2625
2626     if (running_time)
2627       *running_time = GST_CLOCK_TIME_NONE;
2628   }
2629
2630 done:
2631   g_mutex_unlock (&priv->lock);
2632
2633   return TRUE;
2634
2635   /* ERRORS */
2636 no_stats:
2637   {
2638     GST_WARNING ("Could not get payloader stats");
2639     g_mutex_unlock (&priv->lock);
2640     return FALSE;
2641   }
2642 }
2643
2644 /**
2645  * gst_rtsp_stream_get_caps:
2646  * @stream: a #GstRTSPStream
2647  *
2648  * Retrieve the current caps of @stream.
2649  *
2650  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2651  * after usage.
2652  */
2653 GstCaps *
2654 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2655 {
2656   GstRTSPStreamPrivate *priv;
2657   GstCaps *result;
2658
2659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2660
2661   priv = stream->priv;
2662
2663   g_mutex_lock (&priv->lock);
2664   if ((result = priv->caps))
2665     gst_caps_ref (result);
2666   g_mutex_unlock (&priv->lock);
2667
2668   return result;
2669 }
2670
2671 /**
2672  * gst_rtsp_stream_recv_rtp:
2673  * @stream: a #GstRTSPStream
2674  * @buffer: (transfer full): a #GstBuffer
2675  *
2676  * Handle an RTP buffer for the stream. This method is usually called when a
2677  * message has been received from a client using the TCP transport.
2678  *
2679  * This function takes ownership of @buffer.
2680  *
2681  * Returns: a GstFlowReturn.
2682  */
2683 GstFlowReturn
2684 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2685 {
2686   GstRTSPStreamPrivate *priv;
2687   GstFlowReturn ret;
2688   GstElement *element;
2689
2690   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2691   priv = stream->priv;
2692   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2693   g_return_val_if_fail (priv->is_joined, FALSE);
2694
2695   g_mutex_lock (&priv->lock);
2696   if (priv->appsrc[0])
2697     element = gst_object_ref (priv->appsrc[0]);
2698   else
2699     element = NULL;
2700   g_mutex_unlock (&priv->lock);
2701
2702   if (element) {
2703     if (priv->appsrc_base_time[0] == -1) {
2704       /* Take current running_time. This timestamp will be put on
2705        * the first buffer of each stream because we are a live source and so we
2706        * timestamp with the running_time. When we are dealing with TCP, we also
2707        * only timestamp the first buffer (using the DISCONT flag) because a server
2708        * typically bursts data, for which we don't want to compensate by speeding
2709        * up the media. The other timestamps will be interpollated from this one
2710        * using the RTP timestamps. */
2711       GST_OBJECT_LOCK (element);
2712       if (GST_ELEMENT_CLOCK (element)) {
2713         GstClockTime now;
2714         GstClockTime base_time;
2715
2716         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2717         base_time = GST_ELEMENT_CAST (element)->base_time;
2718
2719         priv->appsrc_base_time[0] = now - base_time;
2720         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2721         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2722             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2723             GST_TIME_ARGS (base_time));
2724       }
2725       GST_OBJECT_UNLOCK (element);
2726     }
2727
2728     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2729     gst_object_unref (element);
2730   } else {
2731     ret = GST_FLOW_OK;
2732   }
2733   return ret;
2734 }
2735
2736 /**
2737  * gst_rtsp_stream_recv_rtcp:
2738  * @stream: a #GstRTSPStream
2739  * @buffer: (transfer full): a #GstBuffer
2740  *
2741  * Handle an RTCP buffer for the stream. This method is usually called when a
2742  * message has been received from a client using the TCP transport.
2743  *
2744  * This function takes ownership of @buffer.
2745  *
2746  * Returns: a GstFlowReturn.
2747  */
2748 GstFlowReturn
2749 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2750 {
2751   GstRTSPStreamPrivate *priv;
2752   GstFlowReturn ret;
2753   GstElement *element;
2754
2755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2756   priv = stream->priv;
2757   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2758
2759   if (!priv->is_joined) {
2760     gst_buffer_unref (buffer);
2761     return GST_FLOW_NOT_LINKED;
2762   }
2763   g_mutex_lock (&priv->lock);
2764   if (priv->appsrc[1])
2765     element = gst_object_ref (priv->appsrc[1]);
2766   else
2767     element = NULL;
2768   g_mutex_unlock (&priv->lock);
2769
2770   if (element) {
2771     if (priv->appsrc_base_time[1] == -1) {
2772       /* Take current running_time. This timestamp will be put on
2773        * the first buffer of each stream because we are a live source and so we
2774        * timestamp with the running_time. When we are dealing with TCP, we also
2775        * only timestamp the first buffer (using the DISCONT flag) because a server
2776        * typically bursts data, for which we don't want to compensate by speeding
2777        * up the media. The other timestamps will be interpollated from this one
2778        * using the RTP timestamps. */
2779       GST_OBJECT_LOCK (element);
2780       if (GST_ELEMENT_CLOCK (element)) {
2781         GstClockTime now;
2782         GstClockTime base_time;
2783
2784         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2785         base_time = GST_ELEMENT_CAST (element)->base_time;
2786
2787         priv->appsrc_base_time[1] = now - base_time;
2788         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2789         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2790             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2791             GST_TIME_ARGS (base_time));
2792       }
2793       GST_OBJECT_UNLOCK (element);
2794     }
2795
2796     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2797     gst_object_unref (element);
2798   } else {
2799     ret = GST_FLOW_OK;
2800     gst_buffer_unref (buffer);
2801   }
2802   return ret;
2803 }
2804
2805 /* must be called with lock */
2806 static gboolean
2807 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2808     gboolean add)
2809 {
2810   GstRTSPStreamPrivate *priv = stream->priv;
2811   const GstRTSPTransport *tr;
2812
2813   tr = gst_rtsp_stream_transport_get_transport (trans);
2814
2815   switch (tr->lower_transport) {
2816     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2817     {
2818       GstRTSPMulticastTransportSource *source;
2819       GstBin *bin;
2820
2821       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2822
2823       if (add) {
2824         gchar *host;
2825         gint i;
2826         GstPad *selpad, *pad;
2827
2828         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2829         source->transport = trans;
2830
2831         for (i = 0; i < 2; i++) {
2832           host =
2833               g_strdup_printf ("udp://%s:%d", tr->destination,
2834               (i == 0) ? tr->port.min : tr->port.max);
2835           source->udpsrc[i] =
2836               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2837           g_free (host);
2838
2839           if (priv->srcpad) {
2840             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2841              * values. This is only relevant for PLAY pipelines */
2842             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2843             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2844           }
2845           /* add udpsrc */
2846           gst_bin_add (bin, source->udpsrc[i]);
2847
2848           /* and link to the funnel v4 */
2849           source->selpad[i] = selpad =
2850               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2851           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2852           gst_pad_link (pad, selpad);
2853           gst_object_unref (pad);
2854           gst_object_unref (selpad);
2855         }
2856
2857         priv->transport_sources =
2858             g_list_prepend (priv->transport_sources, source);
2859       } else {
2860         GList *l;
2861
2862         for (l = priv->transport_sources; l; l = l->next) {
2863           source = l->data;
2864
2865           if (source->transport == trans) {
2866             priv->transport_sources =
2867                 g_list_delete_link (priv->transport_sources, l);
2868             break;
2869           }
2870         }
2871
2872         if (l != NULL) {
2873           gint i;
2874
2875           for (i = 0; i < 2; i++) {
2876             /* Will automatically unlink everything */
2877             gst_bin_remove (bin,
2878                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2879
2880             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2881             gst_object_unref (source->udpsrc[i]);
2882
2883             gst_element_release_request_pad (priv->funnel[i],
2884                 source->selpad[i]);
2885           }
2886
2887           g_slice_free (GstRTSPMulticastTransportSource, source);
2888         }
2889       }
2890
2891       gst_object_unref (bin);
2892
2893       /* fall through for the generic case */
2894     }
2895     case GST_RTSP_LOWER_TRANS_UDP:
2896     {
2897       gchar *dest;
2898       gint min, max;
2899       guint ttl = 0;
2900
2901       dest = tr->destination;
2902       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2903         min = tr->port.min;
2904         max = tr->port.max;
2905         ttl = tr->ttl;
2906       } else {
2907         min = tr->client_port.min;
2908         max = tr->client_port.max;
2909       }
2910
2911       if (add) {
2912         if (ttl > 0) {
2913           GST_INFO ("setting ttl-mc %d", ttl);
2914           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2915           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2916         }
2917         GST_INFO ("adding %s:%d-%d", dest, min, max);
2918         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2919         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2920         priv->transports = g_list_prepend (priv->transports, trans);
2921       } else {
2922         GST_INFO ("removing %s:%d-%d", dest, min, max);
2923         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2924         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2925         priv->transports = g_list_remove (priv->transports, trans);
2926       }
2927       priv->transports_cookie++;
2928       break;
2929     }
2930     case GST_RTSP_LOWER_TRANS_TCP:
2931       if (add) {
2932         GST_INFO ("adding TCP %s", tr->destination);
2933         priv->transports = g_list_prepend (priv->transports, trans);
2934       } else {
2935         GST_INFO ("removing TCP %s", tr->destination);
2936         priv->transports = g_list_remove (priv->transports, trans);
2937       }
2938       priv->transports_cookie++;
2939       break;
2940     default:
2941       goto unknown_transport;
2942   }
2943   return TRUE;
2944
2945   /* ERRORS */
2946 unknown_transport:
2947   {
2948     GST_INFO ("Unknown transport %d", tr->lower_transport);
2949     return FALSE;
2950   }
2951 }
2952
2953
2954 /**
2955  * gst_rtsp_stream_add_transport:
2956  * @stream: a #GstRTSPStream
2957  * @trans: (transfer none): a #GstRTSPStreamTransport
2958  *
2959  * Add the transport in @trans to @stream. The media of @stream will
2960  * then also be send to the values configured in @trans.
2961  *
2962  * @stream must be joined to a bin.
2963  *
2964  * @trans must contain a valid #GstRTSPTransport.
2965  *
2966  * Returns: %TRUE if @trans was added
2967  */
2968 gboolean
2969 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2970     GstRTSPStreamTransport * trans)
2971 {
2972   GstRTSPStreamPrivate *priv;
2973   gboolean res;
2974
2975   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2976   priv = stream->priv;
2977   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2978   g_return_val_if_fail (priv->is_joined, FALSE);
2979
2980   g_mutex_lock (&priv->lock);
2981   res = update_transport (stream, trans, TRUE);
2982   g_mutex_unlock (&priv->lock);
2983
2984   return res;
2985 }
2986
2987 /**
2988  * gst_rtsp_stream_remove_transport:
2989  * @stream: a #GstRTSPStream
2990  * @trans: (transfer none): a #GstRTSPStreamTransport
2991  *
2992  * Remove the transport in @trans from @stream. The media of @stream will
2993  * not be sent to the values configured in @trans.
2994  *
2995  * @stream must be joined to a bin.
2996  *
2997  * @trans must contain a valid #GstRTSPTransport.
2998  *
2999  * Returns: %TRUE if @trans was removed
3000  */
3001 gboolean
3002 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3003     GstRTSPStreamTransport * trans)
3004 {
3005   GstRTSPStreamPrivate *priv;
3006   gboolean res;
3007
3008   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3009   priv = stream->priv;
3010   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3011   g_return_val_if_fail (priv->is_joined, FALSE);
3012
3013   g_mutex_lock (&priv->lock);
3014   res = update_transport (stream, trans, FALSE);
3015   g_mutex_unlock (&priv->lock);
3016
3017   return res;
3018 }
3019
3020 /**
3021  * gst_rtsp_stream_update_crypto:
3022  * @stream: a #GstRTSPStream
3023  * @ssrc: the SSRC
3024  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3025  *
3026  * Update the new crypto information for @ssrc in @stream. If information
3027  * for @ssrc did not exist, it will be added. If information
3028  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3029  * be removed from @stream.
3030  *
3031  * Returns: %TRUE if @crypto could be updated
3032  */
3033 gboolean
3034 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3035     guint ssrc, GstCaps * crypto)
3036 {
3037   GstRTSPStreamPrivate *priv;
3038
3039   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3040   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3041
3042   priv = stream->priv;
3043
3044   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3045
3046   g_mutex_lock (&priv->lock);
3047   if (crypto)
3048     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3049         gst_caps_ref (crypto));
3050   else
3051     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3052   g_mutex_unlock (&priv->lock);
3053
3054   return TRUE;
3055 }
3056
3057 /**
3058  * gst_rtsp_stream_get_rtp_socket:
3059  * @stream: a #GstRTSPStream
3060  * @family: the socket family
3061  *
3062  * Get the RTP socket from @stream for a @family.
3063  *
3064  * @stream must be joined to a bin.
3065  *
3066  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3067  * socket could be allocated for @family. Unref after usage
3068  */
3069 GSocket *
3070 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3071 {
3072   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3073   GSocket *socket;
3074   const gchar *name;
3075
3076   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3077   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3078       family == G_SOCKET_FAMILY_IPV6, NULL);
3079   g_return_val_if_fail (priv->udpsink[0], NULL);
3080
3081   if (family == G_SOCKET_FAMILY_IPV6)
3082     name = "socket-v6";
3083   else
3084     name = "socket";
3085
3086   g_object_get (priv->udpsink[0], name, &socket, NULL);
3087
3088   return socket;
3089 }
3090
3091 /**
3092  * gst_rtsp_stream_get_rtcp_socket:
3093  * @stream: a #GstRTSPStream
3094  * @family: the socket family
3095  *
3096  * Get the RTCP socket from @stream for a @family.
3097  *
3098  * @stream must be joined to a bin.
3099  *
3100  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3101  * socket could be allocated for @family. Unref after usage
3102  */
3103 GSocket *
3104 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3105 {
3106   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3107   GSocket *socket;
3108   const gchar *name;
3109
3110   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3111   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3112       family == G_SOCKET_FAMILY_IPV6, NULL);
3113   g_return_val_if_fail (priv->udpsink[1], NULL);
3114
3115   if (family == G_SOCKET_FAMILY_IPV6)
3116     name = "socket-v6";
3117   else
3118     name = "socket";
3119
3120   g_object_get (priv->udpsink[1], name, &socket, NULL);
3121
3122   return socket;
3123 }
3124
3125 /**
3126  * gst_rtsp_stream_set_seqnum:
3127  * @stream: a #GstRTSPStream
3128  * @seqnum: a new sequence number
3129  *
3130  * Configure the sequence number in the payloader of @stream to @seqnum.
3131  */
3132 void
3133 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3134 {
3135   GstRTSPStreamPrivate *priv;
3136
3137   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3138
3139   priv = stream->priv;
3140
3141   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3142 }
3143
3144 /**
3145  * gst_rtsp_stream_get_seqnum:
3146  * @stream: a #GstRTSPStream
3147  *
3148  * Get the configured sequence number in the payloader of @stream.
3149  *
3150  * Returns: the sequence number of the payloader.
3151  */
3152 guint16
3153 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3154 {
3155   GstRTSPStreamPrivate *priv;
3156   guint seqnum;
3157
3158   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3159
3160   priv = stream->priv;
3161
3162   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3163
3164   return seqnum;
3165 }
3166
3167 guint64
3168 gst_rtsp_stream_get_udp_sent_bytes (GstRTSPStream *stream)
3169 {
3170   GstRTSPStreamPrivate *priv;
3171   guint64 bytes = 0;
3172
3173   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3174
3175   priv = stream->priv;
3176
3177   g_object_get (G_OBJECT (priv->udpsink[0]), "bytes-to-serve", &bytes, NULL);
3178
3179   return bytes;
3180 }
3181
3182 /**
3183  * gst_rtsp_stream_transport_filter:
3184  * @stream: a #GstRTSPStream
3185  * @func: (scope call) (allow-none): a callback
3186  * @user_data: (closure): user data passed to @func
3187  *
3188  * Call @func for each transport managed by @stream. The result value of @func
3189  * determines what happens to the transport. @func will be called with @stream
3190  * locked so no further actions on @stream can be performed from @func.
3191  *
3192  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3193  * @stream.
3194  *
3195  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3196  *
3197  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3198  * will also be added with an additional ref to the result #GList of this
3199  * function..
3200  *
3201  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3202  *
3203  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3204  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3205  * element in the #GList should be unreffed before the list is freed.
3206  */
3207 GList *
3208 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3209     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3210 {
3211   GstRTSPStreamPrivate *priv;
3212   GList *result, *walk, *next;
3213   GHashTable *visited = NULL;
3214   guint cookie;
3215
3216   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3217
3218   priv = stream->priv;
3219
3220   result = NULL;
3221   if (func)
3222     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3223
3224   g_mutex_lock (&priv->lock);
3225 restart:
3226   cookie = priv->transports_cookie;
3227   for (walk = priv->transports; walk; walk = next) {
3228     GstRTSPStreamTransport *trans = walk->data;
3229     GstRTSPFilterResult res;
3230     gboolean changed;
3231
3232     next = g_list_next (walk);
3233
3234     if (func) {
3235       /* only visit each transport once */
3236       if (g_hash_table_contains (visited, trans))
3237         continue;
3238
3239       g_hash_table_add (visited, g_object_ref (trans));
3240       g_mutex_unlock (&priv->lock);
3241
3242       res = func (stream, trans, user_data);
3243
3244       g_mutex_lock (&priv->lock);
3245     } else
3246       res = GST_RTSP_FILTER_REF;
3247
3248     changed = (cookie != priv->transports_cookie);
3249
3250     switch (res) {
3251       case GST_RTSP_FILTER_REMOVE:
3252         update_transport (stream, trans, FALSE);
3253         break;
3254       case GST_RTSP_FILTER_REF:
3255         result = g_list_prepend (result, g_object_ref (trans));
3256         break;
3257       case GST_RTSP_FILTER_KEEP:
3258       default:
3259         break;
3260     }
3261     if (changed)
3262       goto restart;
3263   }
3264   g_mutex_unlock (&priv->lock);
3265
3266   if (func)
3267     g_hash_table_unref (visited);
3268
3269   return result;
3270 }
3271
3272 static GstPadProbeReturn
3273 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3274 {
3275   GstRTSPStreamPrivate *priv;
3276   GstRTSPStream *stream;
3277
3278   stream = user_data;
3279   priv = stream->priv;
3280
3281   GST_DEBUG_OBJECT (pad, "now blocking");
3282
3283   g_mutex_lock (&priv->lock);
3284   priv->blocking = TRUE;
3285   g_mutex_unlock (&priv->lock);
3286
3287   gst_element_post_message (priv->payloader,
3288       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3289           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3290
3291   return GST_PAD_PROBE_OK;
3292 }
3293
3294 /**
3295  * gst_rtsp_stream_set_blocked:
3296  * @stream: a #GstRTSPStream
3297  * @blocked: boolean indicating we should block or unblock
3298  *
3299  * Blocks or unblocks the dataflow on @stream.
3300  *
3301  * Returns: %TRUE on success
3302  */
3303 gboolean
3304 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3305 {
3306   GstRTSPStreamPrivate *priv;
3307
3308   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3309
3310   priv = stream->priv;
3311
3312   g_mutex_lock (&priv->lock);
3313   if (blocked) {
3314     priv->blocking = FALSE;
3315     if (priv->blocked_id == 0) {
3316       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3317           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3318           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3319           g_object_ref (stream), g_object_unref);
3320     }
3321   } else {
3322     if (priv->blocked_id != 0) {
3323       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3324       priv->blocked_id = 0;
3325       priv->blocking = FALSE;
3326     }
3327   }
3328   g_mutex_unlock (&priv->lock);
3329
3330   return TRUE;
3331 }
3332
3333 /**
3334  * gst_rtsp_stream_is_blocking:
3335  * @stream: a #GstRTSPStream
3336  *
3337  * Check if @stream is blocking on a #GstBuffer.
3338  *
3339  * Returns: %TRUE if @stream is blocking
3340  */
3341 gboolean
3342 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3343 {
3344   GstRTSPStreamPrivate *priv;
3345   gboolean result;
3346
3347   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3348
3349   priv = stream->priv;
3350
3351   g_mutex_lock (&priv->lock);
3352   result = priv->blocking;
3353   g_mutex_unlock (&priv->lock);
3354
3355   return result;
3356 }
3357
3358 /**
3359  * gst_rtsp_stream_query_position:
3360  * @stream: a #GstRTSPStream
3361  *
3362  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3363  * the RTP parts of the pipeline and not the RTCP parts.
3364  *
3365  * Returns: %TRUE if the position could be queried
3366  */
3367 gboolean
3368 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3369 {
3370   GstRTSPStreamPrivate *priv;
3371   GstElement *sink;
3372   gboolean ret;
3373
3374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3375
3376   priv = stream->priv;
3377
3378   g_mutex_lock (&priv->lock);
3379   if ((sink = priv->udpsink[0]))
3380     gst_object_ref (sink);
3381   g_mutex_unlock (&priv->lock);
3382
3383   if (!sink)
3384     return FALSE;
3385
3386   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3387   gst_object_unref (sink);
3388
3389   return ret;
3390 }
3391
3392 /**
3393  * gst_rtsp_stream_query_stop:
3394  * @stream: a #GstRTSPStream
3395  *
3396  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3397  * the RTP parts of the pipeline and not the RTCP parts.
3398  *
3399  * Returns: %TRUE if the stop could be queried
3400  */
3401 gboolean
3402 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3403 {
3404   GstRTSPStreamPrivate *priv;
3405   GstElement *sink;
3406   GstQuery *query;
3407   gboolean ret;
3408
3409   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3410
3411   priv = stream->priv;
3412
3413   g_mutex_lock (&priv->lock);
3414   if ((sink = priv->udpsink[0]))
3415     gst_object_ref (sink);
3416   g_mutex_unlock (&priv->lock);
3417
3418   if (!sink)
3419     return FALSE;
3420
3421   query = gst_query_new_segment (GST_FORMAT_TIME);
3422   if ((ret = gst_element_query (sink, query))) {
3423     GstFormat format;
3424
3425     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3426     if (format != GST_FORMAT_TIME)
3427       *stop = -1;
3428   }
3429   gst_query_unref (query);
3430   gst_object_unref (sink);
3431
3432   return ret;
3433
3434 }