rtsp-stream: Put the timestamp of receival of the initial packet over TCP on the...
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include "rtsp-stream.h"
59
60 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
61      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
62
63 typedef struct
64 {
65   GstRTSPStreamTransport *transport;
66
67   /* RTP and RTCP source */
68   GstElement *udpsrc[2];
69   GstPad *selpad[2];
70 } GstRTSPMulticastTransportSource;
71
72 struct _GstRTSPStreamPrivate
73 {
74   GMutex lock;
75   guint idx;
76   /* Only one pad is ever set */
77   GstPad *srcpad, *sinkpad;
78   GstElement *payloader;
79   guint buffer_size;
80   gboolean is_joined;
81   gchar *control;
82
83   GstRTSPProfile profiles;
84   GstRTSPLowerTrans protocols;
85
86   /* pads on the rtpbin */
87   GstPad *send_rtp_sink;
88   GstPad *recv_rtp_src;
89   GstPad *recv_sink[2];
90   GstPad *send_src[2];
91
92   /* the RTPSession object */
93   GObject *session;
94
95   /* SRTP encoder/decoder */
96   GstElement *srtpenc;
97   GstElement *srtpdec;
98   GHashTable *keys;
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
101    * sockets */
102   GstElement *udpsrc_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107
108   GstElement *udpsink[2];
109
110   /* for TCP transport */
111   GstElement *appsrc[2];
112   GstClockTime appsrc_base_time[2];
113   GstElement *appqueue[2];
114   GstElement *appsink[2];
115
116   GstElement *tee[2];
117   GstElement *funnel[2];
118
119   /* retransmission */
120   GstElement *rtxsend;
121   guint rtx_pt;
122   GstClockTime rtx_time;
123
124   /* server ports for sending/receiving over ipv4 */
125   GstRTSPRange server_port_v4;
126   GstRTSPAddress *server_addr_v4;
127   gboolean have_ipv4;
128
129   /* server ports for sending/receiving over ipv6 */
130   GstRTSPRange server_port_v6;
131   GstRTSPAddress *server_addr_v6;
132   gboolean have_ipv6;
133
134   /* multicast addresses */
135   GstRTSPAddressPool *pool;
136   GstRTSPAddress *addr_v4;
137   GstRTSPAddress *addr_v6;
138
139   /* the caps of the stream */
140   gulong caps_sig;
141   GstCaps *caps;
142
143   /* transports we stream to */
144   guint n_active;
145   GList *transports;
146   guint transports_cookie;
147   GList *tr_cache_rtp;
148   GList *tr_cache_rtcp;
149   guint tr_cache_cookie_rtp;
150   guint tr_cache_cookie_rtcp;
151
152
153   /* UDP sources for UDP multicast transports */
154   GList *transport_sources;
155
156   gint dscp_qos;
157
158   /* stream blocking */
159   gulong blocked_id;
160   gboolean blocking;
161
162   /* pt->caps map for RECORD streams */
163   GHashTable *ptmap;
164 };
165
166 #define DEFAULT_CONTROL         NULL
167 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
168 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
169                                         GST_RTSP_LOWER_TRANS_TCP
170
171 enum
172 {
173   PROP_0,
174   PROP_CONTROL,
175   PROP_PROFILES,
176   PROP_PROTOCOLS,
177   PROP_LAST
178 };
179
180 enum
181 {
182   SIGNAL_NEW_RTP_ENCODER,
183   SIGNAL_NEW_RTCP_ENCODER,
184   SIGNAL_LAST
185 };
186
187 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
188 #define GST_CAT_DEFAULT rtsp_stream_debug
189
190 static GQuark ssrc_stream_map_key;
191
192 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
193     GValue * value, GParamSpec * pspec);
194 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
195     const GValue * value, GParamSpec * pspec);
196
197 static void gst_rtsp_stream_finalize (GObject * obj);
198
199 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
200
201 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
202
203 static void
204 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
205 {
206   GObjectClass *gobject_class;
207
208   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
209
210   gobject_class = G_OBJECT_CLASS (klass);
211
212   gobject_class->get_property = gst_rtsp_stream_get_property;
213   gobject_class->set_property = gst_rtsp_stream_set_property;
214   gobject_class->finalize = gst_rtsp_stream_finalize;
215
216   g_object_class_install_property (gobject_class, PROP_CONTROL,
217       g_param_spec_string ("control", "Control",
218           "The control string for this stream", DEFAULT_CONTROL,
219           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220
221   g_object_class_install_property (gobject_class, PROP_PROFILES,
222       g_param_spec_flags ("profiles", "Profiles",
223           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
224           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
227       g_param_spec_flags ("protocols", "Protocols",
228           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
229           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230
231   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
232       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
233       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
234       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
235
236   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
237       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
238       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
239       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
240
241   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
242
243   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
244 }
245
246 static void
247 gst_rtsp_stream_init (GstRTSPStream * stream)
248 {
249   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
250
251   GST_DEBUG ("new stream %p", stream);
252
253   stream->priv = priv;
254
255   priv->dscp_qos = -1;
256   priv->control = g_strdup (DEFAULT_CONTROL);
257   priv->profiles = DEFAULT_PROFILES;
258   priv->protocols = DEFAULT_PROTOCOLS;
259
260   g_mutex_init (&priv->lock);
261
262   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
263       NULL, (GDestroyNotify) gst_caps_unref);
264   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
265       (GDestroyNotify) gst_caps_unref);
266 }
267
268 static void
269 gst_rtsp_stream_finalize (GObject * obj)
270 {
271   GstRTSPStream *stream;
272   GstRTSPStreamPrivate *priv;
273
274   stream = GST_RTSP_STREAM (obj);
275   priv = stream->priv;
276
277   GST_DEBUG ("finalize stream %p", stream);
278
279   /* we really need to be unjoined now */
280   g_return_if_fail (!priv->is_joined);
281
282   if (priv->addr_v4)
283     gst_rtsp_address_free (priv->addr_v4);
284   if (priv->addr_v6)
285     gst_rtsp_address_free (priv->addr_v6);
286   if (priv->server_addr_v4)
287     gst_rtsp_address_free (priv->server_addr_v4);
288   if (priv->server_addr_v6)
289     gst_rtsp_address_free (priv->server_addr_v6);
290   if (priv->pool)
291     g_object_unref (priv->pool);
292   if (priv->rtxsend)
293     g_object_unref (priv->rtxsend);
294
295   gst_object_unref (priv->payloader);
296   if (priv->srcpad)
297     gst_object_unref (priv->srcpad);
298   if (priv->sinkpad)
299     gst_object_unref (priv->sinkpad);
300   g_free (priv->control);
301   g_mutex_clear (&priv->lock);
302
303   g_hash_table_unref (priv->keys);
304   g_hash_table_destroy (priv->ptmap);
305
306   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
307 }
308
309 static void
310 gst_rtsp_stream_get_property (GObject * object, guint propid,
311     GValue * value, GParamSpec * pspec)
312 {
313   GstRTSPStream *stream = GST_RTSP_STREAM (object);
314
315   switch (propid) {
316     case PROP_CONTROL:
317       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
318       break;
319     case PROP_PROFILES:
320       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
321       break;
322     case PROP_PROTOCOLS:
323       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
324       break;
325     default:
326       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
327   }
328 }
329
330 static void
331 gst_rtsp_stream_set_property (GObject * object, guint propid,
332     const GValue * value, GParamSpec * pspec)
333 {
334   GstRTSPStream *stream = GST_RTSP_STREAM (object);
335
336   switch (propid) {
337     case PROP_CONTROL:
338       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
339       break;
340     case PROP_PROFILES:
341       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
342       break;
343     case PROP_PROTOCOLS:
344       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
345       break;
346     default:
347       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
348   }
349 }
350
351 /**
352  * gst_rtsp_stream_new:
353  * @idx: an index
354  * @pad: a #GstPad
355  * @payloader: a #GstElement
356  *
357  * Create a new media stream with index @idx that handles RTP data on
358  * @pad and has a payloader element @payloader if @pad is a source pad
359  * or a depayloader element @payloader if @pad is a sink pad.
360  *
361  * Returns: (transfer full): a new #GstRTSPStream
362  */
363 GstRTSPStream *
364 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
365 {
366   GstRTSPStreamPrivate *priv;
367   GstRTSPStream *stream;
368
369   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
370   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
371
372   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
373   priv = stream->priv;
374   priv->idx = idx;
375   priv->payloader = gst_object_ref (payloader);
376   if (GST_PAD_IS_SRC (pad))
377     priv->srcpad = gst_object_ref (pad);
378   else
379     priv->sinkpad = gst_object_ref (pad);
380
381   return stream;
382 }
383
384 /**
385  * gst_rtsp_stream_get_index:
386  * @stream: a #GstRTSPStream
387  *
388  * Get the stream index.
389  *
390  * Return: the stream index.
391  */
392 guint
393 gst_rtsp_stream_get_index (GstRTSPStream * stream)
394 {
395   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
396
397   return stream->priv->idx;
398 }
399
400 /**
401  * gst_rtsp_stream_get_pt:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the stream payload type.
405  *
406  * Return: the stream payload type.
407  */
408 guint
409 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
410 {
411   GstRTSPStreamPrivate *priv;
412   guint pt;
413
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
415
416   priv = stream->priv;
417
418   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
419
420   return pt;
421 }
422
423 /**
424  * gst_rtsp_stream_get_srcpad:
425  * @stream: a #GstRTSPStream
426  *
427  * Get the srcpad associated with @stream.
428  *
429  * Returns: (transfer full): the srcpad. Unref after usage.
430  */
431 GstPad *
432 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
433 {
434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
435
436   if (!stream->priv->srcpad)
437     return NULL;
438
439   return gst_object_ref (stream->priv->srcpad);
440 }
441
442 /**
443  * gst_rtsp_stream_get_sinkpad:
444  * @stream: a #GstRTSPStream
445  *
446  * Get the sinkpad associated with @stream.
447  *
448  * Returns: (transfer full): the sinkpad. Unref after usage.
449  */
450 GstPad *
451 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
452 {
453   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
454
455   if (!stream->priv->sinkpad)
456     return NULL;
457
458   return gst_object_ref (stream->priv->sinkpad);
459 }
460
461 /**
462  * gst_rtsp_stream_get_control:
463  * @stream: a #GstRTSPStream
464  *
465  * Get the control string to identify this stream.
466  *
467  * Returns: (transfer full): the control string. g_free() after usage.
468  */
469 gchar *
470 gst_rtsp_stream_get_control (GstRTSPStream * stream)
471 {
472   GstRTSPStreamPrivate *priv;
473   gchar *result;
474
475   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
476
477   priv = stream->priv;
478
479   g_mutex_lock (&priv->lock);
480   if ((result = g_strdup (priv->control)) == NULL)
481     result = g_strdup_printf ("stream=%u", priv->idx);
482   g_mutex_unlock (&priv->lock);
483
484   return result;
485 }
486
487 /**
488  * gst_rtsp_stream_set_control:
489  * @stream: a #GstRTSPStream
490  * @control: a control string
491  *
492  * Set the control string in @stream.
493  */
494 void
495 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
496 {
497   GstRTSPStreamPrivate *priv;
498
499   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
500
501   priv = stream->priv;
502
503   g_mutex_lock (&priv->lock);
504   g_free (priv->control);
505   priv->control = g_strdup (control);
506   g_mutex_unlock (&priv->lock);
507 }
508
509 /**
510  * gst_rtsp_stream_has_control:
511  * @stream: a #GstRTSPStream
512  * @control: a control string
513  *
514  * Check if @stream has the control string @control.
515  *
516  * Returns: %TRUE is @stream has @control as the control string
517  */
518 gboolean
519 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
520 {
521   GstRTSPStreamPrivate *priv;
522   gboolean res;
523
524   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
525
526   priv = stream->priv;
527
528   g_mutex_lock (&priv->lock);
529   if (priv->control)
530     res = (g_strcmp0 (priv->control, control) == 0);
531   else {
532     guint streamid;
533
534     if (sscanf (control, "stream=%u", &streamid) > 0)
535       res = (streamid == priv->idx);
536     else
537       res = FALSE;
538   }
539   g_mutex_unlock (&priv->lock);
540
541   return res;
542 }
543
544 /**
545  * gst_rtsp_stream_set_mtu:
546  * @stream: a #GstRTSPStream
547  * @mtu: a new MTU
548  *
549  * Configure the mtu in the payloader of @stream to @mtu.
550  */
551 void
552 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
553 {
554   GstRTSPStreamPrivate *priv;
555
556   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
557
558   priv = stream->priv;
559
560   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
561
562   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
563 }
564
565 /**
566  * gst_rtsp_stream_get_mtu:
567  * @stream: a #GstRTSPStream
568  *
569  * Get the configured MTU in the payloader of @stream.
570  *
571  * Returns: the MTU of the payloader.
572  */
573 guint
574 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
575 {
576   GstRTSPStreamPrivate *priv;
577   guint mtu;
578
579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
580
581   priv = stream->priv;
582
583   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
584
585   return mtu;
586 }
587
588 /* Update the dscp qos property on the udp sinks */
589 static void
590 update_dscp_qos (GstRTSPStream * stream)
591 {
592   GstRTSPStreamPrivate *priv;
593
594   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
595
596   priv = stream->priv;
597
598   if (priv->udpsink[0]) {
599     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
600         NULL);
601   }
602
603   if (priv->udpsink[1]) {
604     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
605         NULL);
606   }
607 }
608
609 /**
610  * gst_rtsp_stream_set_dscp_qos:
611  * @stream: a #GstRTSPStream
612  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
613  *
614  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
615  */
616 void
617 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
618 {
619   GstRTSPStreamPrivate *priv;
620
621   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
622
623   priv = stream->priv;
624
625   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
626
627   if (dscp_qos < -1 || dscp_qos > 63) {
628     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
629     return;
630   }
631
632   priv->dscp_qos = dscp_qos;
633
634   update_dscp_qos (stream);
635 }
636
637 /**
638  * gst_rtsp_stream_get_dscp_qos:
639  * @stream: a #GstRTSPStream
640  *
641  * Get the configured DSCP QoS in of the outgoing sockets.
642  *
643  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
644  */
645 gint
646 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
647 {
648   GstRTSPStreamPrivate *priv;
649
650   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
651
652   priv = stream->priv;
653
654   return priv->dscp_qos;
655 }
656
657 /**
658  * gst_rtsp_stream_is_transport_supported:
659  * @stream: a #GstRTSPStream
660  * @transport: (transfer none): a #GstRTSPTransport
661  *
662  * Check if @transport can be handled by stream
663  *
664  * Returns: %TRUE if @transport can be handled by @stream.
665  */
666 gboolean
667 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
668     GstRTSPTransport * transport)
669 {
670   GstRTSPStreamPrivate *priv;
671
672   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
673
674   priv = stream->priv;
675
676   g_mutex_lock (&priv->lock);
677   if (transport->trans != GST_RTSP_TRANS_RTP)
678     goto unsupported_transmode;
679
680   if (!(transport->profile & priv->profiles))
681     goto unsupported_profile;
682
683   if (!(transport->lower_transport & priv->protocols))
684     goto unsupported_ltrans;
685
686   g_mutex_unlock (&priv->lock);
687
688   return TRUE;
689
690   /* ERRORS */
691 unsupported_transmode:
692   {
693     GST_DEBUG ("unsupported transport mode %d", transport->trans);
694     g_mutex_unlock (&priv->lock);
695     return FALSE;
696   }
697 unsupported_profile:
698   {
699     GST_DEBUG ("unsupported profile %d", transport->profile);
700     g_mutex_unlock (&priv->lock);
701     return FALSE;
702   }
703 unsupported_ltrans:
704   {
705     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
706     g_mutex_unlock (&priv->lock);
707     return FALSE;
708   }
709 }
710
711 /**
712  * gst_rtsp_stream_set_profiles:
713  * @stream: a #GstRTSPStream
714  * @profiles: the new profiles
715  *
716  * Configure the allowed profiles for @stream.
717  */
718 void
719 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
720 {
721   GstRTSPStreamPrivate *priv;
722
723   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
724
725   priv = stream->priv;
726
727   g_mutex_lock (&priv->lock);
728   priv->profiles = profiles;
729   g_mutex_unlock (&priv->lock);
730 }
731
732 /**
733  * gst_rtsp_stream_get_profiles:
734  * @stream: a #GstRTSPStream
735  *
736  * Get the allowed profiles of @stream.
737  *
738  * Returns: a #GstRTSPProfile
739  */
740 GstRTSPProfile
741 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
742 {
743   GstRTSPStreamPrivate *priv;
744   GstRTSPProfile res;
745
746   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
747
748   priv = stream->priv;
749
750   g_mutex_lock (&priv->lock);
751   res = priv->profiles;
752   g_mutex_unlock (&priv->lock);
753
754   return res;
755 }
756
757 /**
758  * gst_rtsp_stream_set_protocols:
759  * @stream: a #GstRTSPStream
760  * @protocols: the new flags
761  *
762  * Configure the allowed lower transport for @stream.
763  */
764 void
765 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
766     GstRTSPLowerTrans protocols)
767 {
768   GstRTSPStreamPrivate *priv;
769
770   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
771
772   priv = stream->priv;
773
774   g_mutex_lock (&priv->lock);
775   priv->protocols = protocols;
776   g_mutex_unlock (&priv->lock);
777 }
778
779 /**
780  * gst_rtsp_stream_get_protocols:
781  * @stream: a #GstRTSPStream
782  *
783  * Get the allowed protocols of @stream.
784  *
785  * Returns: a #GstRTSPLowerTrans
786  */
787 GstRTSPLowerTrans
788 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
789 {
790   GstRTSPStreamPrivate *priv;
791   GstRTSPLowerTrans res;
792
793   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
794       GST_RTSP_LOWER_TRANS_UNKNOWN);
795
796   priv = stream->priv;
797
798   g_mutex_lock (&priv->lock);
799   res = priv->protocols;
800   g_mutex_unlock (&priv->lock);
801
802   return res;
803 }
804
805 /**
806  * gst_rtsp_stream_set_address_pool:
807  * @stream: a #GstRTSPStream
808  * @pool: (transfer none): a #GstRTSPAddressPool
809  *
810  * configure @pool to be used as the address pool of @stream.
811  */
812 void
813 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
814     GstRTSPAddressPool * pool)
815 {
816   GstRTSPStreamPrivate *priv;
817   GstRTSPAddressPool *old;
818
819   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
820
821   priv = stream->priv;
822
823   GST_LOG_OBJECT (stream, "set address pool %p", pool);
824
825   g_mutex_lock (&priv->lock);
826   if ((old = priv->pool) != pool)
827     priv->pool = pool ? g_object_ref (pool) : NULL;
828   else
829     old = NULL;
830   g_mutex_unlock (&priv->lock);
831
832   if (old)
833     g_object_unref (old);
834 }
835
836 /**
837  * gst_rtsp_stream_get_address_pool:
838  * @stream: a #GstRTSPStream
839  *
840  * Get the #GstRTSPAddressPool used as the address pool of @stream.
841  *
842  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
843  * usage.
844  */
845 GstRTSPAddressPool *
846 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
847 {
848   GstRTSPStreamPrivate *priv;
849   GstRTSPAddressPool *result;
850
851   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
852
853   priv = stream->priv;
854
855   g_mutex_lock (&priv->lock);
856   if ((result = priv->pool))
857     g_object_ref (result);
858   g_mutex_unlock (&priv->lock);
859
860   return result;
861 }
862
863 /**
864  * gst_rtsp_stream_get_multicast_address:
865  * @stream: a #GstRTSPStream
866  * @family: the #GSocketFamily
867  *
868  * Get the multicast address of @stream for @family.
869  *
870  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
871  * or %NULL when no address could be allocated. gst_rtsp_address_free()
872  * after usage.
873  */
874 GstRTSPAddress *
875 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
876     GSocketFamily family)
877 {
878   GstRTSPStreamPrivate *priv;
879   GstRTSPAddress *result;
880   GstRTSPAddress **addrp;
881   GstRTSPAddressFlags flags;
882
883   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
884
885   priv = stream->priv;
886
887   if (family == G_SOCKET_FAMILY_IPV6) {
888     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
889     addrp = &priv->addr_v6;
890   } else {
891     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
892     addrp = &priv->addr_v4;
893   }
894
895   g_mutex_lock (&priv->lock);
896   if (*addrp == NULL) {
897     if (priv->pool == NULL)
898       goto no_pool;
899
900     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
901
902     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
903     if (*addrp == NULL)
904       goto no_address;
905   }
906   result = gst_rtsp_address_copy (*addrp);
907   g_mutex_unlock (&priv->lock);
908
909   return result;
910
911   /* ERRORS */
912 no_pool:
913   {
914     GST_ERROR_OBJECT (stream, "no address pool specified");
915     g_mutex_unlock (&priv->lock);
916     return NULL;
917   }
918 no_address:
919   {
920     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
921     g_mutex_unlock (&priv->lock);
922     return NULL;
923   }
924 }
925
926 /**
927  * gst_rtsp_stream_reserve_address:
928  * @stream: a #GstRTSPStream
929  * @address: an address
930  * @port: a port
931  * @n_ports: n_ports
932  * @ttl: a TTL
933  *
934  * Reserve @address and @port as the address and port of @stream.
935  *
936  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
937  * the address could be reserved. gst_rtsp_address_free() after usage.
938  */
939 GstRTSPAddress *
940 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
941     const gchar * address, guint port, guint n_ports, guint ttl)
942 {
943   GstRTSPStreamPrivate *priv;
944   GstRTSPAddress *result;
945   GInetAddress *addr;
946   GSocketFamily family;
947   GstRTSPAddress **addrp;
948
949   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
950   g_return_val_if_fail (address != NULL, NULL);
951   g_return_val_if_fail (port > 0, NULL);
952   g_return_val_if_fail (n_ports > 0, NULL);
953   g_return_val_if_fail (ttl > 0, NULL);
954
955   priv = stream->priv;
956
957   addr = g_inet_address_new_from_string (address);
958   if (!addr) {
959     GST_ERROR ("failed to get inet addr from %s", address);
960     family = G_SOCKET_FAMILY_IPV4;
961   } else {
962     family = g_inet_address_get_family (addr);
963     g_object_unref (addr);
964   }
965
966   if (family == G_SOCKET_FAMILY_IPV6)
967     addrp = &priv->addr_v6;
968   else
969     addrp = &priv->addr_v4;
970
971   g_mutex_lock (&priv->lock);
972   if (*addrp == NULL) {
973     GstRTSPAddressPoolResult res;
974
975     if (priv->pool == NULL)
976       goto no_pool;
977
978     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
979         port, n_ports, ttl, addrp);
980     if (res != GST_RTSP_ADDRESS_POOL_OK)
981       goto no_address;
982   } else {
983     if (strcmp ((*addrp)->address, address) ||
984         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
985         (*addrp)->ttl != ttl)
986       goto different_address;
987   }
988   result = gst_rtsp_address_copy (*addrp);
989   g_mutex_unlock (&priv->lock);
990
991   return result;
992
993   /* ERRORS */
994 no_pool:
995   {
996     GST_ERROR_OBJECT (stream, "no address pool specified");
997     g_mutex_unlock (&priv->lock);
998     return NULL;
999   }
1000 no_address:
1001   {
1002     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1003         address);
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 different_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1010         " reserved", address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 }
1015
1016 static gboolean
1017 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1018     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1019     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1020     GstRTSPAddress ** server_addr_out)
1021 {
1022   GstRTSPStreamPrivate *priv = stream->priv;
1023   GstStateChangeReturn ret;
1024   GstElement *udpsrc0, *udpsrc1;
1025   GstElement *udpsink0, *udpsink1;
1026   GSocket *rtp_socket = NULL;
1027   GSocket *rtcp_socket;
1028   gint tmp_rtp, tmp_rtcp;
1029   guint count;
1030   gint rtpport, rtcpport;
1031   GList *rejected_addresses = NULL;
1032   GstRTSPAddress *addr = NULL;
1033   GInetAddress *inetaddr = NULL;
1034   GSocketAddress *rtp_sockaddr = NULL;
1035   GSocketAddress *rtcp_sockaddr = NULL;
1036   const gchar *multisink_socket;
1037
1038   if (family == G_SOCKET_FAMILY_IPV6)
1039     multisink_socket = "socket-v6";
1040   else
1041     multisink_socket = "socket";
1042
1043   udpsrc0 = NULL;
1044   udpsrc1 = NULL;
1045   udpsink0 = NULL;
1046   udpsink1 = NULL;
1047   count = 0;
1048
1049   /* Start with random port */
1050   tmp_rtp = 0;
1051
1052   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1053       G_SOCKET_PROTOCOL_UDP, NULL);
1054   if (!rtcp_socket)
1055     goto no_udp_protocol;
1056
1057   if (*server_addr_out)
1058     gst_rtsp_address_free (*server_addr_out);
1059
1060   /* try to allocate 2 UDP ports, the RTP port should be an even
1061    * number and the RTCP port should be the next (uneven) port */
1062 again:
1063
1064   if (rtp_socket == NULL) {
1065     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1066         G_SOCKET_PROTOCOL_UDP, NULL);
1067     if (!rtp_socket)
1068       goto no_udp_protocol;
1069   }
1070
1071   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1072     GstRTSPAddressFlags flags;
1073
1074     if (addr)
1075       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1076
1077     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1078     if (family == G_SOCKET_FAMILY_IPV6)
1079       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1080     else
1081       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1082
1083     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1084
1085     if (addr == NULL)
1086       goto no_ports;
1087
1088     tmp_rtp = addr->port;
1089
1090     g_clear_object (&inetaddr);
1091     inetaddr = g_inet_address_new_from_string (addr->address);
1092   } else {
1093     if (tmp_rtp != 0) {
1094       tmp_rtp += 2;
1095       if (++count > 20)
1096         goto no_ports;
1097     }
1098
1099     if (inetaddr == NULL)
1100       inetaddr = g_inet_address_new_any (family);
1101   }
1102
1103   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1104   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1105     g_object_unref (rtp_sockaddr);
1106     goto again;
1107   }
1108   g_object_unref (rtp_sockaddr);
1109
1110   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1111   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1112     g_clear_object (&rtp_sockaddr);
1113     goto socket_error;
1114   }
1115
1116   tmp_rtp =
1117       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1118   g_object_unref (rtp_sockaddr);
1119
1120   /* check if port is even */
1121   if ((tmp_rtp & 1) != 0) {
1122     /* port not even, close and allocate another */
1123     tmp_rtp++;
1124     g_clear_object (&rtp_socket);
1125     goto again;
1126   }
1127
1128   /* set port */
1129   tmp_rtcp = tmp_rtp + 1;
1130
1131   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1132   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1133     g_object_unref (rtcp_sockaddr);
1134     g_clear_object (&rtp_socket);
1135     goto again;
1136   }
1137   g_object_unref (rtcp_sockaddr);
1138
1139   g_clear_object (&inetaddr);
1140
1141   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1142   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1143
1144   if (udpsrc0 == NULL || udpsrc1 == NULL)
1145     goto no_udp_protocol;
1146
1147   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1148   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1149
1150   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1151   if (ret == GST_STATE_CHANGE_FAILURE)
1152     goto element_error;
1153   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1154   if (ret == GST_STATE_CHANGE_FAILURE)
1155     goto element_error;
1156
1157   /* all fine, do port check */
1158   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1159   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1160
1161   /* this should not happen... */
1162   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1163     goto port_error;
1164
1165   if (udpsink_out[0])
1166     udpsink0 = udpsink_out[0];
1167   else
1168     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1169
1170   if (!udpsink0)
1171     goto no_udp_protocol;
1172
1173   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1174   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1175
1176   if (udpsink_out[1])
1177     udpsink1 = udpsink_out[1];
1178   else
1179     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1180
1181   if (!udpsink1)
1182     goto no_udp_protocol;
1183
1184   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1185   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1186   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1187
1188   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1189   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1190   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1191   /* Needs to be async for RECORD streams, otherwise we will never go to
1192    * PLAYING because the sinks will wait for data while the udpsrc can't
1193    * provide data with timestamps in PAUSED. */
1194   if (priv->sinkpad)
1195     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1196   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1197   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1198   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1199   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1200   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1201
1202   /* we keep these elements, we will further configure them when the
1203    * client told us to really use the UDP ports. */
1204   udpsrc_out[0] = udpsrc0;
1205   udpsrc_out[1] = udpsrc1;
1206   udpsink_out[0] = udpsink0;
1207   udpsink_out[1] = udpsink1;
1208
1209   server_port_out->min = rtpport;
1210   server_port_out->max = rtcpport;
1211
1212   *server_addr_out = addr;
1213   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1214
1215   g_object_unref (rtp_socket);
1216   g_object_unref (rtcp_socket);
1217
1218   return TRUE;
1219
1220   /* ERRORS */
1221 no_udp_protocol:
1222   {
1223     goto cleanup;
1224   }
1225 no_ports:
1226   {
1227     goto cleanup;
1228   }
1229 port_error:
1230   {
1231     goto cleanup;
1232   }
1233 socket_error:
1234   {
1235     goto cleanup;
1236   }
1237 element_error:
1238   {
1239     goto cleanup;
1240   }
1241 cleanup:
1242   {
1243     if (udpsrc0) {
1244       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1245       gst_object_unref (udpsrc0);
1246     }
1247     if (udpsrc1) {
1248       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1249       gst_object_unref (udpsrc1);
1250     }
1251     if (udpsink0) {
1252       gst_element_set_state (udpsink0, GST_STATE_NULL);
1253       gst_object_unref (udpsink0);
1254     }
1255     if (inetaddr)
1256       g_object_unref (inetaddr);
1257     g_list_free_full (rejected_addresses,
1258         (GDestroyNotify) gst_rtsp_address_free);
1259     if (addr)
1260       gst_rtsp_address_free (addr);
1261     if (rtp_socket)
1262       g_object_unref (rtp_socket);
1263     if (rtcp_socket)
1264       g_object_unref (rtcp_socket);
1265     return FALSE;
1266   }
1267 }
1268
1269 /* must be called with lock */
1270 static gboolean
1271 alloc_ports (GstRTSPStream * stream)
1272 {
1273   GstRTSPStreamPrivate *priv = stream->priv;
1274
1275   priv->have_ipv4 =
1276       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1277       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1278       &priv->server_port_v4, &priv->server_addr_v4);
1279
1280   priv->have_ipv6 =
1281       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1282       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1283       &priv->server_port_v6, &priv->server_addr_v6);
1284
1285   return priv->have_ipv4 || priv->have_ipv6;
1286 }
1287
1288 /**
1289  * gst_rtsp_stream_get_server_port:
1290  * @stream: a #GstRTSPStream
1291  * @server_port: (out): result server port
1292  * @family: the port family to get
1293  *
1294  * Fill @server_port with the port pair used by the server. This function can
1295  * only be called when @stream has been joined.
1296  */
1297 void
1298 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1299     GstRTSPRange * server_port, GSocketFamily family)
1300 {
1301   GstRTSPStreamPrivate *priv;
1302
1303   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1304   priv = stream->priv;
1305   g_return_if_fail (priv->is_joined);
1306
1307   g_mutex_lock (&priv->lock);
1308   if (family == G_SOCKET_FAMILY_IPV4) {
1309     if (server_port)
1310       *server_port = priv->server_port_v4;
1311   } else {
1312     if (server_port)
1313       *server_port = priv->server_port_v6;
1314   }
1315   g_mutex_unlock (&priv->lock);
1316 }
1317
1318 /**
1319  * gst_rtsp_stream_get_rtpsession:
1320  * @stream: a #GstRTSPStream
1321  *
1322  * Get the RTP session of this stream.
1323  *
1324  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1325  */
1326 GObject *
1327 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1328 {
1329   GstRTSPStreamPrivate *priv;
1330   GObject *session;
1331
1332   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1333
1334   priv = stream->priv;
1335
1336   g_mutex_lock (&priv->lock);
1337   if ((session = priv->session))
1338     g_object_ref (session);
1339   g_mutex_unlock (&priv->lock);
1340
1341   return session;
1342 }
1343
1344 /**
1345  * gst_rtsp_stream_get_ssrc:
1346  * @stream: a #GstRTSPStream
1347  * @ssrc: (out): result ssrc
1348  *
1349  * Get the SSRC used by the RTP session of this stream. This function can only
1350  * be called when @stream has been joined.
1351  */
1352 void
1353 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1354 {
1355   GstRTSPStreamPrivate *priv;
1356
1357   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1358   priv = stream->priv;
1359   g_return_if_fail (priv->is_joined);
1360
1361   g_mutex_lock (&priv->lock);
1362   if (ssrc && priv->session)
1363     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1364   g_mutex_unlock (&priv->lock);
1365 }
1366
1367 /**
1368  * gst_rtsp_stream_set_retransmission_time:
1369  * @stream: a #GstRTSPStream
1370  * @time: a #GstClockTime
1371  *
1372  * Set the amount of time to store retransmission packets.
1373  */
1374 void
1375 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1376     GstClockTime time)
1377 {
1378   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1379
1380   g_mutex_lock (&stream->priv->lock);
1381   stream->priv->rtx_time = time;
1382   if (stream->priv->rtxsend)
1383     g_object_set (stream->priv->rtxsend, "max-size-time",
1384         GST_TIME_AS_MSECONDS (time), NULL);
1385   g_mutex_unlock (&stream->priv->lock);
1386 }
1387
1388 /**
1389  * gst_rtsp_media_get_retransmission_time:
1390  * @media: a #GstRTSPMedia
1391  *
1392  * Get the amount of time to store retransmission data.
1393  *
1394  * Returns: the amount of time to store retransmission data.
1395  */
1396 GstClockTime
1397 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1398 {
1399   GstClockTime ret;
1400
1401   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1402
1403   g_mutex_lock (&stream->priv->lock);
1404   ret = stream->priv->rtx_time;
1405   g_mutex_unlock (&stream->priv->lock);
1406
1407   return ret;
1408 }
1409
1410 void
1411 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1412 {
1413   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1414
1415   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1416
1417   g_mutex_lock (&stream->priv->lock);
1418   stream->priv->rtx_pt = rtx_pt;
1419   if (stream->priv->rtxsend) {
1420     guint pt = gst_rtsp_stream_get_pt (stream);
1421     gchar *pt_s = g_strdup_printf ("%d", pt);
1422     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1423         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1424     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1425     g_free (pt_s);
1426     gst_structure_free (rtx_pt_map);
1427   }
1428   g_mutex_unlock (&stream->priv->lock);
1429 }
1430
1431 guint
1432 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1433 {
1434   guint rtx_pt;
1435
1436   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1437
1438   g_mutex_lock (&stream->priv->lock);
1439   rtx_pt = stream->priv->rtx_pt;
1440   g_mutex_unlock (&stream->priv->lock);
1441
1442   return rtx_pt;
1443 }
1444
1445 /* executed from streaming thread */
1446 static void
1447 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1448 {
1449   GstRTSPStreamPrivate *priv = stream->priv;
1450   GstCaps *newcaps, *oldcaps;
1451
1452   newcaps = gst_pad_get_current_caps (pad);
1453
1454   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1455       newcaps);
1456
1457   g_mutex_lock (&priv->lock);
1458   oldcaps = priv->caps;
1459   priv->caps = newcaps;
1460   g_mutex_unlock (&priv->lock);
1461
1462   if (oldcaps)
1463     gst_caps_unref (oldcaps);
1464 }
1465
1466 static void
1467 dump_structure (const GstStructure * s)
1468 {
1469   gchar *sstr;
1470
1471   sstr = gst_structure_to_string (s);
1472   GST_INFO ("structure: %s", sstr);
1473   g_free (sstr);
1474 }
1475
1476 static GstRTSPStreamTransport *
1477 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1478 {
1479   GstRTSPStreamPrivate *priv = stream->priv;
1480   GList *walk;
1481   GstRTSPStreamTransport *result = NULL;
1482   const gchar *tmp;
1483   gchar *dest;
1484   guint port;
1485
1486   if (rtcp_from == NULL)
1487     return NULL;
1488
1489   tmp = g_strrstr (rtcp_from, ":");
1490   if (tmp == NULL)
1491     return NULL;
1492
1493   port = atoi (tmp + 1);
1494   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1495
1496   g_mutex_lock (&priv->lock);
1497   GST_INFO ("finding %s:%d in %d transports", dest, port,
1498       g_list_length (priv->transports));
1499
1500   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1501     GstRTSPStreamTransport *trans = walk->data;
1502     const GstRTSPTransport *tr;
1503     gint min, max;
1504
1505     tr = gst_rtsp_stream_transport_get_transport (trans);
1506
1507     min = tr->client_port.min;
1508     max = tr->client_port.max;
1509
1510     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1511       result = trans;
1512       break;
1513     }
1514   }
1515   if (result)
1516     g_object_ref (result);
1517   g_mutex_unlock (&priv->lock);
1518
1519   g_free (dest);
1520
1521   return result;
1522 }
1523
1524 static GstRTSPStreamTransport *
1525 check_transport (GObject * source, GstRTSPStream * stream)
1526 {
1527   GstStructure *stats;
1528   GstRTSPStreamTransport *trans;
1529
1530   /* see if we have a stream to match with the origin of the RTCP packet */
1531   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1532   if (trans == NULL) {
1533     g_object_get (source, "stats", &stats, NULL);
1534     if (stats) {
1535       const gchar *rtcp_from;
1536
1537       dump_structure (stats);
1538
1539       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1540       if ((trans = find_transport (stream, rtcp_from))) {
1541         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1542             source);
1543         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1544             g_object_unref);
1545       }
1546       gst_structure_free (stats);
1547     }
1548   }
1549   return trans;
1550 }
1551
1552
1553 static void
1554 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1555 {
1556   GstRTSPStreamTransport *trans;
1557
1558   GST_INFO ("%p: new source %p", stream, source);
1559
1560   trans = check_transport (source, stream);
1561
1562   if (trans)
1563     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1564 }
1565
1566 static void
1567 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1568 {
1569   GST_INFO ("%p: new SDES %p", stream, source);
1570 }
1571
1572 static void
1573 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1574 {
1575   GstRTSPStreamTransport *trans;
1576
1577   trans = check_transport (source, stream);
1578
1579   if (trans) {
1580     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1581     gst_rtsp_stream_transport_keep_alive (trans);
1582   }
1583 #ifdef DUMP_STATS
1584   {
1585     GstStructure *stats;
1586     g_object_get (source, "stats", &stats, NULL);
1587     if (stats) {
1588       dump_structure (stats);
1589       gst_structure_free (stats);
1590     }
1591   }
1592 #endif
1593 }
1594
1595 static void
1596 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1597 {
1598   GST_INFO ("%p: source %p bye", stream, source);
1599 }
1600
1601 static void
1602 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1603 {
1604   GstRTSPStreamTransport *trans;
1605
1606   GST_INFO ("%p: source %p bye timeout", stream, source);
1607
1608   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1609     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1610     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1611   }
1612 }
1613
1614 static void
1615 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1616 {
1617   GstRTSPStreamTransport *trans;
1618
1619   GST_INFO ("%p: source %p timeout", stream, source);
1620
1621   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1622     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1623     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1624   }
1625 }
1626
1627 static void
1628 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1629 {
1630   if (is_rtp) {
1631     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1632     g_list_free (priv->tr_cache_rtp);
1633     priv->tr_cache_rtp = NULL;
1634   } else {
1635     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1636     g_list_free (priv->tr_cache_rtcp);
1637     priv->tr_cache_rtcp = NULL;
1638   }
1639 }
1640
1641 static GstFlowReturn
1642 handle_new_sample (GstAppSink * sink, gpointer user_data)
1643 {
1644   GstRTSPStreamPrivate *priv;
1645   GList *walk;
1646   GstSample *sample;
1647   GstBuffer *buffer;
1648   GstRTSPStream *stream;
1649   gboolean is_rtp;
1650
1651   sample = gst_app_sink_pull_sample (sink);
1652   if (!sample)
1653     return GST_FLOW_OK;
1654
1655   stream = (GstRTSPStream *) user_data;
1656   priv = stream->priv;
1657   buffer = gst_sample_get_buffer (sample);
1658
1659   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1660
1661   g_mutex_lock (&priv->lock);
1662   if (is_rtp) {
1663     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1664       clear_tr_cache (priv, is_rtp);
1665       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1666         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1667         priv->tr_cache_rtp =
1668             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1669       }
1670       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1671     }
1672   } else {
1673     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1674       clear_tr_cache (priv, is_rtp);
1675       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1676         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1677         priv->tr_cache_rtcp =
1678             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1679       }
1680       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1681     }
1682   }
1683   g_mutex_unlock (&priv->lock);
1684
1685   if (is_rtp) {
1686     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1687       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1688       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1689     }
1690   } else {
1691     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1692       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1693       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1694     }
1695   }
1696   gst_sample_unref (sample);
1697
1698   return GST_FLOW_OK;
1699 }
1700
1701 static GstAppSinkCallbacks sink_cb = {
1702   NULL,                         /* not interested in EOS */
1703   NULL,                         /* not interested in preroll samples */
1704   handle_new_sample,
1705 };
1706
1707 static GstElement *
1708 get_rtp_encoder (GstRTSPStream * stream, guint session)
1709 {
1710   GstRTSPStreamPrivate *priv = stream->priv;
1711
1712   if (priv->srtpenc == NULL) {
1713     gchar *name;
1714
1715     name = g_strdup_printf ("srtpenc_%u", session);
1716     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1717     g_free (name);
1718
1719     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1720   }
1721   return gst_object_ref (priv->srtpenc);
1722 }
1723
1724 static GstElement *
1725 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1726 {
1727   GstRTSPStreamPrivate *priv = stream->priv;
1728   GstElement *oldenc, *enc;
1729   GstPad *pad;
1730   gchar *name;
1731
1732   if (priv->idx != session)
1733     return NULL;
1734
1735   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1736
1737   oldenc = priv->srtpenc;
1738   enc = get_rtp_encoder (stream, session);
1739   name = g_strdup_printf ("rtp_sink_%d", session);
1740   pad = gst_element_get_request_pad (enc, name);
1741   g_free (name);
1742   gst_object_unref (pad);
1743
1744   if (oldenc == NULL)
1745     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1746         enc);
1747
1748   return enc;
1749 }
1750
1751 static GstElement *
1752 request_rtcp_encoder (GstElement * rtpbin, guint session,
1753     GstRTSPStream * stream)
1754 {
1755   GstRTSPStreamPrivate *priv = stream->priv;
1756   GstElement *oldenc, *enc;
1757   GstPad *pad;
1758   gchar *name;
1759
1760   if (priv->idx != session)
1761     return NULL;
1762
1763   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1764
1765   oldenc = priv->srtpenc;
1766   enc = get_rtp_encoder (stream, session);
1767   name = g_strdup_printf ("rtcp_sink_%d", session);
1768   pad = gst_element_get_request_pad (enc, name);
1769   g_free (name);
1770   gst_object_unref (pad);
1771
1772   if (oldenc == NULL)
1773     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1774         enc);
1775
1776   return enc;
1777 }
1778
1779 static GstCaps *
1780 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1781 {
1782   GstRTSPStreamPrivate *priv = stream->priv;
1783   GstCaps *caps;
1784
1785   GST_DEBUG ("request key %08x", ssrc);
1786
1787   g_mutex_lock (&priv->lock);
1788   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1789     gst_caps_ref (caps);
1790   g_mutex_unlock (&priv->lock);
1791
1792   return caps;
1793 }
1794
1795 static GstElement *
1796 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1797     GstRTSPStream * stream)
1798 {
1799   GstRTSPStreamPrivate *priv = stream->priv;
1800
1801   if (priv->idx != session)
1802     return NULL;
1803
1804   if (priv->srtpdec == NULL) {
1805     gchar *name;
1806
1807     name = g_strdup_printf ("srtpdec_%u", session);
1808     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1809     g_free (name);
1810
1811     g_signal_connect (priv->srtpdec, "request-key",
1812         (GCallback) request_key, stream);
1813   }
1814   return gst_object_ref (priv->srtpdec);
1815 }
1816
1817 static GstElement *
1818 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1819 {
1820   GstElement *bin;
1821   GstPad *pad;
1822   GstStructure *pt_map;
1823   gchar *name;
1824   guint pt, rtx_pt;
1825   gchar *pt_s;
1826
1827   pt = gst_rtsp_stream_get_pt (stream);
1828   pt_s = g_strdup_printf ("%u", pt);
1829   rtx_pt = stream->priv->rtx_pt;
1830
1831   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1832
1833   bin = gst_bin_new (NULL);
1834   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1835   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1836       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1837   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1838       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1839   g_free (pt_s);
1840   gst_structure_free (pt_map);
1841   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1842
1843   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1844   name = g_strdup_printf ("src_%u", sessid);
1845   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1846   g_free (name);
1847   gst_object_unref (pad);
1848
1849   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1850   name = g_strdup_printf ("sink_%u", sessid);
1851   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1852   g_free (name);
1853   gst_object_unref (pad);
1854
1855   return bin;
1856 }
1857
1858 /**
1859  * gst_rtsp_stream_set_pt_map:
1860  * @stream: a #GstRTSPStream
1861  * @pt: the pt
1862  * @caps: a #GstCaps
1863  *
1864  * Configure a pt map between @pt and @caps.
1865  */
1866 void
1867 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1868 {
1869   GstRTSPStreamPrivate *priv = stream->priv;
1870
1871   g_mutex_lock (&priv->lock);
1872   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1873   g_mutex_unlock (&priv->lock);
1874 }
1875
1876 static GstCaps *
1877 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1878     GstRTSPStream * stream)
1879 {
1880   GstRTSPStreamPrivate *priv = stream->priv;
1881   GstCaps *caps = NULL;
1882
1883   g_mutex_lock (&priv->lock);
1884
1885   if (priv->idx == session) {
1886     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1887     if (caps) {
1888       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1889       gst_caps_ref (caps);
1890     } else {
1891       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1892     }
1893   }
1894
1895   g_mutex_unlock (&priv->lock);
1896
1897   return caps;
1898 }
1899
1900 static void
1901 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
1902 {
1903   GstRTSPStreamPrivate *priv = stream->priv;
1904   gchar *name;
1905   GstPadLinkReturn ret;
1906   guint sessid;
1907
1908   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
1909       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1910
1911   name = gst_pad_get_name (pad);
1912   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
1913     g_free (name);
1914     return;
1915   }
1916   g_free (name);
1917
1918   if (priv->idx != sessid)
1919     return;
1920
1921   if (gst_pad_is_linked (priv->sinkpad)) {
1922     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
1923         GST_DEBUG_PAD_NAME (priv->sinkpad));
1924     return;
1925   }
1926
1927   /* link the RTP pad to the session manager, it should not really fail unless
1928    * this is not really an RTP pad */
1929   ret = gst_pad_link (pad, priv->sinkpad);
1930   if (ret != GST_PAD_LINK_OK)
1931     goto link_failed;
1932   priv->recv_rtp_src = gst_object_ref (pad);
1933
1934   return;
1935
1936 /* ERRORS */
1937 link_failed:
1938   {
1939     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
1940         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1941   }
1942 }
1943
1944 static void
1945 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
1946     GstRTSPStream * stream)
1947 {
1948   /* TODO: What to do here other than this? */
1949   GST_DEBUG ("Stream %p: Got EOS", stream);
1950   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
1951 }
1952
1953 /**
1954  * gst_rtsp_stream_join_bin:
1955  * @stream: a #GstRTSPStream
1956  * @bin: (transfer none): a #GstBin to join
1957  * @rtpbin: (transfer none): a rtpbin element in @bin
1958  * @state: the target state of the new elements
1959  *
1960  * Join the #GstBin @bin that contains the element @rtpbin.
1961  *
1962  * @stream will link to @rtpbin, which must be inside @bin. The elements
1963  * added to @bin will be set to the state given in @state.
1964  *
1965  * Returns: %TRUE on success.
1966  */
1967 gboolean
1968 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1969     GstElement * rtpbin, GstState state)
1970 {
1971   GstRTSPStreamPrivate *priv;
1972   gint i;
1973   guint idx;
1974   gchar *name;
1975   GstPad *pad, *sinkpad, *selpad;
1976   GstPadLinkReturn ret;
1977
1978   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1979   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1980   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1981
1982   priv = stream->priv;
1983
1984   g_mutex_lock (&priv->lock);
1985   if (priv->is_joined)
1986     goto was_joined;
1987
1988   /* create a session with the same index as the stream */
1989   idx = priv->idx;
1990
1991   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1992
1993   if (!alloc_ports (stream))
1994     goto no_ports;
1995
1996   /* update the dscp qos field in the sinks */
1997   update_dscp_qos (stream);
1998
1999   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2000       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2001     /* For SRTP */
2002     g_signal_connect (rtpbin, "request-rtp-encoder",
2003         (GCallback) request_rtp_encoder, stream);
2004     g_signal_connect (rtpbin, "request-rtcp-encoder",
2005         (GCallback) request_rtcp_encoder, stream);
2006     g_signal_connect (rtpbin, "request-rtp-decoder",
2007         (GCallback) request_rtp_rtcp_decoder, stream);
2008     g_signal_connect (rtpbin, "request-rtcp-decoder",
2009         (GCallback) request_rtp_rtcp_decoder, stream);
2010   }
2011
2012   if (priv->rtx_time > 0 && priv->srcpad) {
2013     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
2014     g_signal_connect (rtpbin, "request-aux-sender",
2015         (GCallback) request_aux_sender, stream);
2016   }
2017   if (priv->sinkpad) {
2018     g_signal_connect (rtpbin, "request-pt-map",
2019         (GCallback) request_pt_map, stream);
2020   }
2021
2022   /* get a pad for sending RTP */
2023   name = g_strdup_printf ("send_rtp_sink_%u", idx);
2024   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2025   g_free (name);
2026
2027   if (priv->srcpad) {
2028     /* link the RTP pad to the session manager, it should not really fail unless
2029      * this is not really an RTP pad */
2030     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2031     if (ret != GST_PAD_LINK_OK)
2032       goto link_failed;
2033   } else {
2034     /* Need to connect our sinkpad from here */
2035     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2036     /* EOS */
2037     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2038   }
2039
2040   /* get pads from the RTP session element for sending and receiving
2041    * RTP/RTCP*/
2042   name = g_strdup_printf ("send_rtp_src_%u", idx);
2043   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2044   g_free (name);
2045   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2046   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2047   g_free (name);
2048
2049   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2050   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2051   g_free (name);
2052   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2053   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2054   g_free (name);
2055
2056   /* get the session */
2057   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2058
2059   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2060       stream);
2061   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2062       stream);
2063   g_signal_connect (priv->session, "on-ssrc-active",
2064       (GCallback) on_ssrc_active, stream);
2065   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2066       stream);
2067   g_signal_connect (priv->session, "on-bye-timeout",
2068       (GCallback) on_bye_timeout, stream);
2069   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2070       stream);
2071
2072   for (i = 0; i < 2; i++) {
2073     GstPad *teepad, *queuepad;
2074     /* For the sender we create this bit of pipeline for both
2075      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2076      * we need to add a queue before appsink to make the pipeline
2077      * not block. For the TCP case, we want to pump data to the
2078      * client as fast as possible anyway.
2079      *
2080      * .--------.      .-----.    .---------.
2081      * | rtpbin |      | tee |    | udpsink |
2082      * |       send->sink   src->sink       |
2083      * '--------'      |     |    '---------'
2084      *                 |     |    .---------.    .---------.
2085      *                 |     |    |  queue  |    | appsink |
2086      *                 |    src->sink      src->sink       |
2087      *                 '-----'    '---------'    '---------'
2088      *
2089      * When only UDP is allowed, we skip the tee, queue and appsink and link the
2090      * udpsink directly to the session.
2091      */
2092     /* add udpsink */
2093     gst_bin_add (bin, priv->udpsink[i]);
2094     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2095
2096     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2097       /* make tee for RTP/RTCP */
2098       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2099       gst_bin_add (bin, priv->tee[i]);
2100
2101       /* and link to rtpbin send pad */
2102       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2103       gst_pad_link (priv->send_src[i], pad);
2104       gst_object_unref (pad);
2105
2106       /* link tee to udpsink */
2107       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2108       gst_pad_link (teepad, sinkpad);
2109       gst_object_unref (teepad);
2110
2111       /* make queue */
2112       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2113       gst_bin_add (bin, priv->appqueue[i]);
2114       /* and link to tee */
2115       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2116       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2117       gst_pad_link (teepad, pad);
2118       gst_object_unref (pad);
2119       gst_object_unref (teepad);
2120
2121       /* make appsink */
2122       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2123       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2124       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2125       gst_bin_add (bin, priv->appsink[i]);
2126       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2127           &sink_cb, stream, NULL);
2128       /* and link to queue */
2129       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2130       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2131       gst_pad_link (queuepad, pad);
2132       gst_object_unref (pad);
2133       gst_object_unref (queuepad);
2134     } else {
2135       /* else only udpsink needed, link it to the session */
2136       gst_pad_link (priv->send_src[i], sinkpad);
2137     }
2138     gst_object_unref (sinkpad);
2139
2140     /* For the receiver we create this bit of pipeline for both
2141      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2142      * and it is all funneled into the rtpbin receive pad.
2143      *
2144      * .--------.     .--------.    .--------.
2145      * | udpsrc |     | funnel |    | rtpbin |
2146      * |       src->sink      src->sink      |
2147      * '--------'     |        |    '--------'
2148      * .--------.     |        |
2149      * | appsrc |     |        |
2150      * |       src->sink       |
2151      * '--------'     '--------'
2152      */
2153     /* make funnel for the RTP/RTCP receivers */
2154     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2155     gst_bin_add (bin, priv->funnel[i]);
2156
2157     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2158     gst_pad_link (pad, priv->recv_sink[i]);
2159     gst_object_unref (pad);
2160
2161     if (priv->udpsrc_v4[i]) {
2162       if (priv->srcpad) {
2163         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2164          * values. This is only relevant for PLAY pipelines */
2165         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2166         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2167       }
2168       /* add udpsrc */
2169       gst_bin_add (bin, priv->udpsrc_v4[i]);
2170
2171       /* and link to the funnel v4 */
2172       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2173       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2174       gst_pad_link (pad, selpad);
2175       gst_object_unref (pad);
2176       gst_object_unref (selpad);
2177     }
2178
2179     if (priv->udpsrc_v6[i]) {
2180       if (priv->srcpad) {
2181         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2182         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2183       }
2184       gst_bin_add (bin, priv->udpsrc_v6[i]);
2185
2186       /* and link to the funnel v6 */
2187       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2188       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2189       gst_pad_link (pad, selpad);
2190       gst_object_unref (pad);
2191       gst_object_unref (selpad);
2192     }
2193
2194     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2195       /* make and add appsrc */
2196       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2197       priv->appsrc_base_time[i] = -1;
2198       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2199       gst_bin_add (bin, priv->appsrc[i]);
2200       /* and link to the funnel */
2201       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2202       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2203       gst_pad_link (pad, selpad);
2204       gst_object_unref (pad);
2205       gst_object_unref (selpad);
2206     }
2207
2208     /* check if we need to set to a special state */
2209     if (state != GST_STATE_NULL) {
2210       if (priv->udpsink[i])
2211         gst_element_set_state (priv->udpsink[i], state);
2212       if (priv->appsink[i])
2213         gst_element_set_state (priv->appsink[i], state);
2214       if (priv->appqueue[i])
2215         gst_element_set_state (priv->appqueue[i], state);
2216       if (priv->tee[i])
2217         gst_element_set_state (priv->tee[i], state);
2218       if (priv->funnel[i])
2219         gst_element_set_state (priv->funnel[i], state);
2220       if (priv->appsrc[i])
2221         gst_element_set_state (priv->appsrc[i], state);
2222     }
2223   }
2224
2225   /* be notified of caps changes */
2226   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2227       (GCallback) caps_notify, stream);
2228
2229   priv->is_joined = TRUE;
2230   g_mutex_unlock (&priv->lock);
2231
2232   return TRUE;
2233
2234   /* ERRORS */
2235 was_joined:
2236   {
2237     g_mutex_unlock (&priv->lock);
2238     return TRUE;
2239   }
2240 no_ports:
2241   {
2242     g_mutex_unlock (&priv->lock);
2243     GST_WARNING ("failed to allocate ports %u", idx);
2244     return FALSE;
2245   }
2246 link_failed:
2247   {
2248     GST_WARNING ("failed to link stream %u", idx);
2249     gst_object_unref (priv->send_rtp_sink);
2250     priv->send_rtp_sink = NULL;
2251     g_mutex_unlock (&priv->lock);
2252     return FALSE;
2253   }
2254 }
2255
2256 /**
2257  * gst_rtsp_stream_leave_bin:
2258  * @stream: a #GstRTSPStream
2259  * @bin: (transfer none): a #GstBin
2260  * @rtpbin: (transfer none): a rtpbin #GstElement
2261  *
2262  * Remove the elements of @stream from @bin.
2263  *
2264  * Return: %TRUE on success.
2265  */
2266 gboolean
2267 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2268     GstElement * rtpbin)
2269 {
2270   GstRTSPStreamPrivate *priv;
2271   gint i;
2272   GList *l;
2273
2274   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2275   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2276   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2277
2278   priv = stream->priv;
2279
2280   g_mutex_lock (&priv->lock);
2281   if (!priv->is_joined)
2282     goto was_not_joined;
2283
2284   /* all transports must be removed by now */
2285   if (priv->transports != NULL)
2286     goto transports_not_removed;
2287
2288   clear_tr_cache (priv, TRUE);
2289   clear_tr_cache (priv, FALSE);
2290
2291   GST_INFO ("stream %p leaving bin", stream);
2292
2293   if (priv->srcpad) {
2294     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2295   } else if (priv->recv_rtp_src) {
2296     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2297     gst_object_unref (priv->recv_rtp_src);
2298     priv->recv_rtp_src = NULL;
2299   }
2300   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2301   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2302   gst_object_unref (priv->send_rtp_sink);
2303   priv->send_rtp_sink = NULL;
2304
2305   for (i = 0; i < 2; i++) {
2306     if (priv->udpsink[i])
2307       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2308     if (priv->appsink[i])
2309       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2310     if (priv->appqueue[i])
2311       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2312     if (priv->tee[i])
2313       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2314     if (priv->funnel[i])
2315       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2316     if (priv->appsrc[i])
2317       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2318     if (priv->udpsrc_v4[i]) {
2319       /* and set udpsrc to NULL now before removing */
2320       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2321       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2322       /* removing them should also nicely release the request
2323        * pads when they finalize */
2324       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2325     }
2326     if (priv->udpsrc_v6[i]) {
2327       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2328       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2329       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2330     }
2331
2332     for (l = priv->transport_sources; l; l = l->next) {
2333       GstRTSPMulticastTransportSource *s = l->data;
2334
2335       if (!s->udpsrc[i])
2336         continue;
2337
2338       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2339       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2340       gst_bin_remove (bin, s->udpsrc[i]);
2341     }
2342
2343     if (priv->udpsink[i])
2344       gst_bin_remove (bin, priv->udpsink[i]);
2345     if (priv->appsrc[i])
2346       gst_bin_remove (bin, priv->appsrc[i]);
2347     if (priv->appsink[i])
2348       gst_bin_remove (bin, priv->appsink[i]);
2349     if (priv->appqueue[i])
2350       gst_bin_remove (bin, priv->appqueue[i]);
2351     if (priv->tee[i])
2352       gst_bin_remove (bin, priv->tee[i]);
2353     if (priv->funnel[i])
2354       gst_bin_remove (bin, priv->funnel[i]);
2355
2356     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2357     gst_object_unref (priv->recv_sink[i]);
2358     priv->recv_sink[i] = NULL;
2359
2360     priv->udpsrc_v4[i] = NULL;
2361     priv->udpsrc_v6[i] = NULL;
2362     priv->udpsink[i] = NULL;
2363     priv->appsrc[i] = NULL;
2364     priv->appsink[i] = NULL;
2365     priv->appqueue[i] = NULL;
2366     priv->tee[i] = NULL;
2367     priv->funnel[i] = NULL;
2368   }
2369
2370   for (l = priv->transport_sources; l; l = l->next) {
2371     GstRTSPMulticastTransportSource *s = l->data;
2372     g_slice_free (GstRTSPMulticastTransportSource, s);
2373   }
2374   g_list_free (priv->transport_sources);
2375   priv->transport_sources = NULL;
2376
2377   gst_object_unref (priv->send_src[0]);
2378   priv->send_src[0] = NULL;
2379
2380   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2381   gst_object_unref (priv->send_src[1]);
2382   priv->send_src[1] = NULL;
2383
2384   g_object_unref (priv->session);
2385   priv->session = NULL;
2386   if (priv->caps)
2387     gst_caps_unref (priv->caps);
2388   priv->caps = NULL;
2389
2390   if (priv->srtpenc)
2391     gst_object_unref (priv->srtpenc);
2392   if (priv->srtpdec)
2393     gst_object_unref (priv->srtpdec);
2394
2395   priv->is_joined = FALSE;
2396   g_mutex_unlock (&priv->lock);
2397
2398   return TRUE;
2399
2400 was_not_joined:
2401   {
2402     g_mutex_unlock (&priv->lock);
2403     return TRUE;
2404   }
2405 transports_not_removed:
2406   {
2407     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2408     g_mutex_unlock (&priv->lock);
2409     return FALSE;
2410   }
2411 }
2412
2413 /**
2414  * gst_rtsp_stream_get_rtpinfo:
2415  * @stream: a #GstRTSPStream
2416  * @rtptime: (allow-none): result RTP timestamp
2417  * @seq: (allow-none): result RTP seqnum
2418  * @clock_rate: (allow-none): the clock rate
2419  * @running_time: (allow-none): result running-time
2420  *
2421  * Retrieve the current rtptime, seq and running-time. This is used to
2422  * construct a RTPInfo reply header.
2423  *
2424  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2425  */
2426 gboolean
2427 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2428     guint * rtptime, guint * seq, guint * clock_rate,
2429     GstClockTime * running_time)
2430 {
2431   GstRTSPStreamPrivate *priv;
2432   GstStructure *stats;
2433   GObjectClass *payobjclass;
2434
2435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2436
2437   priv = stream->priv;
2438
2439   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2440
2441   g_mutex_lock (&priv->lock);
2442
2443   if (g_object_class_find_property (payobjclass, "stats")) {
2444     g_object_get (priv->payloader, "stats", &stats, NULL);
2445     if (stats == NULL)
2446       goto no_stats;
2447
2448     if (seq)
2449       gst_structure_get_uint (stats, "seqnum", seq);
2450
2451     if (rtptime)
2452       gst_structure_get_uint (stats, "timestamp", rtptime);
2453
2454     if (running_time)
2455       gst_structure_get_clock_time (stats, "running-time", running_time);
2456
2457     if (clock_rate) {
2458       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2459       if (*clock_rate == 0 && running_time)
2460         *running_time = GST_CLOCK_TIME_NONE;
2461     }
2462     gst_structure_free (stats);
2463   } else {
2464     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2465         !g_object_class_find_property (payobjclass, "timestamp"))
2466       goto no_stats;
2467
2468     if (seq)
2469       g_object_get (priv->payloader, "seqnum", seq, NULL);
2470
2471     if (rtptime)
2472       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2473
2474     if (running_time)
2475       *running_time = GST_CLOCK_TIME_NONE;
2476   }
2477   g_mutex_unlock (&priv->lock);
2478
2479   return TRUE;
2480
2481   /* ERRORS */
2482 no_stats:
2483   {
2484     GST_WARNING ("Could not get payloader stats");
2485     g_mutex_unlock (&priv->lock);
2486     return FALSE;
2487   }
2488 }
2489
2490 /**
2491  * gst_rtsp_stream_get_caps:
2492  * @stream: a #GstRTSPStream
2493  *
2494  * Retrieve the current caps of @stream.
2495  *
2496  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2497  * after usage.
2498  */
2499 GstCaps *
2500 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2501 {
2502   GstRTSPStreamPrivate *priv;
2503   GstCaps *result;
2504
2505   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2506
2507   priv = stream->priv;
2508
2509   g_mutex_lock (&priv->lock);
2510   if ((result = priv->caps))
2511     gst_caps_ref (result);
2512   g_mutex_unlock (&priv->lock);
2513
2514   return result;
2515 }
2516
2517 /**
2518  * gst_rtsp_stream_recv_rtp:
2519  * @stream: a #GstRTSPStream
2520  * @buffer: (transfer full): a #GstBuffer
2521  *
2522  * Handle an RTP buffer for the stream. This method is usually called when a
2523  * message has been received from a client using the TCP transport.
2524  *
2525  * This function takes ownership of @buffer.
2526  *
2527  * Returns: a GstFlowReturn.
2528  */
2529 GstFlowReturn
2530 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2531 {
2532   GstRTSPStreamPrivate *priv;
2533   GstFlowReturn ret;
2534   GstElement *element;
2535
2536   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2537   priv = stream->priv;
2538   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2539   g_return_val_if_fail (priv->is_joined, FALSE);
2540
2541   g_mutex_lock (&priv->lock);
2542   if (priv->appsrc[0])
2543     element = gst_object_ref (priv->appsrc[0]);
2544   else
2545     element = NULL;
2546   g_mutex_unlock (&priv->lock);
2547
2548   if (element) {
2549     if (priv->appsrc_base_time[0] == -1) {
2550       /* Take current running_time. This timestamp will be put on
2551        * the first buffer of each stream because we are a live source and so we
2552        * timestamp with the running_time. When we are dealing with TCP, we also
2553        * only timestamp the first buffer (using the DISCONT flag) because a server
2554        * typically bursts data, for which we don't want to compensate by speeding
2555        * up the media. The other timestamps will be interpollated from this one
2556        * using the RTP timestamps. */
2557       GST_OBJECT_LOCK (element);
2558       if (GST_ELEMENT_CLOCK (element)) {
2559         GstClockTime now;
2560         GstClockTime base_time;
2561
2562         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2563         base_time = GST_ELEMENT_CAST (element)->base_time;
2564
2565         priv->appsrc_base_time[0] = now - base_time;
2566         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2567         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2568             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2569             GST_TIME_ARGS (base_time));
2570       }
2571       GST_OBJECT_UNLOCK (element);
2572     }
2573
2574     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2575     gst_object_unref (element);
2576   } else {
2577     ret = GST_FLOW_OK;
2578   }
2579   return ret;
2580 }
2581
2582 /**
2583  * gst_rtsp_stream_recv_rtcp:
2584  * @stream: a #GstRTSPStream
2585  * @buffer: (transfer full): a #GstBuffer
2586  *
2587  * Handle an RTCP buffer for the stream. This method is usually called when a
2588  * message has been received from a client using the TCP transport.
2589  *
2590  * This function takes ownership of @buffer.
2591  *
2592  * Returns: a GstFlowReturn.
2593  */
2594 GstFlowReturn
2595 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2596 {
2597   GstRTSPStreamPrivate *priv;
2598   GstFlowReturn ret;
2599   GstElement *element;
2600
2601   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2602   priv = stream->priv;
2603   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2604
2605   if (!priv->is_joined) {
2606     gst_buffer_unref (buffer);
2607     return GST_FLOW_NOT_LINKED;
2608   }
2609   g_mutex_lock (&priv->lock);
2610   if (priv->appsrc[1])
2611     element = gst_object_ref (priv->appsrc[1]);
2612   else
2613     element = NULL;
2614   g_mutex_unlock (&priv->lock);
2615
2616   if (element) {
2617     if (priv->appsrc_base_time[1] == -1) {
2618       /* Take current running_time. This timestamp will be put on
2619        * the first buffer of each stream because we are a live source and so we
2620        * timestamp with the running_time. When we are dealing with TCP, we also
2621        * only timestamp the first buffer (using the DISCONT flag) because a server
2622        * typically bursts data, for which we don't want to compensate by speeding
2623        * up the media. The other timestamps will be interpollated from this one
2624        * using the RTP timestamps. */
2625       GST_OBJECT_LOCK (element);
2626       if (GST_ELEMENT_CLOCK (element)) {
2627         GstClockTime now;
2628         GstClockTime base_time;
2629
2630         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2631         base_time = GST_ELEMENT_CAST (element)->base_time;
2632
2633         priv->appsrc_base_time[1] = now - base_time;
2634         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2635         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2636             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2637             GST_TIME_ARGS (base_time));
2638       }
2639       GST_OBJECT_UNLOCK (element);
2640     }
2641
2642     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2643     gst_object_unref (element);
2644   } else {
2645     ret = GST_FLOW_OK;
2646     gst_buffer_unref (buffer);
2647   }
2648   return ret;
2649 }
2650
2651 /* must be called with lock */
2652 static gboolean
2653 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2654     gboolean add)
2655 {
2656   GstRTSPStreamPrivate *priv = stream->priv;
2657   const GstRTSPTransport *tr;
2658
2659   tr = gst_rtsp_stream_transport_get_transport (trans);
2660
2661   switch (tr->lower_transport) {
2662     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2663     {
2664       GstRTSPMulticastTransportSource *source;
2665       GstBin *bin;
2666
2667       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2668
2669       if (add) {
2670         gchar *host;
2671         gint i;
2672         GstPad *selpad, *pad;
2673
2674         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2675         source->transport = trans;
2676
2677         for (i = 0; i < 2; i++) {
2678           host =
2679               g_strdup_printf ("udp://%s:%d", tr->destination,
2680               (i == 0) ? tr->port.min : tr->port.max);
2681           source->udpsrc[i] =
2682               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2683           g_free (host);
2684
2685           if (priv->srcpad) {
2686             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2687              * values. This is only relevant for PLAY pipelines */
2688             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2689             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2690           }
2691           /* add udpsrc */
2692           gst_bin_add (bin, source->udpsrc[i]);
2693
2694           /* and link to the funnel v4 */
2695           source->selpad[i] = selpad =
2696               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2697           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2698           gst_pad_link (pad, selpad);
2699           gst_object_unref (pad);
2700           gst_object_unref (selpad);
2701         }
2702         gst_object_unref (bin);
2703
2704         priv->transport_sources =
2705             g_list_prepend (priv->transport_sources, source);
2706       } else {
2707         GList *l;
2708
2709         for (l = priv->transport_sources; l; l = l->next) {
2710           source = l->data;
2711
2712           if (source->transport == trans) {
2713             priv->transport_sources =
2714                 g_list_delete_link (priv->transport_sources, l);
2715             break;
2716           }
2717         }
2718
2719         if (l != NULL) {
2720           gint i;
2721
2722           for (i = 0; i < 2; i++) {
2723             /* Will automatically unlink everything */
2724             gst_bin_remove (bin,
2725                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2726
2727             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2728             gst_object_unref (source->udpsrc[i]);
2729
2730             gst_element_release_request_pad (priv->funnel[i],
2731                 source->selpad[i]);
2732           }
2733
2734           g_slice_free (GstRTSPMulticastTransportSource, source);
2735         }
2736       }
2737
2738       /* fall through for the generic case */
2739     }
2740     case GST_RTSP_LOWER_TRANS_UDP:
2741     {
2742       gchar *dest;
2743       gint min, max;
2744       guint ttl = 0;
2745
2746       dest = tr->destination;
2747       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2748         min = tr->port.min;
2749         max = tr->port.max;
2750         ttl = tr->ttl;
2751       } else {
2752         min = tr->client_port.min;
2753         max = tr->client_port.max;
2754       }
2755
2756       if (add) {
2757         if (ttl > 0) {
2758           GST_INFO ("setting ttl-mc %d", ttl);
2759           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2760           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2761         }
2762         GST_INFO ("adding %s:%d-%d", dest, min, max);
2763         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2764         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2765         priv->transports = g_list_prepend (priv->transports, trans);
2766       } else {
2767         GST_INFO ("removing %s:%d-%d", dest, min, max);
2768         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2769         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2770         priv->transports = g_list_remove (priv->transports, trans);
2771       }
2772       priv->transports_cookie++;
2773       break;
2774     }
2775     case GST_RTSP_LOWER_TRANS_TCP:
2776       if (add) {
2777         GST_INFO ("adding TCP %s", tr->destination);
2778         priv->transports = g_list_prepend (priv->transports, trans);
2779       } else {
2780         GST_INFO ("removing TCP %s", tr->destination);
2781         priv->transports = g_list_remove (priv->transports, trans);
2782       }
2783       priv->transports_cookie++;
2784       break;
2785     default:
2786       goto unknown_transport;
2787   }
2788   return TRUE;
2789
2790   /* ERRORS */
2791 unknown_transport:
2792   {
2793     GST_INFO ("Unknown transport %d", tr->lower_transport);
2794     return FALSE;
2795   }
2796 }
2797
2798
2799 /**
2800  * gst_rtsp_stream_add_transport:
2801  * @stream: a #GstRTSPStream
2802  * @trans: (transfer none): a #GstRTSPStreamTransport
2803  *
2804  * Add the transport in @trans to @stream. The media of @stream will
2805  * then also be send to the values configured in @trans.
2806  *
2807  * @stream must be joined to a bin.
2808  *
2809  * @trans must contain a valid #GstRTSPTransport.
2810  *
2811  * Returns: %TRUE if @trans was added
2812  */
2813 gboolean
2814 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2815     GstRTSPStreamTransport * trans)
2816 {
2817   GstRTSPStreamPrivate *priv;
2818   gboolean res;
2819
2820   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2821   priv = stream->priv;
2822   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2823   g_return_val_if_fail (priv->is_joined, FALSE);
2824
2825   g_mutex_lock (&priv->lock);
2826   res = update_transport (stream, trans, TRUE);
2827   g_mutex_unlock (&priv->lock);
2828
2829   return res;
2830 }
2831
2832 /**
2833  * gst_rtsp_stream_remove_transport:
2834  * @stream: a #GstRTSPStream
2835  * @trans: (transfer none): a #GstRTSPStreamTransport
2836  *
2837  * Remove the transport in @trans from @stream. The media of @stream will
2838  * not be sent to the values configured in @trans.
2839  *
2840  * @stream must be joined to a bin.
2841  *
2842  * @trans must contain a valid #GstRTSPTransport.
2843  *
2844  * Returns: %TRUE if @trans was removed
2845  */
2846 gboolean
2847 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2848     GstRTSPStreamTransport * trans)
2849 {
2850   GstRTSPStreamPrivate *priv;
2851   gboolean res;
2852
2853   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2854   priv = stream->priv;
2855   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2856   g_return_val_if_fail (priv->is_joined, FALSE);
2857
2858   g_mutex_lock (&priv->lock);
2859   res = update_transport (stream, trans, FALSE);
2860   g_mutex_unlock (&priv->lock);
2861
2862   return res;
2863 }
2864
2865 /**
2866  * gst_rtsp_stream_update_crypto:
2867  * @stream: a #GstRTSPStream
2868  * @ssrc: the SSRC
2869  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2870  *
2871  * Update the new crypto information for @ssrc in @stream. If information
2872  * for @ssrc did not exist, it will be added. If information
2873  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2874  * be removed from @stream.
2875  *
2876  * Returns: %TRUE if @crypto could be updated
2877  */
2878 gboolean
2879 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2880     guint ssrc, GstCaps * crypto)
2881 {
2882   GstRTSPStreamPrivate *priv;
2883
2884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2885   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2886
2887   priv = stream->priv;
2888
2889   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2890
2891   g_mutex_lock (&priv->lock);
2892   if (crypto)
2893     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2894         gst_caps_ref (crypto));
2895   else
2896     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2897   g_mutex_unlock (&priv->lock);
2898
2899   return TRUE;
2900 }
2901
2902 /**
2903  * gst_rtsp_stream_get_rtp_socket:
2904  * @stream: a #GstRTSPStream
2905  * @family: the socket family
2906  *
2907  * Get the RTP socket from @stream for a @family.
2908  *
2909  * @stream must be joined to a bin.
2910  *
2911  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2912  * socket could be allocated for @family. Unref after usage
2913  */
2914 GSocket *
2915 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2916 {
2917   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2918   GSocket *socket;
2919   const gchar *name;
2920
2921   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2922   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2923       family == G_SOCKET_FAMILY_IPV6, NULL);
2924   g_return_val_if_fail (priv->udpsink[0], NULL);
2925
2926   if (family == G_SOCKET_FAMILY_IPV6)
2927     name = "socket-v6";
2928   else
2929     name = "socket";
2930
2931   g_object_get (priv->udpsink[0], name, &socket, NULL);
2932
2933   return socket;
2934 }
2935
2936 /**
2937  * gst_rtsp_stream_get_rtcp_socket:
2938  * @stream: a #GstRTSPStream
2939  * @family: the socket family
2940  *
2941  * Get the RTCP socket from @stream for a @family.
2942  *
2943  * @stream must be joined to a bin.
2944  *
2945  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2946  * socket could be allocated for @family. Unref after usage
2947  */
2948 GSocket *
2949 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2950 {
2951   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2952   GSocket *socket;
2953   const gchar *name;
2954
2955   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2956   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2957       family == G_SOCKET_FAMILY_IPV6, NULL);
2958   g_return_val_if_fail (priv->udpsink[1], NULL);
2959
2960   if (family == G_SOCKET_FAMILY_IPV6)
2961     name = "socket-v6";
2962   else
2963     name = "socket";
2964
2965   g_object_get (priv->udpsink[1], name, &socket, NULL);
2966
2967   return socket;
2968 }
2969
2970 /**
2971  * gst_rtsp_stream_set_seqnum:
2972  * @stream: a #GstRTSPStream
2973  * @seqnum: a new sequence number
2974  *
2975  * Configure the sequence number in the payloader of @stream to @seqnum.
2976  */
2977 void
2978 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2979 {
2980   GstRTSPStreamPrivate *priv;
2981
2982   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2983
2984   priv = stream->priv;
2985
2986   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2987 }
2988
2989 /**
2990  * gst_rtsp_stream_get_seqnum:
2991  * @stream: a #GstRTSPStream
2992  *
2993  * Get the configured sequence number in the payloader of @stream.
2994  *
2995  * Returns: the sequence number of the payloader.
2996  */
2997 guint16
2998 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2999 {
3000   GstRTSPStreamPrivate *priv;
3001   guint seqnum;
3002
3003   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3004
3005   priv = stream->priv;
3006
3007   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3008
3009   return seqnum;
3010 }
3011
3012 /**
3013  * gst_rtsp_stream_transport_filter:
3014  * @stream: a #GstRTSPStream
3015  * @func: (scope call) (allow-none): a callback
3016  * @user_data: (closure): user data passed to @func
3017  *
3018  * Call @func for each transport managed by @stream. The result value of @func
3019  * determines what happens to the transport. @func will be called with @stream
3020  * locked so no further actions on @stream can be performed from @func.
3021  *
3022  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3023  * @stream.
3024  *
3025  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3026  *
3027  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3028  * will also be added with an additional ref to the result #GList of this
3029  * function..
3030  *
3031  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3032  *
3033  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3034  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3035  * element in the #GList should be unreffed before the list is freed.
3036  */
3037 GList *
3038 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3039     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3040 {
3041   GstRTSPStreamPrivate *priv;
3042   GList *result, *walk, *next;
3043   GHashTable *visited = NULL;
3044   guint cookie;
3045
3046   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3047
3048   priv = stream->priv;
3049
3050   result = NULL;
3051   if (func)
3052     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3053
3054   g_mutex_lock (&priv->lock);
3055 restart:
3056   cookie = priv->transports_cookie;
3057   for (walk = priv->transports; walk; walk = next) {
3058     GstRTSPStreamTransport *trans = walk->data;
3059     GstRTSPFilterResult res;
3060     gboolean changed;
3061
3062     next = g_list_next (walk);
3063
3064     if (func) {
3065       /* only visit each transport once */
3066       if (g_hash_table_contains (visited, trans))
3067         continue;
3068
3069       g_hash_table_add (visited, g_object_ref (trans));
3070       g_mutex_unlock (&priv->lock);
3071
3072       res = func (stream, trans, user_data);
3073
3074       g_mutex_lock (&priv->lock);
3075     } else
3076       res = GST_RTSP_FILTER_REF;
3077
3078     changed = (cookie != priv->transports_cookie);
3079
3080     switch (res) {
3081       case GST_RTSP_FILTER_REMOVE:
3082         update_transport (stream, trans, FALSE);
3083         break;
3084       case GST_RTSP_FILTER_REF:
3085         result = g_list_prepend (result, g_object_ref (trans));
3086         break;
3087       case GST_RTSP_FILTER_KEEP:
3088       default:
3089         break;
3090     }
3091     if (changed)
3092       goto restart;
3093   }
3094   g_mutex_unlock (&priv->lock);
3095
3096   if (func)
3097     g_hash_table_unref (visited);
3098
3099   return result;
3100 }
3101
3102 static GstPadProbeReturn
3103 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3104 {
3105   GstRTSPStreamPrivate *priv;
3106   GstRTSPStream *stream;
3107
3108   stream = user_data;
3109   priv = stream->priv;
3110
3111   GST_DEBUG_OBJECT (pad, "now blocking");
3112
3113   g_mutex_lock (&priv->lock);
3114   priv->blocking = TRUE;
3115   g_mutex_unlock (&priv->lock);
3116
3117   gst_element_post_message (priv->payloader,
3118       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3119           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3120
3121   return GST_PAD_PROBE_OK;
3122 }
3123
3124 /**
3125  * gst_rtsp_stream_set_blocked:
3126  * @stream: a #GstRTSPStream
3127  * @blocked: boolean indicating we should block or unblock
3128  *
3129  * Blocks or unblocks the dataflow on @stream.
3130  *
3131  * Returns: %TRUE on success
3132  */
3133 gboolean
3134 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3135 {
3136   GstRTSPStreamPrivate *priv;
3137
3138   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3139
3140   priv = stream->priv;
3141
3142   g_mutex_lock (&priv->lock);
3143   if (blocked) {
3144     priv->blocking = FALSE;
3145     if (priv->blocked_id == 0) {
3146       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3147           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3148           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3149           g_object_ref (stream), g_object_unref);
3150     }
3151   } else {
3152     if (priv->blocked_id != 0) {
3153       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3154       priv->blocked_id = 0;
3155       priv->blocking = FALSE;
3156     }
3157   }
3158   g_mutex_unlock (&priv->lock);
3159
3160   return TRUE;
3161 }
3162
3163 /**
3164  * gst_rtsp_stream_is_blocking:
3165  * @stream: a #GstRTSPStream
3166  *
3167  * Check if @stream is blocking on a #GstBuffer.
3168  *
3169  * Returns: %TRUE if @stream is blocking
3170  */
3171 gboolean
3172 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3173 {
3174   GstRTSPStreamPrivate *priv;
3175   gboolean result;
3176
3177   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3178
3179   priv = stream->priv;
3180
3181   g_mutex_lock (&priv->lock);
3182   result = priv->blocking;
3183   g_mutex_unlock (&priv->lock);
3184
3185   return result;
3186 }
3187
3188 /**
3189  * gst_rtsp_stream_query_position:
3190  * @stream: a #GstRTSPStream
3191  *
3192  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3193  * the RTP parts of the pipeline and not the RTCP parts.
3194  *
3195  * Returns: %TRUE if the position could be queried
3196  */
3197 gboolean
3198 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3199 {
3200   GstRTSPStreamPrivate *priv;
3201   GstElement *sink;
3202   gboolean ret;
3203
3204   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3205
3206   priv = stream->priv;
3207
3208   g_mutex_lock (&priv->lock);
3209   if ((sink = priv->udpsink[0]))
3210     gst_object_ref (sink);
3211   g_mutex_unlock (&priv->lock);
3212
3213   if (!sink)
3214     return FALSE;
3215
3216   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3217   gst_object_unref (sink);
3218
3219   return ret;
3220 }
3221
3222 /**
3223  * gst_rtsp_stream_query_stop:
3224  * @stream: a #GstRTSPStream
3225  *
3226  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3227  * the RTP parts of the pipeline and not the RTCP parts.
3228  *
3229  * Returns: %TRUE if the stop could be queried
3230  */
3231 gboolean
3232 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3233 {
3234   GstRTSPStreamPrivate *priv;
3235   GstElement *sink;
3236   GstQuery *query;
3237   gboolean ret;
3238
3239   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3240
3241   priv = stream->priv;
3242
3243   g_mutex_lock (&priv->lock);
3244   if ((sink = priv->udpsink[0]))
3245     gst_object_ref (sink);
3246   g_mutex_unlock (&priv->lock);
3247
3248   if (!sink)
3249     return FALSE;
3250
3251   query = gst_query_new_segment (GST_FORMAT_TIME);
3252   if ((ret = gst_element_query (sink, query))) {
3253     GstFormat format;
3254
3255     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3256     if (format != GST_FORMAT_TIME)
3257       *stop = -1;
3258   }
3259   gst_query_unref (query);
3260   gst_object_unref (sink);
3261
3262   return ret;
3263
3264 }