rtsp-stream: create stream pipeline based on transport
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83   gchar *control;
84
85   GstRTSPProfile profiles;
86   GstRTSPLowerTrans protocols;
87
88   /* pads on the rtpbin */
89   GstPad *send_rtp_sink;
90   GstPad *recv_rtp_src;
91   GstPad *recv_sink[2];
92   GstPad *send_src[2];
93
94   /* the RTPSession object */
95   GObject *session;
96
97   /* SRTP encoder/decoder */
98   GstElement *srtpenc;
99   GstElement *srtpdec;
100   GHashTable *keys;
101
102   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
103    * sockets */
104   GstElement *udpsrc_v4[2];
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
107    * sockets */
108   GstElement *udpsrc_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141
142   /* the caps of the stream */
143   gulong caps_sig;
144   GstCaps *caps;
145
146   /* transports we stream to */
147   guint n_active;
148   GList *transports;
149   guint transports_cookie;
150   GList *tr_cache_rtp;
151   GList *tr_cache_rtcp;
152   guint tr_cache_cookie_rtp;
153   guint tr_cache_cookie_rtcp;
154
155
156   /* UDP sources for UDP multicast transports */
157   GList *transport_sources;
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167 };
168
169 #define DEFAULT_CONTROL         NULL
170 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
171 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
172                                         GST_RTSP_LOWER_TRANS_TCP
173
174 enum
175 {
176   PROP_0,
177   PROP_CONTROL,
178   PROP_PROFILES,
179   PROP_PROTOCOLS,
180   PROP_LAST
181 };
182
183 enum
184 {
185   SIGNAL_NEW_RTP_ENCODER,
186   SIGNAL_NEW_RTCP_ENCODER,
187   SIGNAL_LAST
188 };
189
190 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
191 #define GST_CAT_DEFAULT rtsp_stream_debug
192
193 static GQuark ssrc_stream_map_key;
194
195 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
196     GValue * value, GParamSpec * pspec);
197 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
198     const GValue * value, GParamSpec * pspec);
199
200 static void gst_rtsp_stream_finalize (GObject * obj);
201
202 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
203
204 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
205
206 static void
207 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
208 {
209   GObjectClass *gobject_class;
210
211   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
212
213   gobject_class = G_OBJECT_CLASS (klass);
214
215   gobject_class->get_property = gst_rtsp_stream_get_property;
216   gobject_class->set_property = gst_rtsp_stream_set_property;
217   gobject_class->finalize = gst_rtsp_stream_finalize;
218
219   g_object_class_install_property (gobject_class, PROP_CONTROL,
220       g_param_spec_string ("control", "Control",
221           "The control string for this stream", DEFAULT_CONTROL,
222           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
223
224   g_object_class_install_property (gobject_class, PROP_PROFILES,
225       g_param_spec_flags ("profiles", "Profiles",
226           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
227           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228
229   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
230       g_param_spec_flags ("protocols", "Protocols",
231           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
232           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
233
234   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
235       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
237       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
238
239   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
240       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
242       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
243
244   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
245
246   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
247 }
248
249 static void
250 gst_rtsp_stream_init (GstRTSPStream * stream)
251 {
252   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
253
254   GST_DEBUG ("new stream %p", stream);
255
256   stream->priv = priv;
257
258   priv->dscp_qos = -1;
259   priv->control = g_strdup (DEFAULT_CONTROL);
260   priv->profiles = DEFAULT_PROFILES;
261   priv->protocols = DEFAULT_PROTOCOLS;
262
263   g_mutex_init (&priv->lock);
264
265   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
266       NULL, (GDestroyNotify) gst_caps_unref);
267   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
268       (GDestroyNotify) gst_caps_unref);
269 }
270
271 static void
272 gst_rtsp_stream_finalize (GObject * obj)
273 {
274   GstRTSPStream *stream;
275   GstRTSPStreamPrivate *priv;
276
277   stream = GST_RTSP_STREAM (obj);
278   priv = stream->priv;
279
280   GST_DEBUG ("finalize stream %p", stream);
281
282   /* we really need to be unjoined now */
283   g_return_if_fail (!priv->is_joined);
284
285   if (priv->addr_v4)
286     gst_rtsp_address_free (priv->addr_v4);
287   if (priv->addr_v6)
288     gst_rtsp_address_free (priv->addr_v6);
289   if (priv->server_addr_v4)
290     gst_rtsp_address_free (priv->server_addr_v4);
291   if (priv->server_addr_v6)
292     gst_rtsp_address_free (priv->server_addr_v6);
293   if (priv->pool)
294     g_object_unref (priv->pool);
295   if (priv->rtxsend)
296     g_object_unref (priv->rtxsend);
297
298   gst_object_unref (priv->payloader);
299   if (priv->srcpad)
300     gst_object_unref (priv->srcpad);
301   if (priv->sinkpad)
302     gst_object_unref (priv->sinkpad);
303   g_free (priv->control);
304   g_mutex_clear (&priv->lock);
305
306   g_hash_table_unref (priv->keys);
307   g_hash_table_destroy (priv->ptmap);
308
309   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
310 }
311
312 static void
313 gst_rtsp_stream_get_property (GObject * object, guint propid,
314     GValue * value, GParamSpec * pspec)
315 {
316   GstRTSPStream *stream = GST_RTSP_STREAM (object);
317
318   switch (propid) {
319     case PROP_CONTROL:
320       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
321       break;
322     case PROP_PROFILES:
323       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
324       break;
325     case PROP_PROTOCOLS:
326       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
327       break;
328     default:
329       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
330   }
331 }
332
333 static void
334 gst_rtsp_stream_set_property (GObject * object, guint propid,
335     const GValue * value, GParamSpec * pspec)
336 {
337   GstRTSPStream *stream = GST_RTSP_STREAM (object);
338
339   switch (propid) {
340     case PROP_CONTROL:
341       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
342       break;
343     case PROP_PROFILES:
344       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
345       break;
346     case PROP_PROTOCOLS:
347       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
348       break;
349     default:
350       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
351   }
352 }
353
354 /**
355  * gst_rtsp_stream_new:
356  * @idx: an index
357  * @pad: a #GstPad
358  * @payloader: a #GstElement
359  *
360  * Create a new media stream with index @idx that handles RTP data on
361  * @pad and has a payloader element @payloader if @pad is a source pad
362  * or a depayloader element @payloader if @pad is a sink pad.
363  *
364  * Returns: (transfer full): a new #GstRTSPStream
365  */
366 GstRTSPStream *
367 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
368 {
369   GstRTSPStreamPrivate *priv;
370   GstRTSPStream *stream;
371
372   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
373   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
374
375   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
376   priv = stream->priv;
377   priv->idx = idx;
378   priv->payloader = gst_object_ref (payloader);
379   if (GST_PAD_IS_SRC (pad))
380     priv->srcpad = gst_object_ref (pad);
381   else
382     priv->sinkpad = gst_object_ref (pad);
383
384   return stream;
385 }
386
387 /**
388  * gst_rtsp_stream_get_index:
389  * @stream: a #GstRTSPStream
390  *
391  * Get the stream index.
392  *
393  * Return: the stream index.
394  */
395 guint
396 gst_rtsp_stream_get_index (GstRTSPStream * stream)
397 {
398   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
399
400   return stream->priv->idx;
401 }
402
403 /**
404  * gst_rtsp_stream_get_pt:
405  * @stream: a #GstRTSPStream
406  *
407  * Get the stream payload type.
408  *
409  * Return: the stream payload type.
410  */
411 guint
412 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
413 {
414   GstRTSPStreamPrivate *priv;
415   guint pt;
416
417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
418
419   priv = stream->priv;
420
421   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
422
423   return pt;
424 }
425
426 /**
427  * gst_rtsp_stream_get_srcpad:
428  * @stream: a #GstRTSPStream
429  *
430  * Get the srcpad associated with @stream.
431  *
432  * Returns: (transfer full): the srcpad. Unref after usage.
433  */
434 GstPad *
435 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
436 {
437   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
438
439   if (!stream->priv->srcpad)
440     return NULL;
441
442   return gst_object_ref (stream->priv->srcpad);
443 }
444
445 /**
446  * gst_rtsp_stream_get_sinkpad:
447  * @stream: a #GstRTSPStream
448  *
449  * Get the sinkpad associated with @stream.
450  *
451  * Returns: (transfer full): the sinkpad. Unref after usage.
452  */
453 GstPad *
454 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
455 {
456   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
457
458   if (!stream->priv->sinkpad)
459     return NULL;
460
461   return gst_object_ref (stream->priv->sinkpad);
462 }
463
464 /**
465  * gst_rtsp_stream_get_control:
466  * @stream: a #GstRTSPStream
467  *
468  * Get the control string to identify this stream.
469  *
470  * Returns: (transfer full): the control string. g_free() after usage.
471  */
472 gchar *
473 gst_rtsp_stream_get_control (GstRTSPStream * stream)
474 {
475   GstRTSPStreamPrivate *priv;
476   gchar *result;
477
478   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
479
480   priv = stream->priv;
481
482   g_mutex_lock (&priv->lock);
483   if ((result = g_strdup (priv->control)) == NULL)
484     result = g_strdup_printf ("stream=%u", priv->idx);
485   g_mutex_unlock (&priv->lock);
486
487   return result;
488 }
489
490 /**
491  * gst_rtsp_stream_set_control:
492  * @stream: a #GstRTSPStream
493  * @control: a control string
494  *
495  * Set the control string in @stream.
496  */
497 void
498 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
499 {
500   GstRTSPStreamPrivate *priv;
501
502   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
503
504   priv = stream->priv;
505
506   g_mutex_lock (&priv->lock);
507   g_free (priv->control);
508   priv->control = g_strdup (control);
509   g_mutex_unlock (&priv->lock);
510 }
511
512 /**
513  * gst_rtsp_stream_has_control:
514  * @stream: a #GstRTSPStream
515  * @control: a control string
516  *
517  * Check if @stream has the control string @control.
518  *
519  * Returns: %TRUE is @stream has @control as the control string
520  */
521 gboolean
522 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
523 {
524   GstRTSPStreamPrivate *priv;
525   gboolean res;
526
527   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
528
529   priv = stream->priv;
530
531   g_mutex_lock (&priv->lock);
532   if (priv->control)
533     res = (g_strcmp0 (priv->control, control) == 0);
534   else {
535     guint streamid;
536
537     if (sscanf (control, "stream=%u", &streamid) > 0)
538       res = (streamid == priv->idx);
539     else
540       res = FALSE;
541   }
542   g_mutex_unlock (&priv->lock);
543
544   return res;
545 }
546
547 /**
548  * gst_rtsp_stream_set_mtu:
549  * @stream: a #GstRTSPStream
550  * @mtu: a new MTU
551  *
552  * Configure the mtu in the payloader of @stream to @mtu.
553  */
554 void
555 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
556 {
557   GstRTSPStreamPrivate *priv;
558
559   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
560
561   priv = stream->priv;
562
563   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
564
565   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
566 }
567
568 /**
569  * gst_rtsp_stream_get_mtu:
570  * @stream: a #GstRTSPStream
571  *
572  * Get the configured MTU in the payloader of @stream.
573  *
574  * Returns: the MTU of the payloader.
575  */
576 guint
577 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
578 {
579   GstRTSPStreamPrivate *priv;
580   guint mtu;
581
582   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
583
584   priv = stream->priv;
585
586   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
587
588   return mtu;
589 }
590
591 /* Update the dscp qos property on the udp sinks */
592 static void
593 update_dscp_qos (GstRTSPStream * stream)
594 {
595   GstRTSPStreamPrivate *priv;
596
597   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
598
599   priv = stream->priv;
600
601   if (priv->udpsink[0]) {
602     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
603         NULL);
604   }
605
606   if (priv->udpsink[1]) {
607     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
608         NULL);
609   }
610 }
611
612 /**
613  * gst_rtsp_stream_set_dscp_qos:
614  * @stream: a #GstRTSPStream
615  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
616  *
617  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
618  */
619 void
620 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
621 {
622   GstRTSPStreamPrivate *priv;
623
624   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
625
626   priv = stream->priv;
627
628   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
629
630   if (dscp_qos < -1 || dscp_qos > 63) {
631     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
632     return;
633   }
634
635   priv->dscp_qos = dscp_qos;
636
637   update_dscp_qos (stream);
638 }
639
640 /**
641  * gst_rtsp_stream_get_dscp_qos:
642  * @stream: a #GstRTSPStream
643  *
644  * Get the configured DSCP QoS in of the outgoing sockets.
645  *
646  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
647  */
648 gint
649 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
650 {
651   GstRTSPStreamPrivate *priv;
652
653   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
654
655   priv = stream->priv;
656
657   return priv->dscp_qos;
658 }
659
660 /**
661  * gst_rtsp_stream_is_transport_supported:
662  * @stream: a #GstRTSPStream
663  * @transport: (transfer none): a #GstRTSPTransport
664  *
665  * Check if @transport can be handled by stream
666  *
667  * Returns: %TRUE if @transport can be handled by @stream.
668  */
669 gboolean
670 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
671     GstRTSPTransport * transport)
672 {
673   GstRTSPStreamPrivate *priv;
674
675   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
676
677   priv = stream->priv;
678
679   g_mutex_lock (&priv->lock);
680   if (transport->trans != GST_RTSP_TRANS_RTP)
681     goto unsupported_transmode;
682
683   if (!(transport->profile & priv->profiles))
684     goto unsupported_profile;
685
686   if (!(transport->lower_transport & priv->protocols))
687     goto unsupported_ltrans;
688
689   g_mutex_unlock (&priv->lock);
690
691   return TRUE;
692
693   /* ERRORS */
694 unsupported_transmode:
695   {
696     GST_DEBUG ("unsupported transport mode %d", transport->trans);
697     g_mutex_unlock (&priv->lock);
698     return FALSE;
699   }
700 unsupported_profile:
701   {
702     GST_DEBUG ("unsupported profile %d", transport->profile);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_ltrans:
707   {
708     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 }
713
714 /**
715  * gst_rtsp_stream_set_profiles:
716  * @stream: a #GstRTSPStream
717  * @profiles: the new profiles
718  *
719  * Configure the allowed profiles for @stream.
720  */
721 void
722 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
723 {
724   GstRTSPStreamPrivate *priv;
725
726   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
727
728   priv = stream->priv;
729
730   g_mutex_lock (&priv->lock);
731   priv->profiles = profiles;
732   g_mutex_unlock (&priv->lock);
733 }
734
735 /**
736  * gst_rtsp_stream_get_profiles:
737  * @stream: a #GstRTSPStream
738  *
739  * Get the allowed profiles of @stream.
740  *
741  * Returns: a #GstRTSPProfile
742  */
743 GstRTSPProfile
744 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
745 {
746   GstRTSPStreamPrivate *priv;
747   GstRTSPProfile res;
748
749   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
750
751   priv = stream->priv;
752
753   g_mutex_lock (&priv->lock);
754   res = priv->profiles;
755   g_mutex_unlock (&priv->lock);
756
757   return res;
758 }
759
760 /**
761  * gst_rtsp_stream_set_protocols:
762  * @stream: a #GstRTSPStream
763  * @protocols: the new flags
764  *
765  * Configure the allowed lower transport for @stream.
766  */
767 void
768 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
769     GstRTSPLowerTrans protocols)
770 {
771   GstRTSPStreamPrivate *priv;
772
773   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
774
775   priv = stream->priv;
776
777   g_mutex_lock (&priv->lock);
778   priv->protocols = protocols;
779   g_mutex_unlock (&priv->lock);
780 }
781
782 /**
783  * gst_rtsp_stream_get_protocols:
784  * @stream: a #GstRTSPStream
785  *
786  * Get the allowed protocols of @stream.
787  *
788  * Returns: a #GstRTSPLowerTrans
789  */
790 GstRTSPLowerTrans
791 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
792 {
793   GstRTSPStreamPrivate *priv;
794   GstRTSPLowerTrans res;
795
796   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
797       GST_RTSP_LOWER_TRANS_UNKNOWN);
798
799   priv = stream->priv;
800
801   g_mutex_lock (&priv->lock);
802   res = priv->protocols;
803   g_mutex_unlock (&priv->lock);
804
805   return res;
806 }
807
808 /**
809  * gst_rtsp_stream_set_address_pool:
810  * @stream: a #GstRTSPStream
811  * @pool: (transfer none): a #GstRTSPAddressPool
812  *
813  * configure @pool to be used as the address pool of @stream.
814  */
815 void
816 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
817     GstRTSPAddressPool * pool)
818 {
819   GstRTSPStreamPrivate *priv;
820   GstRTSPAddressPool *old;
821
822   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
823
824   priv = stream->priv;
825
826   GST_LOG_OBJECT (stream, "set address pool %p", pool);
827
828   g_mutex_lock (&priv->lock);
829   if ((old = priv->pool) != pool)
830     priv->pool = pool ? g_object_ref (pool) : NULL;
831   else
832     old = NULL;
833   g_mutex_unlock (&priv->lock);
834
835   if (old)
836     g_object_unref (old);
837 }
838
839 /**
840  * gst_rtsp_stream_get_address_pool:
841  * @stream: a #GstRTSPStream
842  *
843  * Get the #GstRTSPAddressPool used as the address pool of @stream.
844  *
845  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
846  * usage.
847  */
848 GstRTSPAddressPool *
849 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
850 {
851   GstRTSPStreamPrivate *priv;
852   GstRTSPAddressPool *result;
853
854   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
855
856   priv = stream->priv;
857
858   g_mutex_lock (&priv->lock);
859   if ((result = priv->pool))
860     g_object_ref (result);
861   g_mutex_unlock (&priv->lock);
862
863   return result;
864 }
865
866 /**
867  * gst_rtsp_stream_get_multicast_address:
868  * @stream: a #GstRTSPStream
869  * @family: the #GSocketFamily
870  *
871  * Get the multicast address of @stream for @family.
872  *
873  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
874  * or %NULL when no address could be allocated. gst_rtsp_address_free()
875  * after usage.
876  */
877 GstRTSPAddress *
878 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
879     GSocketFamily family)
880 {
881   GstRTSPStreamPrivate *priv;
882   GstRTSPAddress *result;
883   GstRTSPAddress **addrp;
884   GstRTSPAddressFlags flags;
885
886   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
887
888   priv = stream->priv;
889
890   if (family == G_SOCKET_FAMILY_IPV6) {
891     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
892     addrp = &priv->addr_v6;
893   } else {
894     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
895     addrp = &priv->addr_v4;
896   }
897
898   g_mutex_lock (&priv->lock);
899   if (*addrp == NULL) {
900     if (priv->pool == NULL)
901       goto no_pool;
902
903     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
904
905     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
906     if (*addrp == NULL)
907       goto no_address;
908   }
909   result = gst_rtsp_address_copy (*addrp);
910   g_mutex_unlock (&priv->lock);
911
912   return result;
913
914   /* ERRORS */
915 no_pool:
916   {
917     GST_ERROR_OBJECT (stream, "no address pool specified");
918     g_mutex_unlock (&priv->lock);
919     return NULL;
920   }
921 no_address:
922   {
923     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
924     g_mutex_unlock (&priv->lock);
925     return NULL;
926   }
927 }
928
929 /**
930  * gst_rtsp_stream_reserve_address:
931  * @stream: a #GstRTSPStream
932  * @address: an address
933  * @port: a port
934  * @n_ports: n_ports
935  * @ttl: a TTL
936  *
937  * Reserve @address and @port as the address and port of @stream.
938  *
939  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
940  * the address could be reserved. gst_rtsp_address_free() after usage.
941  */
942 GstRTSPAddress *
943 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
944     const gchar * address, guint port, guint n_ports, guint ttl)
945 {
946   GstRTSPStreamPrivate *priv;
947   GstRTSPAddress *result;
948   GInetAddress *addr;
949   GSocketFamily family;
950   GstRTSPAddress **addrp;
951
952   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
953   g_return_val_if_fail (address != NULL, NULL);
954   g_return_val_if_fail (port > 0, NULL);
955   g_return_val_if_fail (n_ports > 0, NULL);
956   g_return_val_if_fail (ttl > 0, NULL);
957
958   priv = stream->priv;
959
960   addr = g_inet_address_new_from_string (address);
961   if (!addr) {
962     GST_ERROR ("failed to get inet addr from %s", address);
963     family = G_SOCKET_FAMILY_IPV4;
964   } else {
965     family = g_inet_address_get_family (addr);
966     g_object_unref (addr);
967   }
968
969   if (family == G_SOCKET_FAMILY_IPV6)
970     addrp = &priv->addr_v6;
971   else
972     addrp = &priv->addr_v4;
973
974   g_mutex_lock (&priv->lock);
975   if (*addrp == NULL) {
976     GstRTSPAddressPoolResult res;
977
978     if (priv->pool == NULL)
979       goto no_pool;
980
981     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
982         port, n_ports, ttl, addrp);
983     if (res != GST_RTSP_ADDRESS_POOL_OK)
984       goto no_address;
985   } else {
986     if (strcmp ((*addrp)->address, address) ||
987         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
988         (*addrp)->ttl != ttl)
989       goto different_address;
990   }
991   result = gst_rtsp_address_copy (*addrp);
992   g_mutex_unlock (&priv->lock);
993
994   return result;
995
996   /* ERRORS */
997 no_pool:
998   {
999     GST_ERROR_OBJECT (stream, "no address pool specified");
1000     g_mutex_unlock (&priv->lock);
1001     return NULL;
1002   }
1003 no_address:
1004   {
1005     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1006         address);
1007     g_mutex_unlock (&priv->lock);
1008     return NULL;
1009   }
1010 different_address:
1011   {
1012     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1013         " reserved", address);
1014     g_mutex_unlock (&priv->lock);
1015     return NULL;
1016   }
1017 }
1018
1019 static gboolean
1020 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1021     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1022     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1023     GstRTSPAddress ** server_addr_out)
1024 {
1025   GstRTSPStreamPrivate *priv = stream->priv;
1026   GstStateChangeReturn ret;
1027   GstElement *udpsrc0, *udpsrc1;
1028   GstElement *udpsink0, *udpsink1;
1029   GSocket *rtp_socket = NULL;
1030   GSocket *rtcp_socket;
1031   gint tmp_rtp, tmp_rtcp;
1032   guint count;
1033   gint rtpport, rtcpport;
1034   GList *rejected_addresses = NULL;
1035   GstRTSPAddress *addr = NULL;
1036   GInetAddress *inetaddr = NULL;
1037   GSocketAddress *rtp_sockaddr = NULL;
1038   GSocketAddress *rtcp_sockaddr = NULL;
1039   const gchar *multisink_socket;
1040
1041   if (family == G_SOCKET_FAMILY_IPV6)
1042     multisink_socket = "socket-v6";
1043   else
1044     multisink_socket = "socket";
1045
1046   udpsrc0 = NULL;
1047   udpsrc1 = NULL;
1048   udpsink0 = NULL;
1049   udpsink1 = NULL;
1050   count = 0;
1051
1052   /* Start with random port */
1053   tmp_rtp = 0;
1054
1055   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1056       G_SOCKET_PROTOCOL_UDP, NULL);
1057   if (!rtcp_socket)
1058     goto no_udp_protocol;
1059
1060   if (*server_addr_out)
1061     gst_rtsp_address_free (*server_addr_out);
1062
1063   /* try to allocate 2 UDP ports, the RTP port should be an even
1064    * number and the RTCP port should be the next (uneven) port */
1065 again:
1066
1067   if (rtp_socket == NULL) {
1068     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1069         G_SOCKET_PROTOCOL_UDP, NULL);
1070     if (!rtp_socket)
1071       goto no_udp_protocol;
1072   }
1073
1074   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1075     GstRTSPAddressFlags flags;
1076
1077     if (addr)
1078       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1079
1080     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1081     if (family == G_SOCKET_FAMILY_IPV6)
1082       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1083     else
1084       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1085
1086     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1087
1088     if (addr == NULL)
1089       goto no_ports;
1090
1091     tmp_rtp = addr->port;
1092
1093     g_clear_object (&inetaddr);
1094     inetaddr = g_inet_address_new_from_string (addr->address);
1095   } else {
1096     if (tmp_rtp != 0) {
1097       tmp_rtp += 2;
1098       if (++count > 20)
1099         goto no_ports;
1100     }
1101
1102     if (inetaddr == NULL)
1103       inetaddr = g_inet_address_new_any (family);
1104   }
1105
1106   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1107   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1108     g_object_unref (rtp_sockaddr);
1109     goto again;
1110   }
1111   g_object_unref (rtp_sockaddr);
1112
1113   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1114   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1115     g_clear_object (&rtp_sockaddr);
1116     goto socket_error;
1117   }
1118
1119   tmp_rtp =
1120       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1121   g_object_unref (rtp_sockaddr);
1122
1123   /* check if port is even */
1124   if ((tmp_rtp & 1) != 0) {
1125     /* port not even, close and allocate another */
1126     tmp_rtp++;
1127     g_clear_object (&rtp_socket);
1128     goto again;
1129   }
1130
1131   /* set port */
1132   tmp_rtcp = tmp_rtp + 1;
1133
1134   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1135   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1136     g_object_unref (rtcp_sockaddr);
1137     g_clear_object (&rtp_socket);
1138     goto again;
1139   }
1140   g_object_unref (rtcp_sockaddr);
1141
1142   g_clear_object (&inetaddr);
1143
1144   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1145   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1146
1147   if (udpsrc0 == NULL || udpsrc1 == NULL)
1148     goto no_udp_protocol;
1149
1150   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1151   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1152
1153   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1154   if (ret == GST_STATE_CHANGE_FAILURE)
1155     goto element_error;
1156   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1157   if (ret == GST_STATE_CHANGE_FAILURE)
1158     goto element_error;
1159
1160   /* all fine, do port check */
1161   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1162   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1163
1164   /* this should not happen... */
1165   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1166     goto port_error;
1167
1168   if (udpsink_out[0])
1169     udpsink0 = udpsink_out[0];
1170   else
1171     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1172
1173   if (!udpsink0)
1174     goto no_udp_protocol;
1175
1176   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1177   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1178
1179   if (udpsink_out[1])
1180     udpsink1 = udpsink_out[1];
1181   else
1182     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1183
1184   if (!udpsink1)
1185     goto no_udp_protocol;
1186
1187   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1188   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1189   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1190
1191   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1192   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1193   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1194   /* Needs to be async for RECORD streams, otherwise we will never go to
1195    * PLAYING because the sinks will wait for data while the udpsrc can't
1196    * provide data with timestamps in PAUSED. */
1197   if (priv->sinkpad)
1198     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1199   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1200   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1201   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1202   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1203   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1204
1205   /* we keep these elements, we will further configure them when the
1206    * client told us to really use the UDP ports. */
1207   udpsrc_out[0] = udpsrc0;
1208   udpsrc_out[1] = udpsrc1;
1209   udpsink_out[0] = udpsink0;
1210   udpsink_out[1] = udpsink1;
1211
1212   server_port_out->min = rtpport;
1213   server_port_out->max = rtcpport;
1214
1215   *server_addr_out = addr;
1216   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1217
1218   g_object_unref (rtp_socket);
1219   g_object_unref (rtcp_socket);
1220
1221   return TRUE;
1222
1223   /* ERRORS */
1224 no_udp_protocol:
1225   {
1226     goto cleanup;
1227   }
1228 no_ports:
1229   {
1230     goto cleanup;
1231   }
1232 port_error:
1233   {
1234     goto cleanup;
1235   }
1236 socket_error:
1237   {
1238     goto cleanup;
1239   }
1240 element_error:
1241   {
1242     goto cleanup;
1243   }
1244 cleanup:
1245   {
1246     if (udpsrc0) {
1247       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1248       gst_object_unref (udpsrc0);
1249     }
1250     if (udpsrc1) {
1251       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1252       gst_object_unref (udpsrc1);
1253     }
1254     if (udpsink0) {
1255       gst_element_set_state (udpsink0, GST_STATE_NULL);
1256       gst_object_unref (udpsink0);
1257     }
1258     if (inetaddr)
1259       g_object_unref (inetaddr);
1260     g_list_free_full (rejected_addresses,
1261         (GDestroyNotify) gst_rtsp_address_free);
1262     if (addr)
1263       gst_rtsp_address_free (addr);
1264     if (rtp_socket)
1265       g_object_unref (rtp_socket);
1266     if (rtcp_socket)
1267       g_object_unref (rtcp_socket);
1268     return FALSE;
1269   }
1270 }
1271
1272 /* must be called with lock */
1273 static gboolean
1274 alloc_ports (GstRTSPStream * stream)
1275 {
1276   GstRTSPStreamPrivate *priv = stream->priv;
1277
1278   priv->have_ipv4 =
1279       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1280       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1281       &priv->server_port_v4, &priv->server_addr_v4);
1282
1283   priv->have_ipv6 =
1284       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1285       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1286       &priv->server_port_v6, &priv->server_addr_v6);
1287
1288   return priv->have_ipv4 || priv->have_ipv6;
1289 }
1290
1291 /**
1292  * gst_rtsp_stream_get_server_port:
1293  * @stream: a #GstRTSPStream
1294  * @server_port: (out): result server port
1295  * @family: the port family to get
1296  *
1297  * Fill @server_port with the port pair used by the server. This function can
1298  * only be called when @stream has been joined.
1299  */
1300 void
1301 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1302     GstRTSPRange * server_port, GSocketFamily family)
1303 {
1304   GstRTSPStreamPrivate *priv;
1305
1306   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1307   priv = stream->priv;
1308   g_return_if_fail (priv->is_joined);
1309
1310   g_mutex_lock (&priv->lock);
1311   if (family == G_SOCKET_FAMILY_IPV4) {
1312     if (server_port)
1313       *server_port = priv->server_port_v4;
1314   } else {
1315     if (server_port)
1316       *server_port = priv->server_port_v6;
1317   }
1318   g_mutex_unlock (&priv->lock);
1319 }
1320
1321 /**
1322  * gst_rtsp_stream_get_rtpsession:
1323  * @stream: a #GstRTSPStream
1324  *
1325  * Get the RTP session of this stream.
1326  *
1327  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1328  */
1329 GObject *
1330 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1331 {
1332   GstRTSPStreamPrivate *priv;
1333   GObject *session;
1334
1335   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1336
1337   priv = stream->priv;
1338
1339   g_mutex_lock (&priv->lock);
1340   if ((session = priv->session))
1341     g_object_ref (session);
1342   g_mutex_unlock (&priv->lock);
1343
1344   return session;
1345 }
1346
1347 /**
1348  * gst_rtsp_stream_get_ssrc:
1349  * @stream: a #GstRTSPStream
1350  * @ssrc: (out): result ssrc
1351  *
1352  * Get the SSRC used by the RTP session of this stream. This function can only
1353  * be called when @stream has been joined.
1354  */
1355 void
1356 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1357 {
1358   GstRTSPStreamPrivate *priv;
1359
1360   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1361   priv = stream->priv;
1362   g_return_if_fail (priv->is_joined);
1363
1364   g_mutex_lock (&priv->lock);
1365   if (ssrc && priv->session)
1366     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1367   g_mutex_unlock (&priv->lock);
1368 }
1369
1370 /**
1371  * gst_rtsp_stream_set_retransmission_time:
1372  * @stream: a #GstRTSPStream
1373  * @time: a #GstClockTime
1374  *
1375  * Set the amount of time to store retransmission packets.
1376  */
1377 void
1378 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1379     GstClockTime time)
1380 {
1381   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1382
1383   g_mutex_lock (&stream->priv->lock);
1384   stream->priv->rtx_time = time;
1385   if (stream->priv->rtxsend)
1386     g_object_set (stream->priv->rtxsend, "max-size-time",
1387         GST_TIME_AS_MSECONDS (time), NULL);
1388   g_mutex_unlock (&stream->priv->lock);
1389 }
1390
1391 /**
1392  * gst_rtsp_stream_get_retransmission_time:
1393  * @stream: a #GstRTSPStream
1394  *
1395  * Get the amount of time to store retransmission data.
1396  *
1397  * Returns: the amount of time to store retransmission data.
1398  */
1399 GstClockTime
1400 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1401 {
1402   GstClockTime ret;
1403
1404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1405
1406   g_mutex_lock (&stream->priv->lock);
1407   ret = stream->priv->rtx_time;
1408   g_mutex_unlock (&stream->priv->lock);
1409
1410   return ret;
1411 }
1412
1413 /**
1414  * gst_rtsp_stream_set_retransmission_pt:
1415  * @stream: a #GstRTSPStream
1416  * @rtx_pt: a #guint
1417  *
1418  * Set the payload type (pt) for retransmission of this stream.
1419  */
1420 void
1421 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1422 {
1423   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1424
1425   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1426
1427   g_mutex_lock (&stream->priv->lock);
1428   stream->priv->rtx_pt = rtx_pt;
1429   if (stream->priv->rtxsend) {
1430     guint pt = gst_rtsp_stream_get_pt (stream);
1431     gchar *pt_s = g_strdup_printf ("%d", pt);
1432     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1433         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1434     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1435     g_free (pt_s);
1436     gst_structure_free (rtx_pt_map);
1437   }
1438   g_mutex_unlock (&stream->priv->lock);
1439 }
1440
1441 /**
1442  * gst_rtsp_stream_get_retransmission_pt:
1443  * @stream: a #GstRTSPStream
1444  *
1445  * Get the payload-type used for retransmission of this stream
1446  *
1447  * Returns: The retransmission PT.
1448  */
1449 guint
1450 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1451 {
1452   guint rtx_pt;
1453
1454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1455
1456   g_mutex_lock (&stream->priv->lock);
1457   rtx_pt = stream->priv->rtx_pt;
1458   g_mutex_unlock (&stream->priv->lock);
1459
1460   return rtx_pt;
1461 }
1462
1463 /**
1464  * gst_rtsp_stream_set_buffer_size:
1465  * @stream: a #GstRTSPStream
1466  * @size: the buffer size
1467  *
1468  * Set the size of the UDP transmission buffer (in bytes)
1469  * Needs to be set before the stream is joined to a bin.
1470  *
1471  * Since: 1.6
1472  */
1473 void
1474 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1475 {
1476   g_mutex_lock (&stream->priv->lock);
1477   stream->priv->buffer_size = size;
1478   g_mutex_unlock (&stream->priv->lock);
1479 }
1480
1481 /**
1482  * gst_rtsp_stream_get_buffer_size:
1483  * @stream: a #GstRTSPStream
1484  *
1485  * Get the size of the UDP transmission buffer (in bytes)
1486  *
1487  * Returns: the size of the UDP TX buffer
1488  *
1489  * Since: 1.6
1490  */
1491 guint
1492 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1493 {
1494   guint buffer_size;
1495
1496   g_mutex_lock (&stream->priv->lock);
1497   buffer_size = stream->priv->buffer_size;
1498   g_mutex_unlock (&stream->priv->lock);
1499
1500   return buffer_size;
1501 }
1502
1503 /* executed from streaming thread */
1504 static void
1505 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1506 {
1507   GstRTSPStreamPrivate *priv = stream->priv;
1508   GstCaps *newcaps, *oldcaps;
1509
1510   newcaps = gst_pad_get_current_caps (pad);
1511
1512   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1513       newcaps);
1514
1515   g_mutex_lock (&priv->lock);
1516   oldcaps = priv->caps;
1517   priv->caps = newcaps;
1518   g_mutex_unlock (&priv->lock);
1519
1520   if (oldcaps)
1521     gst_caps_unref (oldcaps);
1522 }
1523
1524 static void
1525 dump_structure (const GstStructure * s)
1526 {
1527   gchar *sstr;
1528
1529   sstr = gst_structure_to_string (s);
1530   GST_INFO ("structure: %s", sstr);
1531   g_free (sstr);
1532 }
1533
1534 static GstRTSPStreamTransport *
1535 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1536 {
1537   GstRTSPStreamPrivate *priv = stream->priv;
1538   GList *walk;
1539   GstRTSPStreamTransport *result = NULL;
1540   const gchar *tmp;
1541   gchar *dest;
1542   guint port;
1543
1544   if (rtcp_from == NULL)
1545     return NULL;
1546
1547   tmp = g_strrstr (rtcp_from, ":");
1548   if (tmp == NULL)
1549     return NULL;
1550
1551   port = atoi (tmp + 1);
1552   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1553
1554   g_mutex_lock (&priv->lock);
1555   GST_INFO ("finding %s:%d in %d transports", dest, port,
1556       g_list_length (priv->transports));
1557
1558   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1559     GstRTSPStreamTransport *trans = walk->data;
1560     const GstRTSPTransport *tr;
1561     gint min, max;
1562
1563     tr = gst_rtsp_stream_transport_get_transport (trans);
1564
1565     min = tr->client_port.min;
1566     max = tr->client_port.max;
1567
1568     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1569       result = trans;
1570       break;
1571     }
1572   }
1573   if (result)
1574     g_object_ref (result);
1575   g_mutex_unlock (&priv->lock);
1576
1577   g_free (dest);
1578
1579   return result;
1580 }
1581
1582 static GstRTSPStreamTransport *
1583 check_transport (GObject * source, GstRTSPStream * stream)
1584 {
1585   GstStructure *stats;
1586   GstRTSPStreamTransport *trans;
1587
1588   /* see if we have a stream to match with the origin of the RTCP packet */
1589   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1590   if (trans == NULL) {
1591     g_object_get (source, "stats", &stats, NULL);
1592     if (stats) {
1593       const gchar *rtcp_from;
1594
1595       dump_structure (stats);
1596
1597       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1598       if ((trans = find_transport (stream, rtcp_from))) {
1599         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1600             source);
1601         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1602             g_object_unref);
1603       }
1604       gst_structure_free (stats);
1605     }
1606   }
1607   return trans;
1608 }
1609
1610
1611 static void
1612 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1613 {
1614   GstRTSPStreamTransport *trans;
1615
1616   GST_INFO ("%p: new source %p", stream, source);
1617
1618   trans = check_transport (source, stream);
1619
1620   if (trans)
1621     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1622 }
1623
1624 static void
1625 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1626 {
1627   GST_INFO ("%p: new SDES %p", stream, source);
1628 }
1629
1630 static void
1631 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1632 {
1633   GstRTSPStreamTransport *trans;
1634
1635   trans = check_transport (source, stream);
1636
1637   if (trans) {
1638     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1639     gst_rtsp_stream_transport_keep_alive (trans);
1640   }
1641 #ifdef DUMP_STATS
1642   {
1643     GstStructure *stats;
1644     g_object_get (source, "stats", &stats, NULL);
1645     if (stats) {
1646       dump_structure (stats);
1647       gst_structure_free (stats);
1648     }
1649   }
1650 #endif
1651 }
1652
1653 static void
1654 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1655 {
1656   GST_INFO ("%p: source %p bye", stream, source);
1657 }
1658
1659 static void
1660 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1661 {
1662   GstRTSPStreamTransport *trans;
1663
1664   GST_INFO ("%p: source %p bye timeout", stream, source);
1665
1666   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1667     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1668     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1669   }
1670 }
1671
1672 static void
1673 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1674 {
1675   GstRTSPStreamTransport *trans;
1676
1677   GST_INFO ("%p: source %p timeout", stream, source);
1678
1679   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1680     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1681     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1682   }
1683 }
1684
1685 static void
1686 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1687 {
1688   GST_INFO ("%p: new sender source %p", stream, source);
1689 #ifndef DUMP_STATS
1690   {
1691     GstStructure *stats;
1692     g_object_get (source, "stats", &stats, NULL);
1693     if (stats) {
1694       dump_structure (stats);
1695       gst_structure_free (stats);
1696     }
1697   }
1698 #endif
1699 }
1700
1701 static void
1702 on_sender_ssrc_active (GObject * session, GObject * source,
1703     GstRTSPStream * stream)
1704 {
1705 #ifndef DUMP_STATS
1706   {
1707     GstStructure *stats;
1708     g_object_get (source, "stats", &stats, NULL);
1709     if (stats) {
1710       dump_structure (stats);
1711       gst_structure_free (stats);
1712     }
1713   }
1714 #endif
1715 }
1716
1717 static void
1718 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1719 {
1720   if (is_rtp) {
1721     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1722     g_list_free (priv->tr_cache_rtp);
1723     priv->tr_cache_rtp = NULL;
1724   } else {
1725     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1726     g_list_free (priv->tr_cache_rtcp);
1727     priv->tr_cache_rtcp = NULL;
1728   }
1729 }
1730
1731 static GstFlowReturn
1732 handle_new_sample (GstAppSink * sink, gpointer user_data)
1733 {
1734   GstRTSPStreamPrivate *priv;
1735   GList *walk;
1736   GstSample *sample;
1737   GstBuffer *buffer;
1738   GstRTSPStream *stream;
1739   gboolean is_rtp;
1740
1741   sample = gst_app_sink_pull_sample (sink);
1742   if (!sample)
1743     return GST_FLOW_OK;
1744
1745   stream = (GstRTSPStream *) user_data;
1746   priv = stream->priv;
1747   buffer = gst_sample_get_buffer (sample);
1748
1749   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1750
1751   g_mutex_lock (&priv->lock);
1752   if (is_rtp) {
1753     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1754       clear_tr_cache (priv, is_rtp);
1755       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1756         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1757         priv->tr_cache_rtp =
1758             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1759       }
1760       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1761     }
1762   } else {
1763     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1764       clear_tr_cache (priv, is_rtp);
1765       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1766         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1767         priv->tr_cache_rtcp =
1768             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1769       }
1770       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1771     }
1772   }
1773   g_mutex_unlock (&priv->lock);
1774
1775   if (is_rtp) {
1776     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1777       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1778       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1779     }
1780   } else {
1781     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1782       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1783       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1784     }
1785   }
1786   gst_sample_unref (sample);
1787
1788   return GST_FLOW_OK;
1789 }
1790
1791 static GstAppSinkCallbacks sink_cb = {
1792   NULL,                         /* not interested in EOS */
1793   NULL,                         /* not interested in preroll samples */
1794   handle_new_sample,
1795 };
1796
1797 static GstElement *
1798 get_rtp_encoder (GstRTSPStream * stream, guint session)
1799 {
1800   GstRTSPStreamPrivate *priv = stream->priv;
1801
1802   if (priv->srtpenc == NULL) {
1803     gchar *name;
1804
1805     name = g_strdup_printf ("srtpenc_%u", session);
1806     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1807     g_free (name);
1808
1809     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1810   }
1811   return gst_object_ref (priv->srtpenc);
1812 }
1813
1814 static GstElement *
1815 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1816 {
1817   GstRTSPStreamPrivate *priv = stream->priv;
1818   GstElement *oldenc, *enc;
1819   GstPad *pad;
1820   gchar *name;
1821
1822   if (priv->idx != session)
1823     return NULL;
1824
1825   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1826
1827   oldenc = priv->srtpenc;
1828   enc = get_rtp_encoder (stream, session);
1829   name = g_strdup_printf ("rtp_sink_%d", session);
1830   pad = gst_element_get_request_pad (enc, name);
1831   g_free (name);
1832   gst_object_unref (pad);
1833
1834   if (oldenc == NULL)
1835     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1836         enc);
1837
1838   return enc;
1839 }
1840
1841 static GstElement *
1842 request_rtcp_encoder (GstElement * rtpbin, guint session,
1843     GstRTSPStream * stream)
1844 {
1845   GstRTSPStreamPrivate *priv = stream->priv;
1846   GstElement *oldenc, *enc;
1847   GstPad *pad;
1848   gchar *name;
1849
1850   if (priv->idx != session)
1851     return NULL;
1852
1853   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1854
1855   oldenc = priv->srtpenc;
1856   enc = get_rtp_encoder (stream, session);
1857   name = g_strdup_printf ("rtcp_sink_%d", session);
1858   pad = gst_element_get_request_pad (enc, name);
1859   g_free (name);
1860   gst_object_unref (pad);
1861
1862   if (oldenc == NULL)
1863     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1864         enc);
1865
1866   return enc;
1867 }
1868
1869 static GstCaps *
1870 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1871 {
1872   GstRTSPStreamPrivate *priv = stream->priv;
1873   GstCaps *caps;
1874
1875   GST_DEBUG ("request key %08x", ssrc);
1876
1877   g_mutex_lock (&priv->lock);
1878   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1879     gst_caps_ref (caps);
1880   g_mutex_unlock (&priv->lock);
1881
1882   return caps;
1883 }
1884
1885 static GstElement *
1886 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1887     GstRTSPStream * stream)
1888 {
1889   GstRTSPStreamPrivate *priv = stream->priv;
1890
1891   if (priv->idx != session)
1892     return NULL;
1893
1894   if (priv->srtpdec == NULL) {
1895     gchar *name;
1896
1897     name = g_strdup_printf ("srtpdec_%u", session);
1898     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1899     g_free (name);
1900
1901     g_signal_connect (priv->srtpdec, "request-key",
1902         (GCallback) request_key, stream);
1903   }
1904   return gst_object_ref (priv->srtpdec);
1905 }
1906
1907 /**
1908  * gst_rtsp_stream_request_aux_sender:
1909  * @stream: a #GstRTSPStream
1910  * @sessid: the session id
1911  *
1912  * Creating a rtxsend bin
1913  *
1914  * Returns: (transfer full): a #GstElement.
1915  *
1916  * Since: 1.6
1917  */
1918 GstElement *
1919 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
1920 {
1921   GstElement *bin;
1922   GstPad *pad;
1923   GstStructure *pt_map;
1924   gchar *name;
1925   guint pt, rtx_pt;
1926   gchar *pt_s;
1927
1928   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1929
1930   pt = gst_rtsp_stream_get_pt (stream);
1931   pt_s = g_strdup_printf ("%u", pt);
1932   rtx_pt = stream->priv->rtx_pt;
1933
1934   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1935
1936   bin = gst_bin_new (NULL);
1937   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1938   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1939       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1940   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1941       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1942   g_free (pt_s);
1943   gst_structure_free (pt_map);
1944   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1945
1946   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1947   name = g_strdup_printf ("src_%u", sessid);
1948   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1949   g_free (name);
1950   gst_object_unref (pad);
1951
1952   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1953   name = g_strdup_printf ("sink_%u", sessid);
1954   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1955   g_free (name);
1956   gst_object_unref (pad);
1957
1958   return bin;
1959 }
1960
1961 /**
1962  * gst_rtsp_stream_set_pt_map:
1963  * @stream: a #GstRTSPStream
1964  * @pt: the pt
1965  * @caps: a #GstCaps
1966  *
1967  * Configure a pt map between @pt and @caps.
1968  */
1969 void
1970 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1971 {
1972   GstRTSPStreamPrivate *priv = stream->priv;
1973
1974   g_mutex_lock (&priv->lock);
1975   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1976   g_mutex_unlock (&priv->lock);
1977 }
1978
1979 static GstCaps *
1980 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1981     GstRTSPStream * stream)
1982 {
1983   GstRTSPStreamPrivate *priv = stream->priv;
1984   GstCaps *caps = NULL;
1985
1986   g_mutex_lock (&priv->lock);
1987
1988   if (priv->idx == session) {
1989     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1990     if (caps) {
1991       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1992       gst_caps_ref (caps);
1993     } else {
1994       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1995     }
1996   }
1997
1998   g_mutex_unlock (&priv->lock);
1999
2000   return caps;
2001 }
2002
2003 static void
2004 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2005 {
2006   GstRTSPStreamPrivate *priv = stream->priv;
2007   gchar *name;
2008   GstPadLinkReturn ret;
2009   guint sessid;
2010
2011   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2012       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2013
2014   name = gst_pad_get_name (pad);
2015   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2016     g_free (name);
2017     return;
2018   }
2019   g_free (name);
2020
2021   if (priv->idx != sessid)
2022     return;
2023
2024   if (gst_pad_is_linked (priv->sinkpad)) {
2025     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2026         GST_DEBUG_PAD_NAME (priv->sinkpad));
2027     return;
2028   }
2029
2030   /* link the RTP pad to the session manager, it should not really fail unless
2031    * this is not really an RTP pad */
2032   ret = gst_pad_link (pad, priv->sinkpad);
2033   if (ret != GST_PAD_LINK_OK)
2034     goto link_failed;
2035   priv->recv_rtp_src = gst_object_ref (pad);
2036
2037   return;
2038
2039 /* ERRORS */
2040 link_failed:
2041   {
2042     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2043         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2044   }
2045 }
2046
2047 static void
2048 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2049     GstRTSPStream * stream)
2050 {
2051   /* TODO: What to do here other than this? */
2052   GST_DEBUG ("Stream %p: Got EOS", stream);
2053   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2054 }
2055
2056 /**
2057  * gst_rtsp_stream_join_bin:
2058  * @stream: a #GstRTSPStream
2059  * @bin: (transfer none): a #GstBin to join
2060  * @rtpbin: (transfer none): a rtpbin element in @bin
2061  * @state: the target state of the new elements
2062  *
2063  * Join the #GstBin @bin that contains the element @rtpbin.
2064  *
2065  * @stream will link to @rtpbin, which must be inside @bin. The elements
2066  * added to @bin will be set to the state given in @state.
2067  *
2068  * Returns: %TRUE on success.
2069  */
2070 gboolean
2071 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2072     GstElement * rtpbin, GstState state)
2073 {
2074   GstRTSPStreamPrivate *priv;
2075   gint i;
2076   guint idx;
2077   gchar *name;
2078   GstPad *pad, *sinkpad = NULL, *selpad;
2079   GstPadLinkReturn ret;
2080   gboolean is_tcp = FALSE, is_udp = FALSE;
2081
2082   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2083   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2084   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2085
2086   priv = stream->priv;
2087
2088   g_mutex_lock (&priv->lock);
2089   if (priv->is_joined)
2090     goto was_joined;
2091
2092   /* create a session with the same index as the stream */
2093   idx = priv->idx;
2094
2095   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2096
2097   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2098
2099   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2100       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2101
2102   if (is_udp && !alloc_ports (stream))
2103     goto no_ports;
2104
2105   /* update the dscp qos field in the sinks */
2106   update_dscp_qos (stream);
2107
2108   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2109       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2110     /* For SRTP */
2111     g_signal_connect (rtpbin, "request-rtp-encoder",
2112         (GCallback) request_rtp_encoder, stream);
2113     g_signal_connect (rtpbin, "request-rtcp-encoder",
2114         (GCallback) request_rtcp_encoder, stream);
2115     g_signal_connect (rtpbin, "request-rtp-decoder",
2116         (GCallback) request_rtp_rtcp_decoder, stream);
2117     g_signal_connect (rtpbin, "request-rtcp-decoder",
2118         (GCallback) request_rtp_rtcp_decoder, stream);
2119   }
2120
2121   if (priv->sinkpad) {
2122     g_signal_connect (rtpbin, "request-pt-map",
2123         (GCallback) request_pt_map, stream);
2124   }
2125
2126   /* get pads from the RTP session element for sending and receiving
2127    * RTP/RTCP*/
2128   if (priv->srcpad) {
2129     /* get a pad for sending RTP */
2130     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2131     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2132     g_free (name);
2133
2134     /* link the RTP pad to the session manager, it should not really fail unless
2135      * this is not really an RTP pad */
2136     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2137     if (ret != GST_PAD_LINK_OK)
2138       goto link_failed;
2139
2140     name = g_strdup_printf ("send_rtp_src_%u", idx);
2141     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2142     g_free (name);
2143   } else {
2144     /* Need to connect our sinkpad from here */
2145     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2146     /* EOS */
2147     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2148
2149     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2150     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2151     g_free (name);
2152   }
2153
2154   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2155   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2156   g_free (name);
2157   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2158   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2159   g_free (name);
2160
2161   /* get the session */
2162   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2163
2164   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2165       stream);
2166   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2167       stream);
2168   g_signal_connect (priv->session, "on-ssrc-active",
2169       (GCallback) on_ssrc_active, stream);
2170   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2171       stream);
2172   g_signal_connect (priv->session, "on-bye-timeout",
2173       (GCallback) on_bye_timeout, stream);
2174   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2175       stream);
2176
2177   /* signal for sender ssrc */
2178   g_signal_connect (priv->session, "on-new-sender-ssrc",
2179       (GCallback) on_new_sender_ssrc, stream);
2180   g_signal_connect (priv->session, "on-sender-ssrc-active",
2181       (GCallback) on_sender_ssrc_active, stream);
2182
2183   for (i = 0; i < 2; i++) {
2184     GstPad *teepad, *queuepad;
2185     /* For the sender we create this bit of pipeline for both
2186      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2187      * we need to add a queue before appsink and udpsink to make
2188      * the pipeline not block. For the TCP case, we want to pump
2189      * client as fast as possible anyway. This pipeline is used
2190      * when both TCP and UDP are present.
2191      *
2192      * .--------.      .-----.    .---------.    .---------.
2193      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2194      * |       send->sink   src->sink      src->sink       |
2195      * '--------'      |     |    '---------'    '---------'
2196      *                 |     |    .---------.    .---------.
2197      *                 |     |    |  queue  |    | appsink |
2198      *                 |    src->sink      src->sink       |
2199      *                 '-----'    '---------'    '---------'
2200      *
2201      * When only UDP or only TCP is allowed, we skip the tee and queue
2202      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2203      * the session.
2204      */
2205
2206     /* Only link the RTP send src if we're going to send RTP, link
2207      * the RTCP send src always */
2208     if (priv->srcpad || i == 1) {
2209       if (is_udp) {
2210         /* add udpsink */
2211         gst_bin_add (bin, priv->udpsink[i]);
2212         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2213       }
2214
2215       if (is_tcp) {
2216         /* make appsink */
2217         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2218         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2219         gst_bin_add (bin, priv->appsink[i]);
2220         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2221             &sink_cb, stream, NULL);
2222       }
2223
2224       if (is_udp && is_tcp) {
2225         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2226
2227         /* make tee for RTP/RTCP */
2228         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2229         gst_bin_add (bin, priv->tee[i]);
2230
2231         /* and link to rtpbin send pad */
2232         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2233         gst_pad_link (priv->send_src[i], pad);
2234         gst_object_unref (pad);
2235
2236         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2237         g_object_set (priv->udpqueue[i], "max-size-buffers",
2238             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2239             NULL);
2240         gst_bin_add (bin, priv->udpqueue[i]);
2241         /* link tee to udpqueue */
2242         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2243         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2244         gst_pad_link (teepad, pad);
2245         gst_object_unref (pad);
2246         gst_object_unref (teepad);
2247
2248         /* link udpqueue to udpsink */
2249         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2250         gst_pad_link (queuepad, sinkpad);
2251         gst_object_unref (queuepad);
2252         gst_object_unref (sinkpad);
2253
2254         /* make appqueue */
2255         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2256         g_object_set (priv->appqueue[i], "max-size-buffers",
2257             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2258             NULL);
2259         gst_bin_add (bin, priv->appqueue[i]);
2260         /* and link tee to appqueue */
2261         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2262         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2263         gst_pad_link (teepad, pad);
2264         gst_object_unref (pad);
2265         gst_object_unref (teepad);
2266
2267         /* and link appqueue to appsink */
2268         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2269         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2270         gst_pad_link (queuepad, pad);
2271         gst_object_unref (pad);
2272         gst_object_unref (queuepad);
2273       } else if (is_tcp) {
2274         /* only appsink needed, link it to the session */
2275         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2276         gst_pad_link (priv->send_src[i], pad);
2277         gst_object_unref (pad);
2278
2279         /* when its only TCP, we need to set sync and preroll to FALSE
2280          * for the sink to avoid deadlock. And this is only needed for
2281          * sink used for RTCP data, not the RTP data. */
2282         if (i == 1)
2283           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2284       } else {
2285         /* else only udpsink needed, link it to the session */
2286         gst_pad_link (priv->send_src[i], sinkpad);
2287         gst_object_unref (sinkpad);
2288       }
2289     }
2290
2291     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2292      * RTCP sink always */
2293     if (priv->sinkpad || i == 1) {
2294       /* For the receiver we create this bit of pipeline for both
2295        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2296        * and it is all funneled into the rtpbin receive pad.
2297        *
2298        * .--------.     .--------.    .--------.
2299        * | udpsrc |     | funnel |    | rtpbin |
2300        * |       src->sink      src->sink      |
2301        * '--------'     |        |    '--------'
2302        * .--------.     |        |
2303        * | appsrc |     |        |
2304        * |       src->sink       |
2305        * '--------'     '--------'
2306        */
2307       /* make funnel for the RTP/RTCP receivers */
2308       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2309       gst_bin_add (bin, priv->funnel[i]);
2310
2311       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2312       gst_pad_link (pad, priv->recv_sink[i]);
2313       gst_object_unref (pad);
2314
2315       if (priv->udpsrc_v4[i]) {
2316         if (priv->srcpad) {
2317           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2318            * values. This is only relevant for PLAY pipelines */
2319           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2320           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2321         }
2322         /* add udpsrc */
2323         gst_bin_add (bin, priv->udpsrc_v4[i]);
2324
2325         /* and link to the funnel v4 */
2326         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2327         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2328         gst_pad_link (pad, selpad);
2329         gst_object_unref (pad);
2330         gst_object_unref (selpad);
2331       }
2332
2333       if (priv->udpsrc_v6[i]) {
2334         if (priv->srcpad) {
2335           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2336           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2337         }
2338         gst_bin_add (bin, priv->udpsrc_v6[i]);
2339
2340         /* and link to the funnel v6 */
2341         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2342         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2343         gst_pad_link (pad, selpad);
2344         gst_object_unref (pad);
2345         gst_object_unref (selpad);
2346       }
2347
2348       if (is_tcp) {
2349         /* make and add appsrc */
2350         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2351         priv->appsrc_base_time[i] = -1;
2352         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2353         gst_bin_add (bin, priv->appsrc[i]);
2354         /* and link to the funnel */
2355         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2356         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2357         gst_pad_link (pad, selpad);
2358         gst_object_unref (pad);
2359         gst_object_unref (selpad);
2360       }
2361     }
2362
2363     /* check if we need to set to a special state */
2364     if (state != GST_STATE_NULL) {
2365       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2366         gst_element_set_state (priv->udpsink[i], state);
2367       if (priv->appsink[i] && (priv->srcpad || i == 1))
2368         gst_element_set_state (priv->appsink[i], state);
2369       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2370         gst_element_set_state (priv->appqueue[i], state);
2371       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2372         gst_element_set_state (priv->udpqueue[i], state);
2373       if (priv->tee[i] && (priv->srcpad || i == 1))
2374         gst_element_set_state (priv->tee[i], state);
2375       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2376         gst_element_set_state (priv->funnel[i], state);
2377       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2378         gst_element_set_state (priv->appsrc[i], state);
2379     }
2380   }
2381
2382   if (priv->srcpad) {
2383     /* be notified of caps changes */
2384     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2385         (GCallback) caps_notify, stream);
2386   }
2387
2388   priv->is_joined = TRUE;
2389   g_mutex_unlock (&priv->lock);
2390
2391   return TRUE;
2392
2393   /* ERRORS */
2394 was_joined:
2395   {
2396     g_mutex_unlock (&priv->lock);
2397     return TRUE;
2398   }
2399 no_ports:
2400   {
2401     g_mutex_unlock (&priv->lock);
2402     GST_WARNING ("failed to allocate ports %u", idx);
2403     return FALSE;
2404   }
2405 link_failed:
2406   {
2407     GST_WARNING ("failed to link stream %u", idx);
2408     gst_object_unref (priv->send_rtp_sink);
2409     priv->send_rtp_sink = NULL;
2410     g_mutex_unlock (&priv->lock);
2411     return FALSE;
2412   }
2413 }
2414
2415 /**
2416  * gst_rtsp_stream_leave_bin:
2417  * @stream: a #GstRTSPStream
2418  * @bin: (transfer none): a #GstBin
2419  * @rtpbin: (transfer none): a rtpbin #GstElement
2420  *
2421  * Remove the elements of @stream from @bin.
2422  *
2423  * Return: %TRUE on success.
2424  */
2425 gboolean
2426 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2427     GstElement * rtpbin)
2428 {
2429   GstRTSPStreamPrivate *priv;
2430   gint i;
2431   GList *l;
2432   gboolean is_tcp, is_udp;
2433
2434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2435   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2436   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2437
2438   priv = stream->priv;
2439
2440   g_mutex_lock (&priv->lock);
2441   if (!priv->is_joined)
2442     goto was_not_joined;
2443
2444   /* all transports must be removed by now */
2445   if (priv->transports != NULL)
2446     goto transports_not_removed;
2447
2448   clear_tr_cache (priv, TRUE);
2449   clear_tr_cache (priv, FALSE);
2450
2451   GST_INFO ("stream %p leaving bin", stream);
2452
2453   if (priv->srcpad) {
2454     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2455
2456     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2457     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2458     gst_object_unref (priv->send_rtp_sink);
2459     priv->send_rtp_sink = NULL;
2460   } else if (priv->recv_rtp_src) {
2461     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2462     gst_object_unref (priv->recv_rtp_src);
2463     priv->recv_rtp_src = NULL;
2464   }
2465
2466   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2467
2468   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2469       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2470
2471
2472   for (i = 0; i < 2; i++) {
2473     if (priv->udpsink[i])
2474       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2475     if (priv->appsink[i])
2476       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2477     if (priv->appqueue[i])
2478       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2479     if (priv->udpqueue[i])
2480       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2481     if (priv->tee[i])
2482       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2483     if (priv->funnel[i])
2484       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2485     if (priv->appsrc[i])
2486       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2487     if (priv->udpsrc_v4[i] && (priv->sinkpad || i == 1)) {
2488       /* and set udpsrc to NULL now before removing */
2489       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2490       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2491       /* removing them should also nicely release the request
2492        * pads when they finalize */
2493       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2494     }
2495     if (priv->udpsrc_v6[i] && (priv->sinkpad || i == 1)) {
2496       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2497       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2498       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2499     }
2500
2501     for (l = priv->transport_sources; l; l = l->next) {
2502       GstRTSPMulticastTransportSource *s = l->data;
2503
2504       if (!s->udpsrc[i])
2505         continue;
2506
2507       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2508       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2509       gst_bin_remove (bin, s->udpsrc[i]);
2510     }
2511
2512     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2513       gst_bin_remove (bin, priv->udpsink[i]);
2514     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2515       gst_bin_remove (bin, priv->appsrc[i]);
2516     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2517       gst_bin_remove (bin, priv->appsink[i]);
2518     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2519       gst_bin_remove (bin, priv->appqueue[i]);
2520     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2521       gst_bin_remove (bin, priv->udpqueue[i]);
2522     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2523       gst_bin_remove (bin, priv->tee[i]);
2524     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2525       gst_bin_remove (bin, priv->funnel[i]);
2526
2527     if (priv->sinkpad || i == 1) {
2528       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2529       gst_object_unref (priv->recv_sink[i]);
2530       priv->recv_sink[i] = NULL;
2531     }
2532
2533     priv->udpsrc_v4[i] = NULL;
2534     priv->udpsrc_v6[i] = NULL;
2535     priv->udpsink[i] = NULL;
2536     priv->appsrc[i] = NULL;
2537     priv->appsink[i] = NULL;
2538     priv->appqueue[i] = NULL;
2539     priv->udpqueue[i] = NULL;
2540     priv->tee[i] = NULL;
2541     priv->funnel[i] = NULL;
2542   }
2543
2544   for (l = priv->transport_sources; l; l = l->next) {
2545     GstRTSPMulticastTransportSource *s = l->data;
2546     g_slice_free (GstRTSPMulticastTransportSource, s);
2547   }
2548   g_list_free (priv->transport_sources);
2549   priv->transport_sources = NULL;
2550
2551   if (priv->srcpad) {
2552     gst_object_unref (priv->send_src[0]);
2553     priv->send_src[0] = NULL;
2554   }
2555
2556   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2557   gst_object_unref (priv->send_src[1]);
2558   priv->send_src[1] = NULL;
2559
2560   g_object_unref (priv->session);
2561   priv->session = NULL;
2562   if (priv->caps)
2563     gst_caps_unref (priv->caps);
2564   priv->caps = NULL;
2565
2566   if (priv->srtpenc)
2567     gst_object_unref (priv->srtpenc);
2568   if (priv->srtpdec)
2569     gst_object_unref (priv->srtpdec);
2570
2571   priv->is_joined = FALSE;
2572   g_mutex_unlock (&priv->lock);
2573
2574   return TRUE;
2575
2576 was_not_joined:
2577   {
2578     g_mutex_unlock (&priv->lock);
2579     return TRUE;
2580   }
2581 transports_not_removed:
2582   {
2583     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2584     g_mutex_unlock (&priv->lock);
2585     return FALSE;
2586   }
2587 }
2588
2589 /**
2590  * gst_rtsp_stream_get_rtpinfo:
2591  * @stream: a #GstRTSPStream
2592  * @rtptime: (allow-none): result RTP timestamp
2593  * @seq: (allow-none): result RTP seqnum
2594  * @clock_rate: (allow-none): the clock rate
2595  * @running_time: (allow-none): result running-time
2596  *
2597  * Retrieve the current rtptime, seq and running-time. This is used to
2598  * construct a RTPInfo reply header.
2599  *
2600  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2601  */
2602 gboolean
2603 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2604     guint * rtptime, guint * seq, guint * clock_rate,
2605     GstClockTime * running_time)
2606 {
2607   GstRTSPStreamPrivate *priv;
2608   GstStructure *stats;
2609   GObjectClass *payobjclass;
2610
2611   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2612
2613   priv = stream->priv;
2614
2615   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2616
2617   g_mutex_lock (&priv->lock);
2618
2619   /* First try to extract the information from the last buffer on the sinks.
2620    * This will have a more accurate sequence number and timestamp, as between
2621    * the payloader and the sink there can be some queues
2622    */
2623   if (priv->udpsink[0] || priv->appsink[0]) {
2624     GstSample *last_sample;
2625
2626     if (priv->udpsink[0])
2627       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2628     else
2629       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2630
2631     if (last_sample) {
2632       GstCaps *caps;
2633       GstBuffer *buffer;
2634       GstSegment *segment;
2635       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2636
2637       caps = gst_sample_get_caps (last_sample);
2638       buffer = gst_sample_get_buffer (last_sample);
2639       segment = gst_sample_get_segment (last_sample);
2640
2641       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2642         if (seq) {
2643           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2644         }
2645
2646         if (rtptime) {
2647           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2648         }
2649
2650         gst_rtp_buffer_unmap (&rtp_buffer);
2651
2652         if (running_time) {
2653           *running_time =
2654               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2655               GST_BUFFER_TIMESTAMP (buffer));
2656         }
2657
2658         if (clock_rate) {
2659           GstStructure *s = gst_caps_get_structure (caps, 0);
2660
2661           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2662
2663           if (*clock_rate == 0 && running_time)
2664             *running_time = GST_CLOCK_TIME_NONE;
2665         }
2666         gst_sample_unref (last_sample);
2667
2668         goto done;
2669       } else {
2670         gst_sample_unref (last_sample);
2671       }
2672     }
2673   }
2674
2675   if (g_object_class_find_property (payobjclass, "stats")) {
2676     g_object_get (priv->payloader, "stats", &stats, NULL);
2677     if (stats == NULL)
2678       goto no_stats;
2679
2680     if (seq)
2681       gst_structure_get_uint (stats, "seqnum", seq);
2682
2683     if (rtptime)
2684       gst_structure_get_uint (stats, "timestamp", rtptime);
2685
2686     if (running_time)
2687       gst_structure_get_clock_time (stats, "running-time", running_time);
2688
2689     if (clock_rate) {
2690       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2691       if (*clock_rate == 0 && running_time)
2692         *running_time = GST_CLOCK_TIME_NONE;
2693     }
2694     gst_structure_free (stats);
2695   } else {
2696     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2697         !g_object_class_find_property (payobjclass, "timestamp"))
2698       goto no_stats;
2699
2700     if (seq)
2701       g_object_get (priv->payloader, "seqnum", seq, NULL);
2702
2703     if (rtptime)
2704       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2705
2706     if (running_time)
2707       *running_time = GST_CLOCK_TIME_NONE;
2708   }
2709
2710 done:
2711   g_mutex_unlock (&priv->lock);
2712
2713   return TRUE;
2714
2715   /* ERRORS */
2716 no_stats:
2717   {
2718     GST_WARNING ("Could not get payloader stats");
2719     g_mutex_unlock (&priv->lock);
2720     return FALSE;
2721   }
2722 }
2723
2724 /**
2725  * gst_rtsp_stream_get_caps:
2726  * @stream: a #GstRTSPStream
2727  *
2728  * Retrieve the current caps of @stream.
2729  *
2730  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2731  * after usage.
2732  */
2733 GstCaps *
2734 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2735 {
2736   GstRTSPStreamPrivate *priv;
2737   GstCaps *result;
2738
2739   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2740
2741   priv = stream->priv;
2742
2743   g_mutex_lock (&priv->lock);
2744   if ((result = priv->caps))
2745     gst_caps_ref (result);
2746   g_mutex_unlock (&priv->lock);
2747
2748   return result;
2749 }
2750
2751 /**
2752  * gst_rtsp_stream_recv_rtp:
2753  * @stream: a #GstRTSPStream
2754  * @buffer: (transfer full): a #GstBuffer
2755  *
2756  * Handle an RTP buffer for the stream. This method is usually called when a
2757  * message has been received from a client using the TCP transport.
2758  *
2759  * This function takes ownership of @buffer.
2760  *
2761  * Returns: a GstFlowReturn.
2762  */
2763 GstFlowReturn
2764 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2765 {
2766   GstRTSPStreamPrivate *priv;
2767   GstFlowReturn ret;
2768   GstElement *element;
2769
2770   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2771   priv = stream->priv;
2772   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2773   g_return_val_if_fail (priv->is_joined, FALSE);
2774
2775   g_mutex_lock (&priv->lock);
2776   if (priv->appsrc[0])
2777     element = gst_object_ref (priv->appsrc[0]);
2778   else
2779     element = NULL;
2780   g_mutex_unlock (&priv->lock);
2781
2782   if (element) {
2783     if (priv->appsrc_base_time[0] == -1) {
2784       /* Take current running_time. This timestamp will be put on
2785        * the first buffer of each stream because we are a live source and so we
2786        * timestamp with the running_time. When we are dealing with TCP, we also
2787        * only timestamp the first buffer (using the DISCONT flag) because a server
2788        * typically bursts data, for which we don't want to compensate by speeding
2789        * up the media. The other timestamps will be interpollated from this one
2790        * using the RTP timestamps. */
2791       GST_OBJECT_LOCK (element);
2792       if (GST_ELEMENT_CLOCK (element)) {
2793         GstClockTime now;
2794         GstClockTime base_time;
2795
2796         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2797         base_time = GST_ELEMENT_CAST (element)->base_time;
2798
2799         priv->appsrc_base_time[0] = now - base_time;
2800         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2801         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2802             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2803             GST_TIME_ARGS (base_time));
2804       }
2805       GST_OBJECT_UNLOCK (element);
2806     }
2807
2808     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2809     gst_object_unref (element);
2810   } else {
2811     ret = GST_FLOW_OK;
2812   }
2813   return ret;
2814 }
2815
2816 /**
2817  * gst_rtsp_stream_recv_rtcp:
2818  * @stream: a #GstRTSPStream
2819  * @buffer: (transfer full): a #GstBuffer
2820  *
2821  * Handle an RTCP buffer for the stream. This method is usually called when a
2822  * message has been received from a client using the TCP transport.
2823  *
2824  * This function takes ownership of @buffer.
2825  *
2826  * Returns: a GstFlowReturn.
2827  */
2828 GstFlowReturn
2829 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2830 {
2831   GstRTSPStreamPrivate *priv;
2832   GstFlowReturn ret;
2833   GstElement *element;
2834
2835   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2836   priv = stream->priv;
2837   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2838
2839   if (!priv->is_joined) {
2840     gst_buffer_unref (buffer);
2841     return GST_FLOW_NOT_LINKED;
2842   }
2843   g_mutex_lock (&priv->lock);
2844   if (priv->appsrc[1])
2845     element = gst_object_ref (priv->appsrc[1]);
2846   else
2847     element = NULL;
2848   g_mutex_unlock (&priv->lock);
2849
2850   if (element) {
2851     if (priv->appsrc_base_time[1] == -1) {
2852       /* Take current running_time. This timestamp will be put on
2853        * the first buffer of each stream because we are a live source and so we
2854        * timestamp with the running_time. When we are dealing with TCP, we also
2855        * only timestamp the first buffer (using the DISCONT flag) because a server
2856        * typically bursts data, for which we don't want to compensate by speeding
2857        * up the media. The other timestamps will be interpollated from this one
2858        * using the RTP timestamps. */
2859       GST_OBJECT_LOCK (element);
2860       if (GST_ELEMENT_CLOCK (element)) {
2861         GstClockTime now;
2862         GstClockTime base_time;
2863
2864         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2865         base_time = GST_ELEMENT_CAST (element)->base_time;
2866
2867         priv->appsrc_base_time[1] = now - base_time;
2868         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2869         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2870             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2871             GST_TIME_ARGS (base_time));
2872       }
2873       GST_OBJECT_UNLOCK (element);
2874     }
2875
2876     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2877     gst_object_unref (element);
2878   } else {
2879     ret = GST_FLOW_OK;
2880     gst_buffer_unref (buffer);
2881   }
2882   return ret;
2883 }
2884
2885 /* must be called with lock */
2886 static gboolean
2887 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2888     gboolean add)
2889 {
2890   GstRTSPStreamPrivate *priv = stream->priv;
2891   const GstRTSPTransport *tr;
2892
2893   tr = gst_rtsp_stream_transport_get_transport (trans);
2894
2895   switch (tr->lower_transport) {
2896     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2897     {
2898       GstRTSPMulticastTransportSource *source;
2899       GstBin *bin;
2900
2901       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2902
2903       if (add) {
2904         gchar *host;
2905         gint i;
2906         GstPad *selpad, *pad;
2907
2908         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2909         source->transport = trans;
2910
2911         for (i = 0; i < 2; i++) {
2912           host =
2913               g_strdup_printf ("udp://%s:%d", tr->destination,
2914               (i == 0) ? tr->port.min : tr->port.max);
2915           source->udpsrc[i] =
2916               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2917           g_free (host);
2918           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
2919
2920           if (priv->srcpad) {
2921             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2922              * values. This is only relevant for PLAY pipelines */
2923             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2924             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2925           }
2926           /* add udpsrc */
2927           gst_bin_add (bin, source->udpsrc[i]);
2928
2929           /* and link to the funnel v4 */
2930           source->selpad[i] = selpad =
2931               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2932           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2933           gst_pad_link (pad, selpad);
2934           gst_object_unref (pad);
2935           gst_object_unref (selpad);
2936         }
2937
2938         priv->transport_sources =
2939             g_list_prepend (priv->transport_sources, source);
2940       } else {
2941         GList *l;
2942
2943         for (l = priv->transport_sources; l; l = l->next) {
2944           source = l->data;
2945
2946           if (source->transport == trans) {
2947             priv->transport_sources =
2948                 g_list_delete_link (priv->transport_sources, l);
2949             break;
2950           }
2951         }
2952
2953         if (l != NULL) {
2954           gint i;
2955
2956           for (i = 0; i < 2; i++) {
2957             /* Will automatically unlink everything */
2958             gst_bin_remove (bin,
2959                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2960
2961             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2962             gst_object_unref (source->udpsrc[i]);
2963
2964             gst_element_release_request_pad (priv->funnel[i],
2965                 source->selpad[i]);
2966           }
2967
2968           g_slice_free (GstRTSPMulticastTransportSource, source);
2969         }
2970       }
2971
2972       gst_object_unref (bin);
2973
2974       /* fall through for the generic case */
2975     }
2976     case GST_RTSP_LOWER_TRANS_UDP:
2977     {
2978       gchar *dest;
2979       gint min, max;
2980       guint ttl = 0;
2981
2982       dest = tr->destination;
2983       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2984         min = tr->port.min;
2985         max = tr->port.max;
2986         ttl = tr->ttl;
2987       } else {
2988         min = tr->client_port.min;
2989         max = tr->client_port.max;
2990       }
2991
2992       if (add) {
2993         if (ttl > 0) {
2994           GST_INFO ("setting ttl-mc %d", ttl);
2995           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2996           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2997         }
2998         GST_INFO ("adding %s:%d-%d", dest, min, max);
2999         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3000         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3001         priv->transports = g_list_prepend (priv->transports, trans);
3002       } else {
3003         GST_INFO ("removing %s:%d-%d", dest, min, max);
3004         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3005         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3006         priv->transports = g_list_remove (priv->transports, trans);
3007       }
3008       priv->transports_cookie++;
3009       break;
3010     }
3011     case GST_RTSP_LOWER_TRANS_TCP:
3012       if (add) {
3013         GST_INFO ("adding TCP %s", tr->destination);
3014         priv->transports = g_list_prepend (priv->transports, trans);
3015       } else {
3016         GST_INFO ("removing TCP %s", tr->destination);
3017         priv->transports = g_list_remove (priv->transports, trans);
3018       }
3019       priv->transports_cookie++;
3020       break;
3021     default:
3022       goto unknown_transport;
3023   }
3024   return TRUE;
3025
3026   /* ERRORS */
3027 unknown_transport:
3028   {
3029     GST_INFO ("Unknown transport %d", tr->lower_transport);
3030     return FALSE;
3031   }
3032 }
3033
3034
3035 /**
3036  * gst_rtsp_stream_add_transport:
3037  * @stream: a #GstRTSPStream
3038  * @trans: (transfer none): a #GstRTSPStreamTransport
3039  *
3040  * Add the transport in @trans to @stream. The media of @stream will
3041  * then also be send to the values configured in @trans.
3042  *
3043  * @stream must be joined to a bin.
3044  *
3045  * @trans must contain a valid #GstRTSPTransport.
3046  *
3047  * Returns: %TRUE if @trans was added
3048  */
3049 gboolean
3050 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3051     GstRTSPStreamTransport * trans)
3052 {
3053   GstRTSPStreamPrivate *priv;
3054   gboolean res;
3055
3056   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3057   priv = stream->priv;
3058   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3059   g_return_val_if_fail (priv->is_joined, FALSE);
3060
3061   g_mutex_lock (&priv->lock);
3062   res = update_transport (stream, trans, TRUE);
3063   g_mutex_unlock (&priv->lock);
3064
3065   return res;
3066 }
3067
3068 /**
3069  * gst_rtsp_stream_remove_transport:
3070  * @stream: a #GstRTSPStream
3071  * @trans: (transfer none): a #GstRTSPStreamTransport
3072  *
3073  * Remove the transport in @trans from @stream. The media of @stream will
3074  * not be sent to the values configured in @trans.
3075  *
3076  * @stream must be joined to a bin.
3077  *
3078  * @trans must contain a valid #GstRTSPTransport.
3079  *
3080  * Returns: %TRUE if @trans was removed
3081  */
3082 gboolean
3083 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3084     GstRTSPStreamTransport * trans)
3085 {
3086   GstRTSPStreamPrivate *priv;
3087   gboolean res;
3088
3089   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3090   priv = stream->priv;
3091   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3092   g_return_val_if_fail (priv->is_joined, FALSE);
3093
3094   g_mutex_lock (&priv->lock);
3095   res = update_transport (stream, trans, FALSE);
3096   g_mutex_unlock (&priv->lock);
3097
3098   return res;
3099 }
3100
3101 /**
3102  * gst_rtsp_stream_update_crypto:
3103  * @stream: a #GstRTSPStream
3104  * @ssrc: the SSRC
3105  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3106  *
3107  * Update the new crypto information for @ssrc in @stream. If information
3108  * for @ssrc did not exist, it will be added. If information
3109  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3110  * be removed from @stream.
3111  *
3112  * Returns: %TRUE if @crypto could be updated
3113  */
3114 gboolean
3115 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3116     guint ssrc, GstCaps * crypto)
3117 {
3118   GstRTSPStreamPrivate *priv;
3119
3120   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3121   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3122
3123   priv = stream->priv;
3124
3125   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3126
3127   g_mutex_lock (&priv->lock);
3128   if (crypto)
3129     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3130         gst_caps_ref (crypto));
3131   else
3132     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3133   g_mutex_unlock (&priv->lock);
3134
3135   return TRUE;
3136 }
3137
3138 /**
3139  * gst_rtsp_stream_get_rtp_socket:
3140  * @stream: a #GstRTSPStream
3141  * @family: the socket family
3142  *
3143  * Get the RTP socket from @stream for a @family.
3144  *
3145  * @stream must be joined to a bin.
3146  *
3147  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3148  * socket could be allocated for @family. Unref after usage
3149  */
3150 GSocket *
3151 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3152 {
3153   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3154   GSocket *socket;
3155   const gchar *name;
3156
3157   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3158   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3159       family == G_SOCKET_FAMILY_IPV6, NULL);
3160   g_return_val_if_fail (priv->udpsink[0], NULL);
3161
3162   if (family == G_SOCKET_FAMILY_IPV6)
3163     name = "socket-v6";
3164   else
3165     name = "socket";
3166
3167   g_object_get (priv->udpsink[0], name, &socket, NULL);
3168
3169   return socket;
3170 }
3171
3172 /**
3173  * gst_rtsp_stream_get_rtcp_socket:
3174  * @stream: a #GstRTSPStream
3175  * @family: the socket family
3176  *
3177  * Get the RTCP socket from @stream for a @family.
3178  *
3179  * @stream must be joined to a bin.
3180  *
3181  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3182  * socket could be allocated for @family. Unref after usage
3183  */
3184 GSocket *
3185 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3186 {
3187   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3188   GSocket *socket;
3189   const gchar *name;
3190
3191   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3192   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3193       family == G_SOCKET_FAMILY_IPV6, NULL);
3194   g_return_val_if_fail (priv->udpsink[1], NULL);
3195
3196   if (family == G_SOCKET_FAMILY_IPV6)
3197     name = "socket-v6";
3198   else
3199     name = "socket";
3200
3201   g_object_get (priv->udpsink[1], name, &socket, NULL);
3202
3203   return socket;
3204 }
3205
3206 /**
3207  * gst_rtsp_stream_set_seqnum:
3208  * @stream: a #GstRTSPStream
3209  * @seqnum: a new sequence number
3210  *
3211  * Configure the sequence number in the payloader of @stream to @seqnum.
3212  */
3213 void
3214 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3215 {
3216   GstRTSPStreamPrivate *priv;
3217
3218   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3219
3220   priv = stream->priv;
3221
3222   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3223 }
3224
3225 /**
3226  * gst_rtsp_stream_get_seqnum:
3227  * @stream: a #GstRTSPStream
3228  *
3229  * Get the configured sequence number in the payloader of @stream.
3230  *
3231  * Returns: the sequence number of the payloader.
3232  */
3233 guint16
3234 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3235 {
3236   GstRTSPStreamPrivate *priv;
3237   guint seqnum;
3238
3239   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3240
3241   priv = stream->priv;
3242
3243   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3244
3245   return seqnum;
3246 }
3247
3248 /**
3249  * gst_rtsp_stream_transport_filter:
3250  * @stream: a #GstRTSPStream
3251  * @func: (scope call) (allow-none): a callback
3252  * @user_data: (closure): user data passed to @func
3253  *
3254  * Call @func for each transport managed by @stream. The result value of @func
3255  * determines what happens to the transport. @func will be called with @stream
3256  * locked so no further actions on @stream can be performed from @func.
3257  *
3258  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3259  * @stream.
3260  *
3261  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3262  *
3263  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3264  * will also be added with an additional ref to the result #GList of this
3265  * function..
3266  *
3267  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3268  *
3269  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3270  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3271  * element in the #GList should be unreffed before the list is freed.
3272  */
3273 GList *
3274 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3275     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3276 {
3277   GstRTSPStreamPrivate *priv;
3278   GList *result, *walk, *next;
3279   GHashTable *visited = NULL;
3280   guint cookie;
3281
3282   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3283
3284   priv = stream->priv;
3285
3286   result = NULL;
3287   if (func)
3288     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3289
3290   g_mutex_lock (&priv->lock);
3291 restart:
3292   cookie = priv->transports_cookie;
3293   for (walk = priv->transports; walk; walk = next) {
3294     GstRTSPStreamTransport *trans = walk->data;
3295     GstRTSPFilterResult res;
3296     gboolean changed;
3297
3298     next = g_list_next (walk);
3299
3300     if (func) {
3301       /* only visit each transport once */
3302       if (g_hash_table_contains (visited, trans))
3303         continue;
3304
3305       g_hash_table_add (visited, g_object_ref (trans));
3306       g_mutex_unlock (&priv->lock);
3307
3308       res = func (stream, trans, user_data);
3309
3310       g_mutex_lock (&priv->lock);
3311     } else
3312       res = GST_RTSP_FILTER_REF;
3313
3314     changed = (cookie != priv->transports_cookie);
3315
3316     switch (res) {
3317       case GST_RTSP_FILTER_REMOVE:
3318         update_transport (stream, trans, FALSE);
3319         break;
3320       case GST_RTSP_FILTER_REF:
3321         result = g_list_prepend (result, g_object_ref (trans));
3322         break;
3323       case GST_RTSP_FILTER_KEEP:
3324       default:
3325         break;
3326     }
3327     if (changed)
3328       goto restart;
3329   }
3330   g_mutex_unlock (&priv->lock);
3331
3332   if (func)
3333     g_hash_table_unref (visited);
3334
3335   return result;
3336 }
3337
3338 static GstPadProbeReturn
3339 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3340 {
3341   GstRTSPStreamPrivate *priv;
3342   GstRTSPStream *stream;
3343
3344   stream = user_data;
3345   priv = stream->priv;
3346
3347   GST_DEBUG_OBJECT (pad, "now blocking");
3348
3349   g_mutex_lock (&priv->lock);
3350   priv->blocking = TRUE;
3351   g_mutex_unlock (&priv->lock);
3352
3353   gst_element_post_message (priv->payloader,
3354       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3355           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3356
3357   return GST_PAD_PROBE_OK;
3358 }
3359
3360 /**
3361  * gst_rtsp_stream_set_blocked:
3362  * @stream: a #GstRTSPStream
3363  * @blocked: boolean indicating we should block or unblock
3364  *
3365  * Blocks or unblocks the dataflow on @stream.
3366  *
3367  * Returns: %TRUE on success
3368  */
3369 gboolean
3370 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3371 {
3372   GstRTSPStreamPrivate *priv;
3373
3374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3375
3376   priv = stream->priv;
3377
3378   g_mutex_lock (&priv->lock);
3379   if (blocked) {
3380     priv->blocking = FALSE;
3381     if (priv->blocked_id == 0) {
3382       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3383           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3384           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3385           g_object_ref (stream), g_object_unref);
3386     }
3387   } else {
3388     if (priv->blocked_id != 0) {
3389       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3390       priv->blocked_id = 0;
3391       priv->blocking = FALSE;
3392     }
3393   }
3394   g_mutex_unlock (&priv->lock);
3395
3396   return TRUE;
3397 }
3398
3399 /**
3400  * gst_rtsp_stream_is_blocking:
3401  * @stream: a #GstRTSPStream
3402  *
3403  * Check if @stream is blocking on a #GstBuffer.
3404  *
3405  * Returns: %TRUE if @stream is blocking
3406  */
3407 gboolean
3408 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3409 {
3410   GstRTSPStreamPrivate *priv;
3411   gboolean result;
3412
3413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3414
3415   priv = stream->priv;
3416
3417   g_mutex_lock (&priv->lock);
3418   result = priv->blocking;
3419   g_mutex_unlock (&priv->lock);
3420
3421   return result;
3422 }
3423
3424 /**
3425  * gst_rtsp_stream_query_position:
3426  * @stream: a #GstRTSPStream
3427  *
3428  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3429  * the RTP parts of the pipeline and not the RTCP parts.
3430  *
3431  * Returns: %TRUE if the position could be queried
3432  */
3433 gboolean
3434 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3435 {
3436   GstRTSPStreamPrivate *priv;
3437   GstElement *sink;
3438   gboolean ret;
3439
3440   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3441
3442   priv = stream->priv;
3443
3444   g_mutex_lock (&priv->lock);
3445   /* depending on the transport type, it should query corresponding sink */
3446   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3447       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3448     sink = priv->udpsink[0];
3449   else
3450     sink = priv->appsink[0];
3451
3452   if (sink)
3453     gst_object_ref (sink);
3454   g_mutex_unlock (&priv->lock);
3455
3456   if (!sink)
3457     return FALSE;
3458
3459   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3460   gst_object_unref (sink);
3461
3462   return ret;
3463 }
3464
3465 /**
3466  * gst_rtsp_stream_query_stop:
3467  * @stream: a #GstRTSPStream
3468  *
3469  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3470  * the RTP parts of the pipeline and not the RTCP parts.
3471  *
3472  * Returns: %TRUE if the stop could be queried
3473  */
3474 gboolean
3475 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3476 {
3477   GstRTSPStreamPrivate *priv;
3478   GstElement *sink;
3479   GstQuery *query;
3480   gboolean ret;
3481
3482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3483
3484   priv = stream->priv;
3485
3486   g_mutex_lock (&priv->lock);
3487   /* depending on the transport type, it should query corresponding sink */
3488   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3489       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3490     sink = priv->udpsink[0];
3491   else
3492     sink = priv->appsink[0];
3493
3494   if (sink)
3495     gst_object_ref (sink);
3496   g_mutex_unlock (&priv->lock);
3497
3498   if (!sink)
3499     return FALSE;
3500
3501   query = gst_query_new_segment (GST_FORMAT_TIME);
3502   if ((ret = gst_element_query (sink, query))) {
3503     GstFormat format;
3504
3505     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3506     if (format != GST_FORMAT_TIME)
3507       *stop = -1;
3508   }
3509   gst_query_unref (query);
3510   gst_object_unref (sink);
3511
3512   return ret;
3513
3514 }