Add AL-FEC feature
[platform/upstream/gst-rtsp-server.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83   gchar *control;
84
85   GstRTSPProfile profiles;
86   GstRTSPLowerTrans protocols;
87
88   /* pads on the rtpbin */
89   GstPad *send_rtp_sink;
90   GstPad *recv_rtp_src;
91   GstPad *recv_sink[2];
92   GstPad *send_src[2];
93
94   /* the RTPSession object */
95   GObject *session;
96
97   /* SRTP encoder/decoder */
98   GstElement *srtpenc;
99   GstElement *srtpdec;
100   GHashTable *keys;
101
102   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
103    * sockets */
104   GstElement *udpsrc_v4[2];
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
107    * sockets */
108   GstElement *udpsrc_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141
142   /* the caps of the stream */
143   gulong caps_sig;
144   GstCaps *caps;
145
146   /* transports we stream to */
147   guint n_active;
148   GList *transports;
149   guint transports_cookie;
150   GList *tr_cache_rtp;
151   GList *tr_cache_rtcp;
152   guint tr_cache_cookie_rtp;
153   guint tr_cache_cookie_rtcp;
154
155
156   /* UDP sources for UDP multicast transports */
157   GList *transport_sources;
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167 };
168
169 #define DEFAULT_CONTROL         NULL
170 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
171 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
172                                         GST_RTSP_LOWER_TRANS_TCP
173
174 enum
175 {
176   PROP_0,
177   PROP_CONTROL,
178   PROP_PROFILES,
179   PROP_PROTOCOLS,
180   PROP_LAST
181 };
182
183 enum
184 {
185   SIGNAL_NEW_RTP_ENCODER,
186   SIGNAL_NEW_RTCP_ENCODER,
187   SIGNAL_RTCP_STATS,
188   SIGNAL_LAST
189 };
190
191 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
192 #define GST_CAT_DEFAULT rtsp_stream_debug
193
194 static GQuark ssrc_stream_map_key;
195
196 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
197     GValue * value, GParamSpec * pspec);
198 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
199     const GValue * value, GParamSpec * pspec);
200
201 static void gst_rtsp_stream_finalize (GObject * obj);
202
203 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
204
205 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
206
207 static void
208 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
209 {
210   GObjectClass *gobject_class;
211
212   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
213
214   gobject_class = G_OBJECT_CLASS (klass);
215
216   gobject_class->get_property = gst_rtsp_stream_get_property;
217   gobject_class->set_property = gst_rtsp_stream_set_property;
218   gobject_class->finalize = gst_rtsp_stream_finalize;
219
220   g_object_class_install_property (gobject_class, PROP_CONTROL,
221       g_param_spec_string ("control", "Control",
222           "The control string for this stream", DEFAULT_CONTROL,
223           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
224
225   g_object_class_install_property (gobject_class, PROP_PROFILES,
226       g_param_spec_flags ("profiles", "Profiles",
227           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
228           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
229
230   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
231       g_param_spec_flags ("protocols", "Protocols",
232           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
233           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234
235   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
236       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
237       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
238       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
239
240   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
241       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
242       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
243       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
244
245   gst_rtsp_stream_signals[SIGNAL_RTCP_STATS] =
246       g_signal_new ("rtcp-statistics", G_TYPE_FROM_CLASS (klass),
247       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
248       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE);
249
250   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
251
252   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
253 }
254
255 static void
256 gst_rtsp_stream_init (GstRTSPStream * stream)
257 {
258   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
259
260   GST_DEBUG ("new stream %p", stream);
261
262   stream->priv = priv;
263
264   priv->dscp_qos = -1;
265   priv->control = g_strdup (DEFAULT_CONTROL);
266   priv->profiles = DEFAULT_PROFILES;
267   priv->protocols = DEFAULT_PROTOCOLS;
268
269   g_mutex_init (&priv->lock);
270
271   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
272       NULL, (GDestroyNotify) gst_caps_unref);
273   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
274       (GDestroyNotify) gst_caps_unref);
275 }
276
277 static void
278 gst_rtsp_stream_finalize (GObject * obj)
279 {
280   GstRTSPStream *stream;
281   GstRTSPStreamPrivate *priv;
282
283   stream = GST_RTSP_STREAM (obj);
284   priv = stream->priv;
285
286   GST_DEBUG ("finalize stream %p", stream);
287
288   /* we really need to be unjoined now */
289   g_return_if_fail (!priv->is_joined);
290
291   if (priv->addr_v4)
292     gst_rtsp_address_free (priv->addr_v4);
293   if (priv->addr_v6)
294     gst_rtsp_address_free (priv->addr_v6);
295   if (priv->server_addr_v4)
296     gst_rtsp_address_free (priv->server_addr_v4);
297   if (priv->server_addr_v6)
298     gst_rtsp_address_free (priv->server_addr_v6);
299   if (priv->pool)
300     g_object_unref (priv->pool);
301   if (priv->rtxsend)
302     g_object_unref (priv->rtxsend);
303
304   gst_object_unref (priv->payloader);
305   if (priv->srcpad)
306     gst_object_unref (priv->srcpad);
307   if (priv->sinkpad)
308     gst_object_unref (priv->sinkpad);
309   g_free (priv->control);
310   g_mutex_clear (&priv->lock);
311
312   g_hash_table_unref (priv->keys);
313   g_hash_table_destroy (priv->ptmap);
314
315   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
316 }
317
318 static void
319 gst_rtsp_stream_get_property (GObject * object, guint propid,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstRTSPStream *stream = GST_RTSP_STREAM (object);
323
324   switch (propid) {
325     case PROP_CONTROL:
326       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
327       break;
328     case PROP_PROFILES:
329       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
330       break;
331     case PROP_PROTOCOLS:
332       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
336   }
337 }
338
339 static void
340 gst_rtsp_stream_set_property (GObject * object, guint propid,
341     const GValue * value, GParamSpec * pspec)
342 {
343   GstRTSPStream *stream = GST_RTSP_STREAM (object);
344
345   switch (propid) {
346     case PROP_CONTROL:
347       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
348       break;
349     case PROP_PROFILES:
350       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
351       break;
352     case PROP_PROTOCOLS:
353       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
354       break;
355     default:
356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
357   }
358 }
359
360 /**
361  * gst_rtsp_stream_new:
362  * @idx: an index
363  * @pad: a #GstPad
364  * @payloader: a #GstElement
365  *
366  * Create a new media stream with index @idx that handles RTP data on
367  * @pad and has a payloader element @payloader if @pad is a source pad
368  * or a depayloader element @payloader if @pad is a sink pad.
369  *
370  * Returns: (transfer full): a new #GstRTSPStream
371  */
372 GstRTSPStream *
373 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
374 {
375   GstRTSPStreamPrivate *priv;
376   GstRTSPStream *stream;
377
378   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
379   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
380
381   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
382   priv = stream->priv;
383   priv->idx = idx;
384   priv->payloader = gst_object_ref (payloader);
385   if (GST_PAD_IS_SRC (pad))
386     priv->srcpad = gst_object_ref (pad);
387   else
388     priv->sinkpad = gst_object_ref (pad);
389
390   return stream;
391 }
392
393 /**
394  * gst_rtsp_stream_get_index:
395  * @stream: a #GstRTSPStream
396  *
397  * Get the stream index.
398  *
399  * Return: the stream index.
400  */
401 guint
402 gst_rtsp_stream_get_index (GstRTSPStream * stream)
403 {
404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
405
406   return stream->priv->idx;
407 }
408
409 /**
410  * gst_rtsp_stream_get_pt:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the stream payload type.
414  *
415  * Return: the stream payload type.
416  */
417 guint
418 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint pt;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
428
429   return pt;
430 }
431
432 /**
433  * gst_rtsp_stream_get_srcpad:
434  * @stream: a #GstRTSPStream
435  *
436  * Get the srcpad associated with @stream.
437  *
438  * Returns: (transfer full): the srcpad. Unref after usage.
439  */
440 GstPad *
441 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
442 {
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
444
445   if (!stream->priv->srcpad)
446     return NULL;
447
448   return gst_object_ref (stream->priv->srcpad);
449 }
450
451 /**
452  * gst_rtsp_stream_get_sinkpad:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the sinkpad associated with @stream.
456  *
457  * Returns: (transfer full): the sinkpad. Unref after usage.
458  */
459 GstPad *
460 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
463
464   if (!stream->priv->sinkpad)
465     return NULL;
466
467   return gst_object_ref (stream->priv->sinkpad);
468 }
469
470 /**
471  * gst_rtsp_stream_get_control:
472  * @stream: a #GstRTSPStream
473  *
474  * Get the control string to identify this stream.
475  *
476  * Returns: (transfer full): the control string. g_free() after usage.
477  */
478 gchar *
479 gst_rtsp_stream_get_control (GstRTSPStream * stream)
480 {
481   GstRTSPStreamPrivate *priv;
482   gchar *result;
483
484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
485
486   priv = stream->priv;
487
488   g_mutex_lock (&priv->lock);
489   if ((result = g_strdup (priv->control)) == NULL)
490     result = g_strdup_printf ("stream=%u", priv->idx);
491   g_mutex_unlock (&priv->lock);
492
493   return result;
494 }
495
496 /**
497  * gst_rtsp_stream_set_control:
498  * @stream: a #GstRTSPStream
499  * @control: a control string
500  *
501  * Set the control string in @stream.
502  */
503 void
504 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
505 {
506   GstRTSPStreamPrivate *priv;
507
508   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
509
510   priv = stream->priv;
511
512   g_mutex_lock (&priv->lock);
513   g_free (priv->control);
514   priv->control = g_strdup (control);
515   g_mutex_unlock (&priv->lock);
516 }
517
518 /**
519  * gst_rtsp_stream_has_control:
520  * @stream: a #GstRTSPStream
521  * @control: a control string
522  *
523  * Check if @stream has the control string @control.
524  *
525  * Returns: %TRUE is @stream has @control as the control string
526  */
527 gboolean
528 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
529 {
530   GstRTSPStreamPrivate *priv;
531   gboolean res;
532
533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
534
535   priv = stream->priv;
536
537   g_mutex_lock (&priv->lock);
538   if (priv->control)
539     res = (g_strcmp0 (priv->control, control) == 0);
540   else {
541     guint streamid;
542
543     if (sscanf (control, "stream=%u", &streamid) > 0)
544       res = (streamid == priv->idx);
545     else
546       res = FALSE;
547   }
548   g_mutex_unlock (&priv->lock);
549
550   return res;
551 }
552
553 /**
554  * gst_rtsp_stream_set_mtu:
555  * @stream: a #GstRTSPStream
556  * @mtu: a new MTU
557  *
558  * Configure the mtu in the payloader of @stream to @mtu.
559  */
560 void
561 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
562 {
563   GstRTSPStreamPrivate *priv;
564
565   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
566
567   priv = stream->priv;
568
569   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
570
571   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
572 }
573
574 /**
575  * gst_rtsp_stream_get_mtu:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured MTU in the payloader of @stream.
579  *
580  * Returns: the MTU of the payloader.
581  */
582 guint
583 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586   guint mtu;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
589
590   priv = stream->priv;
591
592   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
593
594   return mtu;
595 }
596
597 /* Update the dscp qos property on the udp sinks */
598 static void
599 update_dscp_qos (GstRTSPStream * stream)
600 {
601   GstRTSPStreamPrivate *priv;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   if (priv->udpsink[0]) {
608     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
609         NULL);
610   }
611
612   if (priv->udpsink[1]) {
613     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
614         NULL);
615   }
616 }
617
618 /**
619  * gst_rtsp_stream_set_dscp_qos:
620  * @stream: a #GstRTSPStream
621  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
622  *
623  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
624  */
625 void
626 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
631
632   priv = stream->priv;
633
634   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
635
636   if (dscp_qos < -1 || dscp_qos > 63) {
637     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
638     return;
639   }
640
641   priv->dscp_qos = dscp_qos;
642
643   update_dscp_qos (stream);
644 }
645
646 /**
647  * gst_rtsp_stream_get_dscp_qos:
648  * @stream: a #GstRTSPStream
649  *
650  * Get the configured DSCP QoS in of the outgoing sockets.
651  *
652  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
653  */
654 gint
655 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
656 {
657   GstRTSPStreamPrivate *priv;
658
659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
660
661   priv = stream->priv;
662
663   return priv->dscp_qos;
664 }
665
666 /**
667  * gst_rtsp_stream_is_transport_supported:
668  * @stream: a #GstRTSPStream
669  * @transport: (transfer none): a #GstRTSPTransport
670  *
671  * Check if @transport can be handled by stream
672  *
673  * Returns: %TRUE if @transport can be handled by @stream.
674  */
675 gboolean
676 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
677     GstRTSPTransport * transport)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   if (transport->trans != GST_RTSP_TRANS_RTP)
687     goto unsupported_transmode;
688
689   if (!(transport->profile & priv->profiles))
690     goto unsupported_profile;
691
692   if (!(transport->lower_transport & priv->protocols))
693     goto unsupported_ltrans;
694
695   g_mutex_unlock (&priv->lock);
696
697   return TRUE;
698
699   /* ERRORS */
700 unsupported_transmode:
701   {
702     GST_DEBUG ("unsupported transport mode %d", transport->trans);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_profile:
707   {
708     GST_DEBUG ("unsupported profile %d", transport->profile);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 unsupported_ltrans:
713   {
714     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 }
719
720 /**
721  * gst_rtsp_stream_set_profiles:
722  * @stream: a #GstRTSPStream
723  * @profiles: the new profiles
724  *
725  * Configure the allowed profiles for @stream.
726  */
727 void
728 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
729 {
730   GstRTSPStreamPrivate *priv;
731
732   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   priv->profiles = profiles;
738   g_mutex_unlock (&priv->lock);
739 }
740
741 /**
742  * gst_rtsp_stream_get_profiles:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the allowed profiles of @stream.
746  *
747  * Returns: a #GstRTSPProfile
748  */
749 GstRTSPProfile
750 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
751 {
752   GstRTSPStreamPrivate *priv;
753   GstRTSPProfile res;
754
755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->profiles;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_protocols:
768  * @stream: a #GstRTSPStream
769  * @protocols: the new flags
770  *
771  * Configure the allowed lower transport for @stream.
772  */
773 void
774 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
775     GstRTSPLowerTrans protocols)
776 {
777   GstRTSPStreamPrivate *priv;
778
779   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
780
781   priv = stream->priv;
782
783   g_mutex_lock (&priv->lock);
784   priv->protocols = protocols;
785   g_mutex_unlock (&priv->lock);
786 }
787
788 /**
789  * gst_rtsp_stream_get_protocols:
790  * @stream: a #GstRTSPStream
791  *
792  * Get the allowed protocols of @stream.
793  *
794  * Returns: a #GstRTSPLowerTrans
795  */
796 GstRTSPLowerTrans
797 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
798 {
799   GstRTSPStreamPrivate *priv;
800   GstRTSPLowerTrans res;
801
802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
803       GST_RTSP_LOWER_TRANS_UNKNOWN);
804
805   priv = stream->priv;
806
807   g_mutex_lock (&priv->lock);
808   res = priv->protocols;
809   g_mutex_unlock (&priv->lock);
810
811   return res;
812 }
813
814 /**
815  * gst_rtsp_stream_set_address_pool:
816  * @stream: a #GstRTSPStream
817  * @pool: (transfer none): a #GstRTSPAddressPool
818  *
819  * configure @pool to be used as the address pool of @stream.
820  */
821 void
822 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
823     GstRTSPAddressPool * pool)
824 {
825   GstRTSPStreamPrivate *priv;
826   GstRTSPAddressPool *old;
827
828   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
829
830   priv = stream->priv;
831
832   GST_LOG_OBJECT (stream, "set address pool %p", pool);
833
834   g_mutex_lock (&priv->lock);
835   if ((old = priv->pool) != pool)
836     priv->pool = pool ? g_object_ref (pool) : NULL;
837   else
838     old = NULL;
839   g_mutex_unlock (&priv->lock);
840
841   if (old)
842     g_object_unref (old);
843 }
844
845 /**
846  * gst_rtsp_stream_get_address_pool:
847  * @stream: a #GstRTSPStream
848  *
849  * Get the #GstRTSPAddressPool used as the address pool of @stream.
850  *
851  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
852  * usage.
853  */
854 GstRTSPAddressPool *
855 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
856 {
857   GstRTSPStreamPrivate *priv;
858   GstRTSPAddressPool *result;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861
862   priv = stream->priv;
863
864   g_mutex_lock (&priv->lock);
865   if ((result = priv->pool))
866     g_object_ref (result);
867   g_mutex_unlock (&priv->lock);
868
869   return result;
870 }
871
872 /**
873  * gst_rtsp_stream_get_multicast_address:
874  * @stream: a #GstRTSPStream
875  * @family: the #GSocketFamily
876  *
877  * Get the multicast address of @stream for @family.
878  *
879  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
880  * or %NULL when no address could be allocated. gst_rtsp_address_free()
881  * after usage.
882  */
883 GstRTSPAddress *
884 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
885     GSocketFamily family)
886 {
887   GstRTSPStreamPrivate *priv;
888   GstRTSPAddress *result;
889   GstRTSPAddress **addrp;
890   GstRTSPAddressFlags flags;
891
892   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
893
894   priv = stream->priv;
895
896   if (family == G_SOCKET_FAMILY_IPV6) {
897     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
898     addrp = &priv->addr_v6;
899   } else {
900     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
901     addrp = &priv->addr_v4;
902   }
903
904   g_mutex_lock (&priv->lock);
905   if (*addrp == NULL) {
906     if (priv->pool == NULL)
907       goto no_pool;
908
909     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
910
911     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
912     if (*addrp == NULL)
913       goto no_address;
914   }
915   result = gst_rtsp_address_copy (*addrp);
916   g_mutex_unlock (&priv->lock);
917
918   return result;
919
920   /* ERRORS */
921 no_pool:
922   {
923     GST_ERROR_OBJECT (stream, "no address pool specified");
924     g_mutex_unlock (&priv->lock);
925     return NULL;
926   }
927 no_address:
928   {
929     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
930     g_mutex_unlock (&priv->lock);
931     return NULL;
932   }
933 }
934
935 /**
936  * gst_rtsp_stream_reserve_address:
937  * @stream: a #GstRTSPStream
938  * @address: an address
939  * @port: a port
940  * @n_ports: n_ports
941  * @ttl: a TTL
942  *
943  * Reserve @address and @port as the address and port of @stream.
944  *
945  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
946  * the address could be reserved. gst_rtsp_address_free() after usage.
947  */
948 GstRTSPAddress *
949 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
950     const gchar * address, guint port, guint n_ports, guint ttl)
951 {
952   GstRTSPStreamPrivate *priv;
953   GstRTSPAddress *result;
954   GInetAddress *addr;
955   GSocketFamily family;
956   GstRTSPAddress **addrp;
957
958   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
959   g_return_val_if_fail (address != NULL, NULL);
960   g_return_val_if_fail (port > 0, NULL);
961   g_return_val_if_fail (n_ports > 0, NULL);
962   g_return_val_if_fail (ttl > 0, NULL);
963
964   priv = stream->priv;
965
966   addr = g_inet_address_new_from_string (address);
967   if (!addr) {
968     GST_ERROR ("failed to get inet addr from %s", address);
969     family = G_SOCKET_FAMILY_IPV4;
970   } else {
971     family = g_inet_address_get_family (addr);
972     g_object_unref (addr);
973   }
974
975   if (family == G_SOCKET_FAMILY_IPV6)
976     addrp = &priv->addr_v6;
977   else
978     addrp = &priv->addr_v4;
979
980   g_mutex_lock (&priv->lock);
981   if (*addrp == NULL) {
982     GstRTSPAddressPoolResult res;
983
984     if (priv->pool == NULL)
985       goto no_pool;
986
987     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
988         port, n_ports, ttl, addrp);
989     if (res != GST_RTSP_ADDRESS_POOL_OK)
990       goto no_address;
991   } else {
992     if (strcmp ((*addrp)->address, address) ||
993         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
994         (*addrp)->ttl != ttl)
995       goto different_address;
996   }
997   result = gst_rtsp_address_copy (*addrp);
998   g_mutex_unlock (&priv->lock);
999
1000   return result;
1001
1002   /* ERRORS */
1003 no_pool:
1004   {
1005     GST_ERROR_OBJECT (stream, "no address pool specified");
1006     g_mutex_unlock (&priv->lock);
1007     return NULL;
1008   }
1009 no_address:
1010   {
1011     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1012         address);
1013     g_mutex_unlock (&priv->lock);
1014     return NULL;
1015   }
1016 different_address:
1017   {
1018     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1019         " reserved", address);
1020     g_mutex_unlock (&priv->lock);
1021     return NULL;
1022   }
1023 }
1024
1025 static gboolean
1026 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1027     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1028     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1029     GstRTSPAddress ** server_addr_out)
1030 {
1031   GstRTSPStreamPrivate *priv = stream->priv;
1032   GstStateChangeReturn ret;
1033   GstElement *udpsrc0, *udpsrc1;
1034   GstElement *udpsink0, *udpsink1;
1035   GSocket *rtp_socket = NULL;
1036   GSocket *rtcp_socket;
1037   gint tmp_rtp, tmp_rtcp;
1038   guint count;
1039   gint rtpport, rtcpport;
1040   GList *rejected_addresses = NULL;
1041   GstRTSPAddress *addr = NULL;
1042   GInetAddress *inetaddr = NULL;
1043   GSocketAddress *rtp_sockaddr = NULL;
1044   GSocketAddress *rtcp_sockaddr = NULL;
1045   const gchar *multisink_socket;
1046
1047   if (family == G_SOCKET_FAMILY_IPV6)
1048     multisink_socket = "socket-v6";
1049   else
1050     multisink_socket = "socket";
1051
1052   udpsrc0 = NULL;
1053   udpsrc1 = NULL;
1054   udpsink0 = NULL;
1055   udpsink1 = NULL;
1056   count = 0;
1057
1058   /* Start with random port */
1059   tmp_rtp = 0;
1060
1061   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1062       G_SOCKET_PROTOCOL_UDP, NULL);
1063   if (!rtcp_socket)
1064     goto no_udp_protocol;
1065
1066   if (*server_addr_out)
1067     gst_rtsp_address_free (*server_addr_out);
1068
1069   /* try to allocate 2 UDP ports, the RTP port should be an even
1070    * number and the RTCP port should be the next (uneven) port */
1071 again:
1072
1073   if (rtp_socket == NULL) {
1074     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1075         G_SOCKET_PROTOCOL_UDP, NULL);
1076     if (!rtp_socket)
1077       goto no_udp_protocol;
1078   }
1079
1080   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1081     GstRTSPAddressFlags flags;
1082
1083     if (addr)
1084       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1085
1086     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1087     if (family == G_SOCKET_FAMILY_IPV6)
1088       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1089     else
1090       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1091
1092     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1093
1094     if (addr == NULL)
1095       goto no_ports;
1096
1097     tmp_rtp = addr->port;
1098
1099     g_clear_object (&inetaddr);
1100     inetaddr = g_inet_address_new_from_string (addr->address);
1101   } else {
1102     if (tmp_rtp != 0) {
1103       tmp_rtp += 2;
1104       if (++count > 20)
1105         goto no_ports;
1106     }
1107
1108     if (inetaddr == NULL)
1109       inetaddr = g_inet_address_new_any (family);
1110   }
1111
1112   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1113   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1114     g_object_unref (rtp_sockaddr);
1115     goto again;
1116   }
1117   g_object_unref (rtp_sockaddr);
1118
1119   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1120   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1121     g_clear_object (&rtp_sockaddr);
1122     goto socket_error;
1123   }
1124
1125   tmp_rtp =
1126       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1127   g_object_unref (rtp_sockaddr);
1128
1129   /* check if port is even */
1130   if ((tmp_rtp & 1) != 0) {
1131     /* port not even, close and allocate another */
1132     tmp_rtp++;
1133     g_clear_object (&rtp_socket);
1134     goto again;
1135   }
1136
1137   /* set port */
1138   tmp_rtcp = tmp_rtp + 1;
1139
1140   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1141   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1142     g_object_unref (rtcp_sockaddr);
1143     g_clear_object (&rtp_socket);
1144     goto again;
1145   }
1146   g_object_unref (rtcp_sockaddr);
1147
1148   g_clear_object (&inetaddr);
1149
1150   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1151   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1152
1153   if (udpsrc0 == NULL || udpsrc1 == NULL)
1154     goto no_udp_protocol;
1155
1156   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1157   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1158
1159   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1160   if (ret == GST_STATE_CHANGE_FAILURE)
1161     goto element_error;
1162   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1163   if (ret == GST_STATE_CHANGE_FAILURE)
1164     goto element_error;
1165
1166   /* all fine, do port check */
1167   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1168   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1169
1170   /* this should not happen... */
1171   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1172     goto port_error;
1173
1174   if (udpsink_out[0])
1175     udpsink0 = udpsink_out[0];
1176   else
1177     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1178
1179   if (!udpsink0)
1180     goto no_udp_protocol;
1181
1182   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1183   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1184
1185   if (udpsink_out[1])
1186     udpsink1 = udpsink_out[1];
1187   else
1188     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1189
1190   if (!udpsink1)
1191     goto no_udp_protocol;
1192
1193   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1194   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1195   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1196
1197   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1198   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1199   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1200   /* Needs to be async for RECORD streams, otherwise we will never go to
1201    * PLAYING because the sinks will wait for data while the udpsrc can't
1202    * provide data with timestamps in PAUSED. */
1203   if (priv->sinkpad)
1204     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1205   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1206   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1207   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1208   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1209   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1210
1211   /* we keep these elements, we will further configure them when the
1212    * client told us to really use the UDP ports. */
1213   udpsrc_out[0] = udpsrc0;
1214   udpsrc_out[1] = udpsrc1;
1215   udpsink_out[0] = udpsink0;
1216   udpsink_out[1] = udpsink1;
1217
1218   server_port_out->min = rtpport;
1219   server_port_out->max = rtcpport;
1220
1221   *server_addr_out = addr;
1222   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1223
1224   g_object_unref (rtp_socket);
1225   g_object_unref (rtcp_socket);
1226
1227   return TRUE;
1228
1229   /* ERRORS */
1230 no_udp_protocol:
1231   {
1232     goto cleanup;
1233   }
1234 no_ports:
1235   {
1236     goto cleanup;
1237   }
1238 port_error:
1239   {
1240     goto cleanup;
1241   }
1242 socket_error:
1243   {
1244     goto cleanup;
1245   }
1246 element_error:
1247   {
1248     goto cleanup;
1249   }
1250 cleanup:
1251   {
1252     if (udpsrc0) {
1253       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1254       gst_object_unref (udpsrc0);
1255     }
1256     if (udpsrc1) {
1257       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1258       gst_object_unref (udpsrc1);
1259     }
1260     if (udpsink0) {
1261       gst_element_set_state (udpsink0, GST_STATE_NULL);
1262       gst_object_unref (udpsink0);
1263     }
1264     if (inetaddr)
1265       g_object_unref (inetaddr);
1266     g_list_free_full (rejected_addresses,
1267         (GDestroyNotify) gst_rtsp_address_free);
1268     if (addr)
1269       gst_rtsp_address_free (addr);
1270     if (rtp_socket)
1271       g_object_unref (rtp_socket);
1272     if (rtcp_socket)
1273       g_object_unref (rtcp_socket);
1274     return FALSE;
1275   }
1276 }
1277
1278 /* must be called with lock */
1279 static gboolean
1280 alloc_ports (GstRTSPStream * stream)
1281 {
1282   GstRTSPStreamPrivate *priv = stream->priv;
1283
1284   priv->have_ipv4 =
1285       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1286       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1287       &priv->server_port_v4, &priv->server_addr_v4);
1288
1289   priv->have_ipv6 =
1290       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1291       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1292       &priv->server_port_v6, &priv->server_addr_v6);
1293
1294   return priv->have_ipv4 || priv->have_ipv6;
1295 }
1296
1297 /**
1298  * gst_rtsp_stream_get_server_port:
1299  * @stream: a #GstRTSPStream
1300  * @server_port: (out): result server port
1301  * @family: the port family to get
1302  *
1303  * Fill @server_port with the port pair used by the server. This function can
1304  * only be called when @stream has been joined.
1305  */
1306 void
1307 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1308     GstRTSPRange * server_port, GSocketFamily family)
1309 {
1310   GstRTSPStreamPrivate *priv;
1311
1312   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1313   priv = stream->priv;
1314   g_return_if_fail (priv->is_joined);
1315
1316   g_mutex_lock (&priv->lock);
1317   if (family == G_SOCKET_FAMILY_IPV4) {
1318     if (server_port)
1319       *server_port = priv->server_port_v4;
1320   } else {
1321     if (server_port)
1322       *server_port = priv->server_port_v6;
1323   }
1324   g_mutex_unlock (&priv->lock);
1325 }
1326
1327 /**
1328  * gst_rtsp_stream_get_rtpsession:
1329  * @stream: a #GstRTSPStream
1330  *
1331  * Get the RTP session of this stream.
1332  *
1333  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1334  */
1335 GObject *
1336 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1337 {
1338   GstRTSPStreamPrivate *priv;
1339   GObject *session;
1340
1341   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1342
1343   priv = stream->priv;
1344
1345   g_mutex_lock (&priv->lock);
1346   if ((session = priv->session))
1347     g_object_ref (session);
1348   g_mutex_unlock (&priv->lock);
1349
1350   return session;
1351 }
1352
1353 /**
1354  * gst_rtsp_stream_get_ssrc:
1355  * @stream: a #GstRTSPStream
1356  * @ssrc: (out): result ssrc
1357  *
1358  * Get the SSRC used by the RTP session of this stream. This function can only
1359  * be called when @stream has been joined.
1360  */
1361 void
1362 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1363 {
1364   GstRTSPStreamPrivate *priv;
1365
1366   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1367   priv = stream->priv;
1368   g_return_if_fail (priv->is_joined);
1369
1370   g_mutex_lock (&priv->lock);
1371   if (ssrc && priv->session)
1372     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1373   g_mutex_unlock (&priv->lock);
1374 }
1375
1376 /**
1377  * gst_rtsp_stream_set_retransmission_time:
1378  * @stream: a #GstRTSPStream
1379  * @time: a #GstClockTime
1380  *
1381  * Set the amount of time to store retransmission packets.
1382  */
1383 void
1384 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1385     GstClockTime time)
1386 {
1387   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1388
1389   g_mutex_lock (&stream->priv->lock);
1390   stream->priv->rtx_time = time;
1391   if (stream->priv->rtxsend)
1392     g_object_set (stream->priv->rtxsend, "max-size-time",
1393         GST_TIME_AS_MSECONDS (time), NULL);
1394   g_mutex_unlock (&stream->priv->lock);
1395 }
1396
1397 /**
1398  * gst_rtsp_stream_get_retransmission_time:
1399  * @stream: a #GstRTSPStream
1400  *
1401  * Get the amount of time to store retransmission data.
1402  *
1403  * Returns: the amount of time to store retransmission data.
1404  */
1405 GstClockTime
1406 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1407 {
1408   GstClockTime ret;
1409
1410   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1411
1412   g_mutex_lock (&stream->priv->lock);
1413   ret = stream->priv->rtx_time;
1414   g_mutex_unlock (&stream->priv->lock);
1415
1416   return ret;
1417 }
1418
1419 /**
1420  * gst_rtsp_stream_set_retransmission_pt:
1421  * @stream: a #GstRTSPStream
1422  * @rtx_pt: a #guint
1423  *
1424  * Set the payload type (pt) for retransmission of this stream.
1425  */
1426 void
1427 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1428 {
1429   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1430
1431   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1432
1433   g_mutex_lock (&stream->priv->lock);
1434   stream->priv->rtx_pt = rtx_pt;
1435   if (stream->priv->rtxsend) {
1436     guint pt = gst_rtsp_stream_get_pt (stream);
1437     gchar *pt_s = g_strdup_printf ("%d", pt);
1438     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1439         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1440     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1441     g_free (pt_s);
1442     gst_structure_free (rtx_pt_map);
1443   }
1444   g_mutex_unlock (&stream->priv->lock);
1445 }
1446
1447 /**
1448  * gst_rtsp_stream_get_retransmission_pt:
1449  * @stream: a #GstRTSPStream
1450  *
1451  * Get the payload-type used for retransmission of this stream
1452  *
1453  * Returns: The retransmission PT.
1454  */
1455 guint
1456 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1457 {
1458   guint rtx_pt;
1459
1460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1461
1462   g_mutex_lock (&stream->priv->lock);
1463   rtx_pt = stream->priv->rtx_pt;
1464   g_mutex_unlock (&stream->priv->lock);
1465
1466   return rtx_pt;
1467 }
1468
1469 /**
1470  * gst_rtsp_stream_set_buffer_size:
1471  * @stream: a #GstRTSPStream
1472  * @size: the buffer size
1473  *
1474  * Set the size of the UDP transmission buffer (in bytes)
1475  * Needs to be set before the stream is joined to a bin.
1476  *
1477  * Since: 1.6
1478  */
1479 void
1480 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1481 {
1482   g_mutex_lock (&stream->priv->lock);
1483   stream->priv->buffer_size = size;
1484   g_mutex_unlock (&stream->priv->lock);
1485 }
1486
1487 /**
1488  * gst_rtsp_stream_get_buffer_size:
1489  * @stream: a #GstRTSPStream
1490  *
1491  * Get the size of the UDP transmission buffer (in bytes)
1492  *
1493  * Returns: the size of the UDP TX buffer
1494  *
1495  * Since: 1.6
1496  */
1497 guint
1498 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1499 {
1500   guint buffer_size;
1501
1502   g_mutex_lock (&stream->priv->lock);
1503   buffer_size = stream->priv->buffer_size;
1504   g_mutex_unlock (&stream->priv->lock);
1505
1506   return buffer_size;
1507 }
1508
1509 /* executed from streaming thread */
1510 static void
1511 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1512 {
1513   GstRTSPStreamPrivate *priv = stream->priv;
1514   GstCaps *newcaps, *oldcaps;
1515
1516   newcaps = gst_pad_get_current_caps (pad);
1517
1518   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1519       newcaps);
1520
1521   g_mutex_lock (&priv->lock);
1522   oldcaps = priv->caps;
1523   priv->caps = newcaps;
1524   g_mutex_unlock (&priv->lock);
1525
1526   if (oldcaps)
1527     gst_caps_unref (oldcaps);
1528 }
1529
1530 static void
1531 dump_structure (const GstStructure * s)
1532 {
1533   gchar *sstr;
1534
1535   sstr = gst_structure_to_string (s);
1536   GST_INFO ("structure: %s", sstr);
1537   g_free (sstr);
1538 }
1539
1540 static GstRTSPStreamTransport *
1541 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1542 {
1543   GstRTSPStreamPrivate *priv = stream->priv;
1544   GList *walk;
1545   GstRTSPStreamTransport *result = NULL;
1546   const gchar *tmp;
1547   gchar *dest;
1548   guint port;
1549
1550   if (rtcp_from == NULL)
1551     return NULL;
1552
1553   tmp = g_strrstr (rtcp_from, ":");
1554   if (tmp == NULL)
1555     return NULL;
1556
1557   port = atoi (tmp + 1);
1558   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1559
1560   g_mutex_lock (&priv->lock);
1561   GST_INFO ("finding %s:%d in %d transports", dest, port,
1562       g_list_length (priv->transports));
1563
1564   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1565     GstRTSPStreamTransport *trans = walk->data;
1566     const GstRTSPTransport *tr;
1567     gint min, max;
1568
1569     tr = gst_rtsp_stream_transport_get_transport (trans);
1570
1571     min = tr->client_port.min;
1572     max = tr->client_port.max;
1573
1574     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1575       result = trans;
1576       break;
1577     }
1578   }
1579   if (result)
1580     g_object_ref (result);
1581   g_mutex_unlock (&priv->lock);
1582
1583   g_free (dest);
1584
1585   return result;
1586 }
1587
1588 static GstRTSPStreamTransport *
1589 check_transport (GObject * source, GstRTSPStream * stream)
1590 {
1591   GstStructure *stats;
1592   GstRTSPStreamTransport *trans;
1593
1594   /* see if we have a stream to match with the origin of the RTCP packet */
1595   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1596   if (trans == NULL) {
1597     g_object_get (source, "stats", &stats, NULL);
1598     if (stats) {
1599       const gchar *rtcp_from;
1600
1601       dump_structure (stats);
1602
1603       g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0,
1604           stats);
1605
1606       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1607       if ((trans = find_transport (stream, rtcp_from))) {
1608         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1609             source);
1610         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1611             g_object_unref);
1612       }
1613       gst_structure_free (stats);
1614     }
1615   }
1616   return trans;
1617 }
1618
1619
1620 static void
1621 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1622 {
1623   GstRTSPStreamTransport *trans;
1624
1625   GST_INFO ("%p: new source %p", stream, source);
1626
1627   trans = check_transport (source, stream);
1628
1629   if (trans)
1630     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1631 }
1632
1633 static void
1634 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1635 {
1636   GST_INFO ("%p: new SDES %p", stream, source);
1637 }
1638
1639 static void
1640 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1641 {
1642   GstRTSPStreamTransport *trans;
1643
1644   trans = check_transport (source, stream);
1645
1646   if (trans) {
1647     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1648     gst_rtsp_stream_transport_keep_alive (trans);
1649   }
1650 #ifdef DUMP_STATS
1651   {
1652     GstStructure *stats;
1653     g_object_get (source, "stats", &stats, NULL);
1654     if (stats) {
1655       g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_RTCP_STATS], 0,
1656           stats);
1657
1658       dump_structure (stats);
1659       gst_structure_free (stats);
1660     }
1661   }
1662 #endif
1663 }
1664
1665 static void
1666 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1667 {
1668   GST_INFO ("%p: source %p bye", stream, source);
1669 }
1670
1671 static void
1672 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1673 {
1674   GstRTSPStreamTransport *trans;
1675
1676   GST_INFO ("%p: source %p bye timeout", stream, source);
1677
1678   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1679     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1680     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1681   }
1682 }
1683
1684 static void
1685 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1686 {
1687   GstRTSPStreamTransport *trans;
1688
1689   GST_INFO ("%p: source %p timeout", stream, source);
1690
1691   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1692     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1693     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1694   }
1695 }
1696
1697 static void
1698 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1699 {
1700   if (is_rtp) {
1701     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1702     g_list_free (priv->tr_cache_rtp);
1703     priv->tr_cache_rtp = NULL;
1704   } else {
1705     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1706     g_list_free (priv->tr_cache_rtcp);
1707     priv->tr_cache_rtcp = NULL;
1708   }
1709 }
1710
1711 static GstFlowReturn
1712 handle_new_sample (GstAppSink * sink, gpointer user_data)
1713 {
1714   GstRTSPStreamPrivate *priv;
1715   GList *walk;
1716   GstSample *sample;
1717   GstBuffer *buffer;
1718   GstRTSPStream *stream;
1719   gboolean is_rtp;
1720
1721   sample = gst_app_sink_pull_sample (sink);
1722   if (!sample)
1723     return GST_FLOW_OK;
1724
1725   stream = (GstRTSPStream *) user_data;
1726   priv = stream->priv;
1727   buffer = gst_sample_get_buffer (sample);
1728
1729   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1730
1731   g_mutex_lock (&priv->lock);
1732   if (is_rtp) {
1733     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1734       clear_tr_cache (priv, is_rtp);
1735       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1736         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1737         priv->tr_cache_rtp =
1738             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1739       }
1740       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1741     }
1742   } else {
1743     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1744       clear_tr_cache (priv, is_rtp);
1745       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1746         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1747         priv->tr_cache_rtcp =
1748             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1749       }
1750       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1751     }
1752   }
1753   g_mutex_unlock (&priv->lock);
1754
1755   if (is_rtp) {
1756     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1757       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1758       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1759     }
1760   } else {
1761     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1762       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1763       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1764     }
1765   }
1766   gst_sample_unref (sample);
1767
1768   return GST_FLOW_OK;
1769 }
1770
1771 static GstAppSinkCallbacks sink_cb = {
1772   NULL,                         /* not interested in EOS */
1773   NULL,                         /* not interested in preroll samples */
1774   handle_new_sample,
1775 };
1776
1777 static GstElement *
1778 get_rtp_encoder (GstRTSPStream * stream, guint session)
1779 {
1780   GstRTSPStreamPrivate *priv = stream->priv;
1781
1782   if (priv->srtpenc == NULL) {
1783     gchar *name;
1784
1785     name = g_strdup_printf ("srtpenc_%u", session);
1786     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1787     g_free (name);
1788
1789     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1790   }
1791   return gst_object_ref (priv->srtpenc);
1792 }
1793
1794 static GstElement *
1795 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1796 {
1797   GstRTSPStreamPrivate *priv = stream->priv;
1798   GstElement *oldenc, *enc;
1799   GstPad *pad;
1800   gchar *name;
1801
1802   if (priv->idx != session)
1803     return NULL;
1804
1805   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1806
1807   oldenc = priv->srtpenc;
1808   enc = get_rtp_encoder (stream, session);
1809   name = g_strdup_printf ("rtp_sink_%d", session);
1810   pad = gst_element_get_request_pad (enc, name);
1811   g_free (name);
1812   gst_object_unref (pad);
1813
1814   if (oldenc == NULL)
1815     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1816         enc);
1817
1818   return enc;
1819 }
1820
1821 static GstElement *
1822 request_rtcp_encoder (GstElement * rtpbin, guint session,
1823     GstRTSPStream * stream)
1824 {
1825   GstRTSPStreamPrivate *priv = stream->priv;
1826   GstElement *oldenc, *enc;
1827   GstPad *pad;
1828   gchar *name;
1829
1830   if (priv->idx != session)
1831     return NULL;
1832
1833   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1834
1835   oldenc = priv->srtpenc;
1836   enc = get_rtp_encoder (stream, session);
1837   name = g_strdup_printf ("rtcp_sink_%d", session);
1838   pad = gst_element_get_request_pad (enc, name);
1839   g_free (name);
1840   gst_object_unref (pad);
1841
1842   if (oldenc == NULL)
1843     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1844         enc);
1845
1846   return enc;
1847 }
1848
1849 static GstCaps *
1850 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1851 {
1852   GstRTSPStreamPrivate *priv = stream->priv;
1853   GstCaps *caps;
1854
1855   GST_DEBUG ("request key %08x", ssrc);
1856
1857   g_mutex_lock (&priv->lock);
1858   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1859     gst_caps_ref (caps);
1860   g_mutex_unlock (&priv->lock);
1861
1862   return caps;
1863 }
1864
1865 static GstElement *
1866 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1867     GstRTSPStream * stream)
1868 {
1869   GstRTSPStreamPrivate *priv = stream->priv;
1870
1871   if (priv->idx != session)
1872     return NULL;
1873
1874   if (priv->srtpdec == NULL) {
1875     gchar *name;
1876
1877     name = g_strdup_printf ("srtpdec_%u", session);
1878     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1879     g_free (name);
1880
1881     g_signal_connect (priv->srtpdec, "request-key",
1882         (GCallback) request_key, stream);
1883   }
1884   return gst_object_ref (priv->srtpdec);
1885 }
1886
1887 /**
1888  * gst_rtsp_stream_request_aux_sender:
1889  * @stream: a #GstRTSPStream
1890  * @sessid: the session id
1891  *
1892  * Creating a rtxsend bin
1893  *
1894  * Returns: (transfer full): a #GstElement.
1895  *
1896  * Since: 1.6
1897  */
1898 GstElement *
1899 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
1900 {
1901   GstElement *bin;
1902   GstPad *pad;
1903   GstStructure *pt_map;
1904   gchar *name;
1905   guint pt, rtx_pt;
1906   gchar *pt_s;
1907
1908   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1909
1910   pt = gst_rtsp_stream_get_pt (stream);
1911   pt_s = g_strdup_printf ("%u", pt);
1912   rtx_pt = stream->priv->rtx_pt;
1913
1914   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1915
1916   bin = gst_bin_new (NULL);
1917   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1918   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1919       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1920   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1921       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1922   g_free (pt_s);
1923   gst_structure_free (pt_map);
1924   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1925
1926   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1927   name = g_strdup_printf ("src_%u", sessid);
1928   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1929   g_free (name);
1930   gst_object_unref (pad);
1931
1932   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1933   name = g_strdup_printf ("sink_%u", sessid);
1934   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1935   g_free (name);
1936   gst_object_unref (pad);
1937
1938   return bin;
1939 }
1940
1941 /**
1942  * gst_rtsp_stream_set_pt_map:
1943  * @stream: a #GstRTSPStream
1944  * @pt: the pt
1945  * @caps: a #GstCaps
1946  *
1947  * Configure a pt map between @pt and @caps.
1948  */
1949 void
1950 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1951 {
1952   GstRTSPStreamPrivate *priv = stream->priv;
1953
1954   g_mutex_lock (&priv->lock);
1955   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1956   g_mutex_unlock (&priv->lock);
1957 }
1958
1959 static GstCaps *
1960 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1961     GstRTSPStream * stream)
1962 {
1963   GstRTSPStreamPrivate *priv = stream->priv;
1964   GstCaps *caps = NULL;
1965
1966   g_mutex_lock (&priv->lock);
1967
1968   if (priv->idx == session) {
1969     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1970     if (caps) {
1971       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1972       gst_caps_ref (caps);
1973     } else {
1974       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1975     }
1976   }
1977
1978   g_mutex_unlock (&priv->lock);
1979
1980   return caps;
1981 }
1982
1983 static void
1984 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
1985 {
1986   GstRTSPStreamPrivate *priv = stream->priv;
1987   gchar *name;
1988   GstPadLinkReturn ret;
1989   guint sessid;
1990
1991   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
1992       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1993
1994   name = gst_pad_get_name (pad);
1995   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
1996     g_free (name);
1997     return;
1998   }
1999   g_free (name);
2000
2001   if (priv->idx != sessid)
2002     return;
2003
2004   if (gst_pad_is_linked (priv->sinkpad)) {
2005     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2006         GST_DEBUG_PAD_NAME (priv->sinkpad));
2007     return;
2008   }
2009
2010   /* link the RTP pad to the session manager, it should not really fail unless
2011    * this is not really an RTP pad */
2012   ret = gst_pad_link (pad, priv->sinkpad);
2013   if (ret != GST_PAD_LINK_OK)
2014     goto link_failed;
2015   priv->recv_rtp_src = gst_object_ref (pad);
2016
2017   return;
2018
2019 /* ERRORS */
2020 link_failed:
2021   {
2022     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2023         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2024   }
2025 }
2026
2027 static void
2028 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2029     GstRTSPStream * stream)
2030 {
2031   /* TODO: What to do here other than this? */
2032   GST_DEBUG ("Stream %p: Got EOS", stream);
2033   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2034 }
2035
2036 /**
2037  * gst_rtsp_stream_join_bin:
2038  * @stream: a #GstRTSPStream
2039  * @bin: (transfer none): a #GstBin to join
2040  * @rtpbin: (transfer none): a rtpbin element in @bin
2041  * @state: the target state of the new elements
2042  *
2043  * Join the #GstBin @bin that contains the element @rtpbin.
2044  *
2045  * @stream will link to @rtpbin, which must be inside @bin. The elements
2046  * added to @bin will be set to the state given in @state.
2047  *
2048  * Returns: %TRUE on success.
2049  */
2050 gboolean
2051 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2052     GstElement * rtpbin, GstState state)
2053 {
2054   GstRTSPStreamPrivate *priv;
2055   gint i;
2056   guint idx;
2057   gchar *name;
2058   GstPad *pad, *sinkpad, *selpad;
2059   GstPadLinkReturn ret;
2060
2061   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2062   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2063   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2064
2065   priv = stream->priv;
2066
2067   g_mutex_lock (&priv->lock);
2068   if (priv->is_joined)
2069     goto was_joined;
2070
2071   /* create a session with the same index as the stream */
2072   idx = priv->idx;
2073
2074   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2075
2076   if (!alloc_ports (stream))
2077     goto no_ports;
2078
2079   /* update the dscp qos field in the sinks */
2080   update_dscp_qos (stream);
2081
2082   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2083       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2084     /* For SRTP */
2085     g_signal_connect (rtpbin, "request-rtp-encoder",
2086         (GCallback) request_rtp_encoder, stream);
2087     g_signal_connect (rtpbin, "request-rtcp-encoder",
2088         (GCallback) request_rtcp_encoder, stream);
2089     g_signal_connect (rtpbin, "request-rtp-decoder",
2090         (GCallback) request_rtp_rtcp_decoder, stream);
2091     g_signal_connect (rtpbin, "request-rtcp-decoder",
2092         (GCallback) request_rtp_rtcp_decoder, stream);
2093   }
2094
2095   if (priv->sinkpad) {
2096     g_signal_connect (rtpbin, "request-pt-map",
2097         (GCallback) request_pt_map, stream);
2098   }
2099
2100   /* get a pad for sending RTP */
2101   name = g_strdup_printf ("send_rtp_sink_%u", idx);
2102   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2103   g_free (name);
2104
2105   if (priv->srcpad) {
2106     /* link the RTP pad to the session manager, it should not really fail unless
2107      * this is not really an RTP pad */
2108     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2109     if (ret != GST_PAD_LINK_OK)
2110       goto link_failed;
2111   } else {
2112     /* Need to connect our sinkpad from here */
2113     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2114     /* EOS */
2115     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2116   }
2117
2118   /* get pads from the RTP session element for sending and receiving
2119    * RTP/RTCP*/
2120   name = g_strdup_printf ("send_rtp_src_%u", idx);
2121   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2122   g_free (name);
2123   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2124   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2125   g_free (name);
2126
2127   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2128   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2129   g_free (name);
2130   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2131   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2132   g_free (name);
2133
2134   /* get the session */
2135   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2136
2137   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2138       stream);
2139   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2140       stream);
2141   g_signal_connect (priv->session, "on-ssrc-active",
2142       (GCallback) on_ssrc_active, stream);
2143   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2144       stream);
2145   g_signal_connect (priv->session, "on-bye-timeout",
2146       (GCallback) on_bye_timeout, stream);
2147   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2148       stream);
2149
2150   for (i = 0; i < 2; i++) {
2151     GstPad *teepad, *queuepad;
2152     /* For the sender we create this bit of pipeline for both
2153      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2154      * we need to add a queue before appsink and udpsink to make
2155      * the pipeline not block. For the TCP case, we want to pump
2156      * data to the client as fast as possible.
2157      *
2158      * .--------.      .-----.    .---------.    .---------.
2159      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2160      * |       send->sink   src->sink      src->sink       |
2161      * '--------'      |     |    '---------'    '---------'
2162      *                 |     |    .---------.    .---------.
2163      *                 |     |    |  queue  |    | appsink |
2164      *                 |    src->sink      src->sink       |
2165      *                 '-----'    '---------'    '---------'
2166      *
2167      * When only UDP is allowed, we skip the tee, queue and appsink and link the
2168      * udpsink directly to the session.
2169      */
2170     /* add udpsink */
2171     gst_bin_add (bin, priv->udpsink[i]);
2172     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2173
2174     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2175       /* make tee for RTP/RTCP */
2176       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2177       gst_bin_add (bin, priv->tee[i]);
2178
2179       /* and link to rtpbin send pad */
2180       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2181       gst_pad_link (priv->send_src[i], pad);
2182       gst_object_unref (pad);
2183
2184       priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2185       g_object_set (priv->udpqueue[i], "max-size-buffers",
2186           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2187       gst_bin_add (bin, priv->udpqueue[i]);
2188       /* link tee to udpqueue */
2189       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2190       pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2191       gst_pad_link (teepad, pad);
2192       gst_object_unref (pad);
2193       gst_object_unref (teepad);
2194
2195       /* link udpqueue to udpsink */
2196       queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2197       gst_pad_link (queuepad, sinkpad);
2198       gst_object_unref (queuepad);
2199
2200       /* make queue */
2201       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2202       g_object_set (priv->appqueue[i], "max-size-buffers",
2203           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2204       gst_bin_add (bin, priv->appqueue[i]);
2205       /* and link to tee */
2206       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2207       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2208       gst_pad_link (teepad, pad);
2209       gst_object_unref (pad);
2210       gst_object_unref (teepad);
2211
2212       /* make appsink */
2213       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2214       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2215       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2216       gst_bin_add (bin, priv->appsink[i]);
2217       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2218           &sink_cb, stream, NULL);
2219       /* and link to queue */
2220       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2221       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2222       gst_pad_link (queuepad, pad);
2223       gst_object_unref (pad);
2224       gst_object_unref (queuepad);
2225     } else {
2226       /* else only udpsink needed, link it to the session */
2227       gst_pad_link (priv->send_src[i], sinkpad);
2228     }
2229     gst_object_unref (sinkpad);
2230
2231     /* For the receiver we create this bit of pipeline for both
2232      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2233      * and it is all funneled into the rtpbin receive pad.
2234      *
2235      * .--------.     .--------.    .--------.
2236      * | udpsrc |     | funnel |    | rtpbin |
2237      * |       src->sink      src->sink      |
2238      * '--------'     |        |    '--------'
2239      * .--------.     |        |
2240      * | appsrc |     |        |
2241      * |       src->sink       |
2242      * '--------'     '--------'
2243      */
2244     /* make funnel for the RTP/RTCP receivers */
2245     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2246     gst_bin_add (bin, priv->funnel[i]);
2247
2248     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2249     gst_pad_link (pad, priv->recv_sink[i]);
2250     gst_object_unref (pad);
2251
2252     if (priv->udpsrc_v4[i]) {
2253       if (priv->srcpad) {
2254         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2255          * values. This is only relevant for PLAY pipelines */
2256         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2257         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2258       }
2259       /* add udpsrc */
2260       gst_bin_add (bin, priv->udpsrc_v4[i]);
2261
2262       /* and link to the funnel v4 */
2263       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2264       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2265       gst_pad_link (pad, selpad);
2266       gst_object_unref (pad);
2267       gst_object_unref (selpad);
2268     }
2269
2270     if (priv->udpsrc_v6[i]) {
2271       if (priv->srcpad) {
2272         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2273         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2274       }
2275       gst_bin_add (bin, priv->udpsrc_v6[i]);
2276
2277       /* and link to the funnel v6 */
2278       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2279       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2280       gst_pad_link (pad, selpad);
2281       gst_object_unref (pad);
2282       gst_object_unref (selpad);
2283     }
2284
2285     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2286       /* make and add appsrc */
2287       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2288       priv->appsrc_base_time[i] = -1;
2289       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2290       gst_bin_add (bin, priv->appsrc[i]);
2291       /* and link to the funnel */
2292       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2293       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2294       gst_pad_link (pad, selpad);
2295       gst_object_unref (pad);
2296       gst_object_unref (selpad);
2297     }
2298
2299     /* check if we need to set to a special state */
2300     if (state != GST_STATE_NULL) {
2301       if (priv->udpsink[i])
2302         gst_element_set_state (priv->udpsink[i], state);
2303       if (priv->appsink[i])
2304         gst_element_set_state (priv->appsink[i], state);
2305       if (priv->appqueue[i])
2306         gst_element_set_state (priv->appqueue[i], state);
2307       if (priv->udpqueue[i])
2308         gst_element_set_state (priv->udpqueue[i], state);
2309       if (priv->tee[i])
2310         gst_element_set_state (priv->tee[i], state);
2311       if (priv->funnel[i])
2312         gst_element_set_state (priv->funnel[i], state);
2313       if (priv->appsrc[i])
2314         gst_element_set_state (priv->appsrc[i], state);
2315     }
2316   }
2317
2318   /* be notified of caps changes */
2319   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2320       (GCallback) caps_notify, stream);
2321
2322   priv->is_joined = TRUE;
2323   g_mutex_unlock (&priv->lock);
2324
2325   return TRUE;
2326
2327   /* ERRORS */
2328 was_joined:
2329   {
2330     g_mutex_unlock (&priv->lock);
2331     return TRUE;
2332   }
2333 no_ports:
2334   {
2335     g_mutex_unlock (&priv->lock);
2336     GST_WARNING ("failed to allocate ports %u", idx);
2337     return FALSE;
2338   }
2339 link_failed:
2340   {
2341     GST_WARNING ("failed to link stream %u", idx);
2342     gst_object_unref (priv->send_rtp_sink);
2343     priv->send_rtp_sink = NULL;
2344     g_mutex_unlock (&priv->lock);
2345     return FALSE;
2346   }
2347 }
2348
2349 /**
2350  * gst_rtsp_stream_leave_bin:
2351  * @stream: a #GstRTSPStream
2352  * @bin: (transfer none): a #GstBin
2353  * @rtpbin: (transfer none): a rtpbin #GstElement
2354  *
2355  * Remove the elements of @stream from @bin.
2356  *
2357  * Return: %TRUE on success.
2358  */
2359 gboolean
2360 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2361     GstElement * rtpbin)
2362 {
2363   GstRTSPStreamPrivate *priv;
2364   gint i;
2365   GList *l;
2366
2367   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2368   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2369   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2370
2371   priv = stream->priv;
2372
2373   g_mutex_lock (&priv->lock);
2374   if (!priv->is_joined)
2375     goto was_not_joined;
2376
2377   /* all transports must be removed by now */
2378   if (priv->transports != NULL)
2379     goto transports_not_removed;
2380
2381   clear_tr_cache (priv, TRUE);
2382   clear_tr_cache (priv, FALSE);
2383
2384   GST_INFO ("stream %p leaving bin", stream);
2385
2386   if (priv->srcpad) {
2387     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2388   } else if (priv->recv_rtp_src) {
2389     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2390     gst_object_unref (priv->recv_rtp_src);
2391     priv->recv_rtp_src = NULL;
2392   }
2393   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2394   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2395   gst_object_unref (priv->send_rtp_sink);
2396   priv->send_rtp_sink = NULL;
2397
2398   for (i = 0; i < 2; i++) {
2399     if (priv->udpsink[i])
2400       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2401     if (priv->appsink[i])
2402       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2403     if (priv->appqueue[i])
2404       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2405     if (priv->udpqueue[i])
2406       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2407     if (priv->tee[i])
2408       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2409     if (priv->funnel[i])
2410       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2411     if (priv->appsrc[i])
2412       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2413     if (priv->udpsrc_v4[i]) {
2414       /* and set udpsrc to NULL now before removing */
2415       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2416       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2417       /* removing them should also nicely release the request
2418        * pads when they finalize */
2419       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2420     }
2421     if (priv->udpsrc_v6[i]) {
2422       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2423       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2424       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2425     }
2426
2427     for (l = priv->transport_sources; l; l = l->next) {
2428       GstRTSPMulticastTransportSource *s = l->data;
2429
2430       if (!s->udpsrc[i])
2431         continue;
2432
2433       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2434       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2435       gst_bin_remove (bin, s->udpsrc[i]);
2436     }
2437
2438     if (priv->udpsink[i])
2439       gst_bin_remove (bin, priv->udpsink[i]);
2440     if (priv->appsrc[i])
2441       gst_bin_remove (bin, priv->appsrc[i]);
2442     if (priv->appsink[i])
2443       gst_bin_remove (bin, priv->appsink[i]);
2444     if (priv->appqueue[i])
2445       gst_bin_remove (bin, priv->appqueue[i]);
2446     if (priv->udpqueue[i])
2447       gst_bin_remove (bin, priv->udpqueue[i]);
2448     if (priv->tee[i])
2449       gst_bin_remove (bin, priv->tee[i]);
2450     if (priv->funnel[i])
2451       gst_bin_remove (bin, priv->funnel[i]);
2452
2453     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2454     gst_object_unref (priv->recv_sink[i]);
2455     priv->recv_sink[i] = NULL;
2456
2457     priv->udpsrc_v4[i] = NULL;
2458     priv->udpsrc_v6[i] = NULL;
2459     priv->udpsink[i] = NULL;
2460     priv->appsrc[i] = NULL;
2461     priv->appsink[i] = NULL;
2462     priv->appqueue[i] = NULL;
2463     priv->udpqueue[i] = NULL;
2464     priv->tee[i] = NULL;
2465     priv->funnel[i] = NULL;
2466   }
2467
2468   for (l = priv->transport_sources; l; l = l->next) {
2469     GstRTSPMulticastTransportSource *s = l->data;
2470     g_slice_free (GstRTSPMulticastTransportSource, s);
2471   }
2472   g_list_free (priv->transport_sources);
2473   priv->transport_sources = NULL;
2474
2475   gst_object_unref (priv->send_src[0]);
2476   priv->send_src[0] = NULL;
2477
2478   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2479   gst_object_unref (priv->send_src[1]);
2480   priv->send_src[1] = NULL;
2481
2482   g_object_unref (priv->session);
2483   priv->session = NULL;
2484   if (priv->caps)
2485     gst_caps_unref (priv->caps);
2486   priv->caps = NULL;
2487
2488   if (priv->srtpenc)
2489     gst_object_unref (priv->srtpenc);
2490   if (priv->srtpdec)
2491     gst_object_unref (priv->srtpdec);
2492
2493   priv->is_joined = FALSE;
2494   g_mutex_unlock (&priv->lock);
2495
2496   return TRUE;
2497
2498 was_not_joined:
2499   {
2500     g_mutex_unlock (&priv->lock);
2501     return TRUE;
2502   }
2503 transports_not_removed:
2504   {
2505     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2506     g_mutex_unlock (&priv->lock);
2507     return FALSE;
2508   }
2509 }
2510
2511 /**
2512  * gst_rtsp_stream_get_rtpinfo:
2513  * @stream: a #GstRTSPStream
2514  * @rtptime: (allow-none): result RTP timestamp
2515  * @seq: (allow-none): result RTP seqnum
2516  * @clock_rate: (allow-none): the clock rate
2517  * @running_time: (allow-none): result running-time
2518  *
2519  * Retrieve the current rtptime, seq and running-time. This is used to
2520  * construct a RTPInfo reply header.
2521  *
2522  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2523  */
2524 gboolean
2525 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2526     guint * rtptime, guint * seq, guint * clock_rate,
2527     GstClockTime * running_time)
2528 {
2529   GstRTSPStreamPrivate *priv;
2530   GstStructure *stats;
2531   GObjectClass *payobjclass;
2532
2533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2534
2535   priv = stream->priv;
2536
2537   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2538
2539   g_mutex_lock (&priv->lock);
2540
2541   /* First try to extract the information from the last buffer on the sinks.
2542    * This will have a more accurate sequence number and timestamp, as between
2543    * the payloader and the sink there can be some queues
2544    */
2545   if (priv->udpsink[0] || priv->appsink[0]) {
2546     GstSample *last_sample;
2547
2548     if (priv->udpsink[0])
2549       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2550     else
2551       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2552
2553     if (last_sample) {
2554       GstCaps *caps;
2555       GstBuffer *buffer;
2556       GstSegment *segment;
2557       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2558
2559       caps = gst_sample_get_caps (last_sample);
2560       buffer = gst_sample_get_buffer (last_sample);
2561       segment = gst_sample_get_segment (last_sample);
2562
2563       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2564         if (seq) {
2565           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2566         }
2567
2568         if (rtptime) {
2569           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2570         }
2571
2572         gst_rtp_buffer_unmap (&rtp_buffer);
2573
2574         if (running_time) {
2575           *running_time =
2576               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2577               GST_BUFFER_TIMESTAMP (buffer));
2578         }
2579
2580         if (clock_rate) {
2581           GstStructure *s = gst_caps_get_structure (caps, 0);
2582
2583           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2584
2585           if (*clock_rate == 0 && running_time)
2586             *running_time = GST_CLOCK_TIME_NONE;
2587         }
2588         gst_sample_unref (last_sample);
2589
2590         goto done;
2591       } else {
2592         gst_sample_unref (last_sample);
2593       }
2594     }
2595   }
2596
2597   if (g_object_class_find_property (payobjclass, "stats")) {
2598     g_object_get (priv->payloader, "stats", &stats, NULL);
2599     if (stats == NULL)
2600       goto no_stats;
2601
2602     if (seq)
2603       gst_structure_get_uint (stats, "seqnum", seq);
2604
2605     if (rtptime)
2606       gst_structure_get_uint (stats, "timestamp", rtptime);
2607
2608     if (running_time)
2609       gst_structure_get_clock_time (stats, "running-time", running_time);
2610
2611     if (clock_rate) {
2612       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2613       if (*clock_rate == 0 && running_time)
2614         *running_time = GST_CLOCK_TIME_NONE;
2615     }
2616     gst_structure_free (stats);
2617   } else {
2618     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2619         !g_object_class_find_property (payobjclass, "timestamp"))
2620       goto no_stats;
2621
2622     if (seq)
2623       g_object_get (priv->payloader, "seqnum", seq, NULL);
2624
2625     if (rtptime)
2626       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2627
2628     if (running_time)
2629       *running_time = GST_CLOCK_TIME_NONE;
2630   }
2631
2632 done:
2633   g_mutex_unlock (&priv->lock);
2634
2635   return TRUE;
2636
2637   /* ERRORS */
2638 no_stats:
2639   {
2640     GST_WARNING ("Could not get payloader stats");
2641     g_mutex_unlock (&priv->lock);
2642     return FALSE;
2643   }
2644 }
2645
2646 /**
2647  * gst_rtsp_stream_get_caps:
2648  * @stream: a #GstRTSPStream
2649  *
2650  * Retrieve the current caps of @stream.
2651  *
2652  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2653  * after usage.
2654  */
2655 GstCaps *
2656 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2657 {
2658   GstRTSPStreamPrivate *priv;
2659   GstCaps *result;
2660
2661   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2662
2663   priv = stream->priv;
2664
2665   g_mutex_lock (&priv->lock);
2666   if ((result = priv->caps))
2667     gst_caps_ref (result);
2668   g_mutex_unlock (&priv->lock);
2669
2670   return result;
2671 }
2672
2673 /**
2674  * gst_rtsp_stream_recv_rtp:
2675  * @stream: a #GstRTSPStream
2676  * @buffer: (transfer full): a #GstBuffer
2677  *
2678  * Handle an RTP buffer for the stream. This method is usually called when a
2679  * message has been received from a client using the TCP transport.
2680  *
2681  * This function takes ownership of @buffer.
2682  *
2683  * Returns: a GstFlowReturn.
2684  */
2685 GstFlowReturn
2686 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2687 {
2688   GstRTSPStreamPrivate *priv;
2689   GstFlowReturn ret;
2690   GstElement *element;
2691
2692   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2693   priv = stream->priv;
2694   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2695   g_return_val_if_fail (priv->is_joined, FALSE);
2696
2697   g_mutex_lock (&priv->lock);
2698   if (priv->appsrc[0])
2699     element = gst_object_ref (priv->appsrc[0]);
2700   else
2701     element = NULL;
2702   g_mutex_unlock (&priv->lock);
2703
2704   if (element) {
2705     if (priv->appsrc_base_time[0] == -1) {
2706       /* Take current running_time. This timestamp will be put on
2707        * the first buffer of each stream because we are a live source and so we
2708        * timestamp with the running_time. When we are dealing with TCP, we also
2709        * only timestamp the first buffer (using the DISCONT flag) because a server
2710        * typically bursts data, for which we don't want to compensate by speeding
2711        * up the media. The other timestamps will be interpollated from this one
2712        * using the RTP timestamps. */
2713       GST_OBJECT_LOCK (element);
2714       if (GST_ELEMENT_CLOCK (element)) {
2715         GstClockTime now;
2716         GstClockTime base_time;
2717
2718         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2719         base_time = GST_ELEMENT_CAST (element)->base_time;
2720
2721         priv->appsrc_base_time[0] = now - base_time;
2722         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2723         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2724             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2725             GST_TIME_ARGS (base_time));
2726       }
2727       GST_OBJECT_UNLOCK (element);
2728     }
2729
2730     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2731     gst_object_unref (element);
2732   } else {
2733     ret = GST_FLOW_OK;
2734   }
2735   return ret;
2736 }
2737
2738 /**
2739  * gst_rtsp_stream_recv_rtcp:
2740  * @stream: a #GstRTSPStream
2741  * @buffer: (transfer full): a #GstBuffer
2742  *
2743  * Handle an RTCP buffer for the stream. This method is usually called when a
2744  * message has been received from a client using the TCP transport.
2745  *
2746  * This function takes ownership of @buffer.
2747  *
2748  * Returns: a GstFlowReturn.
2749  */
2750 GstFlowReturn
2751 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2752 {
2753   GstRTSPStreamPrivate *priv;
2754   GstFlowReturn ret;
2755   GstElement *element;
2756
2757   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2758   priv = stream->priv;
2759   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2760
2761   if (!priv->is_joined) {
2762     gst_buffer_unref (buffer);
2763     return GST_FLOW_NOT_LINKED;
2764   }
2765   g_mutex_lock (&priv->lock);
2766   if (priv->appsrc[1])
2767     element = gst_object_ref (priv->appsrc[1]);
2768   else
2769     element = NULL;
2770   g_mutex_unlock (&priv->lock);
2771
2772   if (element) {
2773     if (priv->appsrc_base_time[1] == -1) {
2774       /* Take current running_time. This timestamp will be put on
2775        * the first buffer of each stream because we are a live source and so we
2776        * timestamp with the running_time. When we are dealing with TCP, we also
2777        * only timestamp the first buffer (using the DISCONT flag) because a server
2778        * typically bursts data, for which we don't want to compensate by speeding
2779        * up the media. The other timestamps will be interpollated from this one
2780        * using the RTP timestamps. */
2781       GST_OBJECT_LOCK (element);
2782       if (GST_ELEMENT_CLOCK (element)) {
2783         GstClockTime now;
2784         GstClockTime base_time;
2785
2786         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2787         base_time = GST_ELEMENT_CAST (element)->base_time;
2788
2789         priv->appsrc_base_time[1] = now - base_time;
2790         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2791         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2792             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2793             GST_TIME_ARGS (base_time));
2794       }
2795       GST_OBJECT_UNLOCK (element);
2796     }
2797
2798     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2799     gst_object_unref (element);
2800   } else {
2801     ret = GST_FLOW_OK;
2802     gst_buffer_unref (buffer);
2803   }
2804   return ret;
2805 }
2806
2807 /* must be called with lock */
2808 static gboolean
2809 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2810     gboolean add)
2811 {
2812   GstRTSPStreamPrivate *priv = stream->priv;
2813   const GstRTSPTransport *tr;
2814
2815   tr = gst_rtsp_stream_transport_get_transport (trans);
2816
2817   switch (tr->lower_transport) {
2818     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2819     {
2820       GstRTSPMulticastTransportSource *source;
2821       GstBin *bin;
2822
2823       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2824
2825       if (add) {
2826         gchar *host;
2827         gint i;
2828         GstPad *selpad, *pad;
2829
2830         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2831         source->transport = trans;
2832
2833         for (i = 0; i < 2; i++) {
2834           host =
2835               g_strdup_printf ("udp://%s:%d", tr->destination,
2836               (i == 0) ? tr->port.min : tr->port.max);
2837           source->udpsrc[i] =
2838               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2839           g_free (host);
2840
2841           if (priv->srcpad) {
2842             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2843              * values. This is only relevant for PLAY pipelines */
2844             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2845             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2846           }
2847           /* add udpsrc */
2848           gst_bin_add (bin, source->udpsrc[i]);
2849
2850           /* and link to the funnel v4 */
2851           source->selpad[i] = selpad =
2852               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2853           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2854           gst_pad_link (pad, selpad);
2855           gst_object_unref (pad);
2856           gst_object_unref (selpad);
2857         }
2858
2859         priv->transport_sources =
2860             g_list_prepend (priv->transport_sources, source);
2861       } else {
2862         GList *l;
2863
2864         for (l = priv->transport_sources; l; l = l->next) {
2865           source = l->data;
2866
2867           if (source->transport == trans) {
2868             priv->transport_sources =
2869                 g_list_delete_link (priv->transport_sources, l);
2870             break;
2871           }
2872         }
2873
2874         if (l != NULL) {
2875           gint i;
2876
2877           for (i = 0; i < 2; i++) {
2878             /* Will automatically unlink everything */
2879             gst_bin_remove (bin,
2880                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2881
2882             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2883             gst_object_unref (source->udpsrc[i]);
2884
2885             gst_element_release_request_pad (priv->funnel[i],
2886                 source->selpad[i]);
2887           }
2888
2889           g_slice_free (GstRTSPMulticastTransportSource, source);
2890         }
2891       }
2892
2893       gst_object_unref (bin);
2894
2895       /* fall through for the generic case */
2896     }
2897     case GST_RTSP_LOWER_TRANS_UDP:
2898     {
2899       gchar *dest;
2900       gint min, max;
2901       guint ttl = 0;
2902
2903       dest = tr->destination;
2904       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2905         min = tr->port.min;
2906         max = tr->port.max;
2907         ttl = tr->ttl;
2908       } else {
2909         min = tr->client_port.min;
2910         max = tr->client_port.max;
2911       }
2912
2913       if (add) {
2914         if (ttl > 0) {
2915           GST_INFO ("setting ttl-mc %d", ttl);
2916           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2917           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2918         }
2919         GST_INFO ("adding %s:%d-%d", dest, min, max);
2920         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2921         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2922         priv->transports = g_list_prepend (priv->transports, trans);
2923       } else {
2924         GST_INFO ("removing %s:%d-%d", dest, min, max);
2925         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2926         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2927         priv->transports = g_list_remove (priv->transports, trans);
2928       }
2929       priv->transports_cookie++;
2930       break;
2931     }
2932     case GST_RTSP_LOWER_TRANS_TCP:
2933       if (add) {
2934         GST_INFO ("adding TCP %s", tr->destination);
2935         priv->transports = g_list_prepend (priv->transports, trans);
2936       } else {
2937         GST_INFO ("removing TCP %s", tr->destination);
2938         priv->transports = g_list_remove (priv->transports, trans);
2939       }
2940       priv->transports_cookie++;
2941       break;
2942     default:
2943       goto unknown_transport;
2944   }
2945   return TRUE;
2946
2947   /* ERRORS */
2948 unknown_transport:
2949   {
2950     GST_INFO ("Unknown transport %d", tr->lower_transport);
2951     return FALSE;
2952   }
2953 }
2954
2955
2956 /**
2957  * gst_rtsp_stream_add_transport:
2958  * @stream: a #GstRTSPStream
2959  * @trans: (transfer none): a #GstRTSPStreamTransport
2960  *
2961  * Add the transport in @trans to @stream. The media of @stream will
2962  * then also be send to the values configured in @trans.
2963  *
2964  * @stream must be joined to a bin.
2965  *
2966  * @trans must contain a valid #GstRTSPTransport.
2967  *
2968  * Returns: %TRUE if @trans was added
2969  */
2970 gboolean
2971 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2972     GstRTSPStreamTransport * trans)
2973 {
2974   GstRTSPStreamPrivate *priv;
2975   gboolean res;
2976
2977   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2978   priv = stream->priv;
2979   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2980   g_return_val_if_fail (priv->is_joined, FALSE);
2981
2982   g_mutex_lock (&priv->lock);
2983   res = update_transport (stream, trans, TRUE);
2984   g_mutex_unlock (&priv->lock);
2985
2986   return res;
2987 }
2988
2989 /**
2990  * gst_rtsp_stream_remove_transport:
2991  * @stream: a #GstRTSPStream
2992  * @trans: (transfer none): a #GstRTSPStreamTransport
2993  *
2994  * Remove the transport in @trans from @stream. The media of @stream will
2995  * not be sent to the values configured in @trans.
2996  *
2997  * @stream must be joined to a bin.
2998  *
2999  * @trans must contain a valid #GstRTSPTransport.
3000  *
3001  * Returns: %TRUE if @trans was removed
3002  */
3003 gboolean
3004 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3005     GstRTSPStreamTransport * trans)
3006 {
3007   GstRTSPStreamPrivate *priv;
3008   gboolean res;
3009
3010   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3011   priv = stream->priv;
3012   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3013   g_return_val_if_fail (priv->is_joined, FALSE);
3014
3015   g_mutex_lock (&priv->lock);
3016   res = update_transport (stream, trans, FALSE);
3017   g_mutex_unlock (&priv->lock);
3018
3019   return res;
3020 }
3021
3022 /**
3023  * gst_rtsp_stream_update_crypto:
3024  * @stream: a #GstRTSPStream
3025  * @ssrc: the SSRC
3026  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3027  *
3028  * Update the new crypto information for @ssrc in @stream. If information
3029  * for @ssrc did not exist, it will be added. If information
3030  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3031  * be removed from @stream.
3032  *
3033  * Returns: %TRUE if @crypto could be updated
3034  */
3035 gboolean
3036 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3037     guint ssrc, GstCaps * crypto)
3038 {
3039   GstRTSPStreamPrivate *priv;
3040
3041   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3042   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3043
3044   priv = stream->priv;
3045
3046   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3047
3048   g_mutex_lock (&priv->lock);
3049   if (crypto)
3050     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3051         gst_caps_ref (crypto));
3052   else
3053     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3054   g_mutex_unlock (&priv->lock);
3055
3056   return TRUE;
3057 }
3058
3059 /**
3060  * gst_rtsp_stream_get_rtp_socket:
3061  * @stream: a #GstRTSPStream
3062  * @family: the socket family
3063  *
3064  * Get the RTP socket from @stream for a @family.
3065  *
3066  * @stream must be joined to a bin.
3067  *
3068  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3069  * socket could be allocated for @family. Unref after usage
3070  */
3071 GSocket *
3072 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3073 {
3074   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3075   GSocket *socket;
3076   const gchar *name;
3077
3078   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3079   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3080       family == G_SOCKET_FAMILY_IPV6, NULL);
3081   g_return_val_if_fail (priv->udpsink[0], NULL);
3082
3083   if (family == G_SOCKET_FAMILY_IPV6)
3084     name = "socket-v6";
3085   else
3086     name = "socket";
3087
3088   g_object_get (priv->udpsink[0], name, &socket, NULL);
3089
3090   return socket;
3091 }
3092
3093 /**
3094  * gst_rtsp_stream_get_rtcp_socket:
3095  * @stream: a #GstRTSPStream
3096  * @family: the socket family
3097  *
3098  * Get the RTCP socket from @stream for a @family.
3099  *
3100  * @stream must be joined to a bin.
3101  *
3102  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3103  * socket could be allocated for @family. Unref after usage
3104  */
3105 GSocket *
3106 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3107 {
3108   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3109   GSocket *socket;
3110   const gchar *name;
3111
3112   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3113   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3114       family == G_SOCKET_FAMILY_IPV6, NULL);
3115   g_return_val_if_fail (priv->udpsink[1], NULL);
3116
3117   if (family == G_SOCKET_FAMILY_IPV6)
3118     name = "socket-v6";
3119   else
3120     name = "socket";
3121
3122   g_object_get (priv->udpsink[1], name, &socket, NULL);
3123
3124   return socket;
3125 }
3126
3127 /**
3128  * gst_rtsp_stream_set_seqnum:
3129  * @stream: a #GstRTSPStream
3130  * @seqnum: a new sequence number
3131  *
3132  * Configure the sequence number in the payloader of @stream to @seqnum.
3133  */
3134 void
3135 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3136 {
3137   GstRTSPStreamPrivate *priv;
3138
3139   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3140
3141   priv = stream->priv;
3142
3143   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3144 }
3145
3146 /**
3147  * gst_rtsp_stream_get_seqnum:
3148  * @stream: a #GstRTSPStream
3149  *
3150  * Get the configured sequence number in the payloader of @stream.
3151  *
3152  * Returns: the sequence number of the payloader.
3153  */
3154 guint16
3155 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3156 {
3157   GstRTSPStreamPrivate *priv;
3158   guint seqnum;
3159
3160   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3161
3162   priv = stream->priv;
3163
3164   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3165
3166   return seqnum;
3167 }
3168
3169 guint64
3170 gst_rtsp_stream_get_udp_sent_bytes (GstRTSPStream * stream)
3171 {
3172   GstRTSPStreamPrivate *priv;
3173   guint64 bytes = 0;
3174
3175   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3176
3177   priv = stream->priv;
3178
3179   g_object_get (G_OBJECT (priv->udpsink[0]), "bytes-to-serve", &bytes, NULL);
3180
3181   return bytes;
3182 }
3183
3184 /**
3185  * gst_rtsp_stream_transport_filter:
3186  * @stream: a #GstRTSPStream
3187  * @func: (scope call) (allow-none): a callback
3188  * @user_data: (closure): user data passed to @func
3189  *
3190  * Call @func for each transport managed by @stream. The result value of @func
3191  * determines what happens to the transport. @func will be called with @stream
3192  * locked so no further actions on @stream can be performed from @func.
3193  *
3194  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3195  * @stream.
3196  *
3197  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3198  *
3199  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3200  * will also be added with an additional ref to the result #GList of this
3201  * function..
3202  *
3203  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3204  *
3205  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3206  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3207  * element in the #GList should be unreffed before the list is freed.
3208  */
3209 GList *
3210 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3211     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3212 {
3213   GstRTSPStreamPrivate *priv;
3214   GList *result, *walk, *next;
3215   GHashTable *visited = NULL;
3216   guint cookie;
3217
3218   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3219
3220   priv = stream->priv;
3221
3222   result = NULL;
3223   if (func)
3224     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3225
3226   g_mutex_lock (&priv->lock);
3227 restart:
3228   cookie = priv->transports_cookie;
3229   for (walk = priv->transports; walk; walk = next) {
3230     GstRTSPStreamTransport *trans = walk->data;
3231     GstRTSPFilterResult res;
3232     gboolean changed;
3233
3234     next = g_list_next (walk);
3235
3236     if (func) {
3237       /* only visit each transport once */
3238       if (g_hash_table_contains (visited, trans))
3239         continue;
3240
3241       g_hash_table_add (visited, g_object_ref (trans));
3242       g_mutex_unlock (&priv->lock);
3243
3244       res = func (stream, trans, user_data);
3245
3246       g_mutex_lock (&priv->lock);
3247     } else
3248       res = GST_RTSP_FILTER_REF;
3249
3250     changed = (cookie != priv->transports_cookie);
3251
3252     switch (res) {
3253       case GST_RTSP_FILTER_REMOVE:
3254         update_transport (stream, trans, FALSE);
3255         break;
3256       case GST_RTSP_FILTER_REF:
3257         result = g_list_prepend (result, g_object_ref (trans));
3258         break;
3259       case GST_RTSP_FILTER_KEEP:
3260       default:
3261         break;
3262     }
3263     if (changed)
3264       goto restart;
3265   }
3266   g_mutex_unlock (&priv->lock);
3267
3268   if (func)
3269     g_hash_table_unref (visited);
3270
3271   return result;
3272 }
3273
3274 static GstPadProbeReturn
3275 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3276 {
3277   GstRTSPStreamPrivate *priv;
3278   GstRTSPStream *stream;
3279
3280   stream = user_data;
3281   priv = stream->priv;
3282
3283   GST_DEBUG_OBJECT (pad, "now blocking");
3284
3285   g_mutex_lock (&priv->lock);
3286   priv->blocking = TRUE;
3287   g_mutex_unlock (&priv->lock);
3288
3289   gst_element_post_message (priv->payloader,
3290       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3291           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3292
3293   return GST_PAD_PROBE_OK;
3294 }
3295
3296 /**
3297  * gst_rtsp_stream_set_blocked:
3298  * @stream: a #GstRTSPStream
3299  * @blocked: boolean indicating we should block or unblock
3300  *
3301  * Blocks or unblocks the dataflow on @stream.
3302  *
3303  * Returns: %TRUE on success
3304  */
3305 gboolean
3306 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3307 {
3308   GstRTSPStreamPrivate *priv;
3309
3310   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3311
3312   priv = stream->priv;
3313
3314   g_mutex_lock (&priv->lock);
3315   if (blocked) {
3316     priv->blocking = FALSE;
3317     if (priv->blocked_id == 0) {
3318       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3319           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3320           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3321           g_object_ref (stream), g_object_unref);
3322     }
3323   } else {
3324     if (priv->blocked_id != 0) {
3325       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3326       priv->blocked_id = 0;
3327       priv->blocking = FALSE;
3328     }
3329   }
3330   g_mutex_unlock (&priv->lock);
3331
3332   return TRUE;
3333 }
3334
3335 /**
3336  * gst_rtsp_stream_is_blocking:
3337  * @stream: a #GstRTSPStream
3338  *
3339  * Check if @stream is blocking on a #GstBuffer.
3340  *
3341  * Returns: %TRUE if @stream is blocking
3342  */
3343 gboolean
3344 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3345 {
3346   GstRTSPStreamPrivate *priv;
3347   gboolean result;
3348
3349   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3350
3351   priv = stream->priv;
3352
3353   g_mutex_lock (&priv->lock);
3354   result = priv->blocking;
3355   g_mutex_unlock (&priv->lock);
3356
3357   return result;
3358 }
3359
3360 /**
3361  * gst_rtsp_stream_query_position:
3362  * @stream: a #GstRTSPStream
3363  *
3364  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3365  * the RTP parts of the pipeline and not the RTCP parts.
3366  *
3367  * Returns: %TRUE if the position could be queried
3368  */
3369 gboolean
3370 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3371 {
3372   GstRTSPStreamPrivate *priv;
3373   GstElement *sink;
3374   gboolean ret;
3375
3376   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3377
3378   priv = stream->priv;
3379
3380   g_mutex_lock (&priv->lock);
3381   if ((sink = priv->udpsink[0]))
3382     gst_object_ref (sink);
3383   g_mutex_unlock (&priv->lock);
3384
3385   if (!sink)
3386     return FALSE;
3387
3388   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3389   gst_object_unref (sink);
3390
3391   return ret;
3392 }
3393
3394 /**
3395  * gst_rtsp_stream_query_stop:
3396  * @stream: a #GstRTSPStream
3397  *
3398  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3399  * the RTP parts of the pipeline and not the RTCP parts.
3400  *
3401  * Returns: %TRUE if the stop could be queried
3402  */
3403 gboolean
3404 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3405 {
3406   GstRTSPStreamPrivate *priv;
3407   GstElement *sink;
3408   GstQuery *query;
3409   gboolean ret;
3410
3411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3412
3413   priv = stream->priv;
3414
3415   g_mutex_lock (&priv->lock);
3416   if ((sink = priv->udpsink[0]))
3417     gst_object_ref (sink);
3418   g_mutex_unlock (&priv->lock);
3419
3420   if (!sink)
3421     return FALSE;
3422
3423   query = gst_query_new_segment (GST_FORMAT_TIME);
3424   if ((ret = gst_element_query (sink, query))) {
3425     GstFormat format;
3426
3427     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3428     if (format != GST_FORMAT_TIME)
3429       *stop = -1;
3430   }
3431   gst_query_unref (query);
3432   gst_object_unref (sink);
3433
3434   return ret;
3435
3436 }