rtsp-stream: Set format=TIME on our app sources for TCP
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* retransmission */
115   GstElement *rtxsend;
116   guint rtx_pt;
117   GstClockTime rtx_time;
118
119   /* server ports for sending/receiving over ipv4 */
120   GstRTSPRange server_port_v4;
121   GstRTSPAddress *server_addr_v4;
122   gboolean have_ipv4;
123
124   /* server ports for sending/receiving over ipv6 */
125   GstRTSPRange server_port_v6;
126   GstRTSPAddress *server_addr_v6;
127   gboolean have_ipv6;
128
129   /* multicast addresses */
130   GstRTSPAddressPool *pool;
131   GstRTSPAddress *addr_v4;
132   GstRTSPAddress *addr_v6;
133
134   /* the caps of the stream */
135   gulong caps_sig;
136   GstCaps *caps;
137
138   /* transports we stream to */
139   guint n_active;
140   GList *transports;
141   guint transports_cookie;
142   GList *tr_cache;
143   guint tr_cache_cookie;
144
145   /* UDP sources for UDP multicast transports */
146   GList *transport_sources;
147
148   gint dscp_qos;
149
150   /* stream blocking */
151   gulong blocked_id;
152   gboolean blocking;
153 };
154
155 #define DEFAULT_CONTROL         NULL
156 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
157 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
158                                         GST_RTSP_LOWER_TRANS_TCP
159
160 enum
161 {
162   PROP_0,
163   PROP_CONTROL,
164   PROP_PROFILES,
165   PROP_PROTOCOLS,
166   PROP_LAST
167 };
168
169 enum
170 {
171   SIGNAL_NEW_RTP_ENCODER,
172   SIGNAL_NEW_RTCP_ENCODER,
173   SIGNAL_LAST
174 };
175
176 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
177 #define GST_CAT_DEFAULT rtsp_stream_debug
178
179 static GQuark ssrc_stream_map_key;
180
181 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
182     GValue * value, GParamSpec * pspec);
183 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
184     const GValue * value, GParamSpec * pspec);
185
186 static void gst_rtsp_stream_finalize (GObject * obj);
187
188 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
189
190 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
191
192 static void
193 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
194 {
195   GObjectClass *gobject_class;
196
197   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
198
199   gobject_class = G_OBJECT_CLASS (klass);
200
201   gobject_class->get_property = gst_rtsp_stream_get_property;
202   gobject_class->set_property = gst_rtsp_stream_set_property;
203   gobject_class->finalize = gst_rtsp_stream_finalize;
204
205   g_object_class_install_property (gobject_class, PROP_CONTROL,
206       g_param_spec_string ("control", "Control",
207           "The control string for this stream", DEFAULT_CONTROL,
208           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209
210   g_object_class_install_property (gobject_class, PROP_PROFILES,
211       g_param_spec_flags ("profiles", "Profiles",
212           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
213           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214
215   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
216       g_param_spec_flags ("protocols", "Protocols",
217           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
218           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
219
220   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
221       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
222       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
223       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
224
225   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
226       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
227       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
228       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
229
230   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
231
232   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
233 }
234
235 static void
236 gst_rtsp_stream_init (GstRTSPStream * stream)
237 {
238   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
239
240   GST_DEBUG ("new stream %p", stream);
241
242   stream->priv = priv;
243
244   priv->dscp_qos = -1;
245   priv->control = g_strdup (DEFAULT_CONTROL);
246   priv->profiles = DEFAULT_PROFILES;
247   priv->protocols = DEFAULT_PROTOCOLS;
248
249   g_mutex_init (&priv->lock);
250
251   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
252       NULL, (GDestroyNotify) gst_caps_unref);
253 }
254
255 static void
256 gst_rtsp_stream_finalize (GObject * obj)
257 {
258   GstRTSPStream *stream;
259   GstRTSPStreamPrivate *priv;
260
261   stream = GST_RTSP_STREAM (obj);
262   priv = stream->priv;
263
264   GST_DEBUG ("finalize stream %p", stream);
265
266   /* we really need to be unjoined now */
267   g_return_if_fail (!priv->is_joined);
268
269   if (priv->addr_v4)
270     gst_rtsp_address_free (priv->addr_v4);
271   if (priv->addr_v6)
272     gst_rtsp_address_free (priv->addr_v6);
273   if (priv->server_addr_v4)
274     gst_rtsp_address_free (priv->server_addr_v4);
275   if (priv->server_addr_v6)
276     gst_rtsp_address_free (priv->server_addr_v6);
277   if (priv->pool)
278     g_object_unref (priv->pool);
279   if (priv->rtxsend)
280     g_object_unref (priv->rtxsend);
281
282   gst_object_unref (priv->payloader);
283   gst_object_unref (priv->srcpad);
284   g_free (priv->control);
285   g_mutex_clear (&priv->lock);
286
287   g_hash_table_unref (priv->keys);
288
289   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
290 }
291
292 static void
293 gst_rtsp_stream_get_property (GObject * object, guint propid,
294     GValue * value, GParamSpec * pspec)
295 {
296   GstRTSPStream *stream = GST_RTSP_STREAM (object);
297
298   switch (propid) {
299     case PROP_CONTROL:
300       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
301       break;
302     case PROP_PROFILES:
303       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
304       break;
305     case PROP_PROTOCOLS:
306       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
310   }
311 }
312
313 static void
314 gst_rtsp_stream_set_property (GObject * object, guint propid,
315     const GValue * value, GParamSpec * pspec)
316 {
317   GstRTSPStream *stream = GST_RTSP_STREAM (object);
318
319   switch (propid) {
320     case PROP_CONTROL:
321       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
322       break;
323     case PROP_PROFILES:
324       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
325       break;
326     case PROP_PROTOCOLS:
327       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
331   }
332 }
333
334 /**
335  * gst_rtsp_stream_new:
336  * @idx: an index
337  * @srcpad: a #GstPad
338  * @payloader: a #GstElement
339  *
340  * Create a new media stream with index @idx that handles RTP data on
341  * @srcpad and has a payloader element @payloader.
342  *
343  * Returns: (transfer full): a new #GstRTSPStream
344  */
345 GstRTSPStream *
346 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
347 {
348   GstRTSPStreamPrivate *priv;
349   GstRTSPStream *stream;
350
351   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
352   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
353   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
354
355   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
356   priv = stream->priv;
357   priv->idx = idx;
358   priv->payloader = gst_object_ref (payloader);
359   priv->srcpad = gst_object_ref (srcpad);
360
361   return stream;
362 }
363
364 /**
365  * gst_rtsp_stream_get_index:
366  * @stream: a #GstRTSPStream
367  *
368  * Get the stream index.
369  *
370  * Return: the stream index.
371  */
372 guint
373 gst_rtsp_stream_get_index (GstRTSPStream * stream)
374 {
375   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
376
377   return stream->priv->idx;
378 }
379
380 /**
381  * gst_rtsp_stream_get_pt:
382  * @stream: a #GstRTSPStream
383  *
384  * Get the stream payload type.
385  *
386  * Return: the stream payload type.
387  */
388 guint
389 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
390 {
391   GstRTSPStreamPrivate *priv;
392   guint pt;
393
394   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
395
396   priv = stream->priv;
397
398   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
399
400   return pt;
401 }
402
403 /**
404  * gst_rtsp_stream_get_srcpad:
405  * @stream: a #GstRTSPStream
406  *
407  * Get the srcpad associated with @stream.
408  *
409  * Returns: (transfer full): the srcpad. Unref after usage.
410  */
411 GstPad *
412 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
413 {
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
415
416   return gst_object_ref (stream->priv->srcpad);
417 }
418
419 /**
420  * gst_rtsp_stream_get_control:
421  * @stream: a #GstRTSPStream
422  *
423  * Get the control string to identify this stream.
424  *
425  * Returns: (transfer full): the control string. g_free() after usage.
426  */
427 gchar *
428 gst_rtsp_stream_get_control (GstRTSPStream * stream)
429 {
430   GstRTSPStreamPrivate *priv;
431   gchar *result;
432
433   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
434
435   priv = stream->priv;
436
437   g_mutex_lock (&priv->lock);
438   if ((result = g_strdup (priv->control)) == NULL)
439     result = g_strdup_printf ("stream=%u", priv->idx);
440   g_mutex_unlock (&priv->lock);
441
442   return result;
443 }
444
445 /**
446  * gst_rtsp_stream_set_control:
447  * @stream: a #GstRTSPStream
448  * @control: a control string
449  *
450  * Set the control string in @stream.
451  */
452 void
453 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
454 {
455   GstRTSPStreamPrivate *priv;
456
457   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
458
459   priv = stream->priv;
460
461   g_mutex_lock (&priv->lock);
462   g_free (priv->control);
463   priv->control = g_strdup (control);
464   g_mutex_unlock (&priv->lock);
465 }
466
467 /**
468  * gst_rtsp_stream_has_control:
469  * @stream: a #GstRTSPStream
470  * @control: a control string
471  *
472  * Check if @stream has the control string @control.
473  *
474  * Returns: %TRUE is @stream has @control as the control string
475  */
476 gboolean
477 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
478 {
479   GstRTSPStreamPrivate *priv;
480   gboolean res;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if (priv->control)
488     res = (g_strcmp0 (priv->control, control) == 0);
489   else {
490     guint streamid;
491
492     if (sscanf (control, "stream=%u", &streamid) > 0)
493       res = (streamid == priv->idx);
494     else
495       res = FALSE;
496   }
497   g_mutex_unlock (&priv->lock);
498
499   return res;
500 }
501
502 /**
503  * gst_rtsp_stream_set_mtu:
504  * @stream: a #GstRTSPStream
505  * @mtu: a new MTU
506  *
507  * Configure the mtu in the payloader of @stream to @mtu.
508  */
509 void
510 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
511 {
512   GstRTSPStreamPrivate *priv;
513
514   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
515
516   priv = stream->priv;
517
518   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
519
520   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
521 }
522
523 /**
524  * gst_rtsp_stream_get_mtu:
525  * @stream: a #GstRTSPStream
526  *
527  * Get the configured MTU in the payloader of @stream.
528  *
529  * Returns: the MTU of the payloader.
530  */
531 guint
532 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
533 {
534   GstRTSPStreamPrivate *priv;
535   guint mtu;
536
537   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
538
539   priv = stream->priv;
540
541   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
542
543   return mtu;
544 }
545
546 /* Update the dscp qos property on the udp sinks */
547 static void
548 update_dscp_qos (GstRTSPStream * stream)
549 {
550   GstRTSPStreamPrivate *priv;
551
552   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
553
554   priv = stream->priv;
555
556   if (priv->udpsink[0]) {
557     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
558         NULL);
559   }
560
561   if (priv->udpsink[1]) {
562     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
563         NULL);
564   }
565 }
566
567 /**
568  * gst_rtsp_stream_set_dscp_qos:
569  * @stream: a #GstRTSPStream
570  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
571  *
572  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
573  */
574 void
575 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
576 {
577   GstRTSPStreamPrivate *priv;
578
579   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
580
581   priv = stream->priv;
582
583   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
584
585   if (dscp_qos < -1 || dscp_qos > 63) {
586     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
587     return;
588   }
589
590   priv->dscp_qos = dscp_qos;
591
592   update_dscp_qos (stream);
593 }
594
595 /**
596  * gst_rtsp_stream_get_dscp_qos:
597  * @stream: a #GstRTSPStream
598  *
599  * Get the configured DSCP QoS in of the outgoing sockets.
600  *
601  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
602  */
603 gint
604 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
605 {
606   GstRTSPStreamPrivate *priv;
607
608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
609
610   priv = stream->priv;
611
612   return priv->dscp_qos;
613 }
614
615 /**
616  * gst_rtsp_stream_is_transport_supported:
617  * @stream: a #GstRTSPStream
618  * @transport: (transfer none): a #GstRTSPTransport
619  *
620  * Check if @transport can be handled by stream
621  *
622  * Returns: %TRUE if @transport can be handled by @stream.
623  */
624 gboolean
625 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
626     GstRTSPTransport * transport)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
631
632   priv = stream->priv;
633
634   g_mutex_lock (&priv->lock);
635   if (transport->trans != GST_RTSP_TRANS_RTP)
636     goto unsupported_transmode;
637
638   if (!(transport->profile & priv->profiles))
639     goto unsupported_profile;
640
641   if (!(transport->lower_transport & priv->protocols))
642     goto unsupported_ltrans;
643
644   g_mutex_unlock (&priv->lock);
645
646   return TRUE;
647
648   /* ERRORS */
649 unsupported_transmode:
650   {
651     GST_DEBUG ("unsupported transport mode %d", transport->trans);
652     g_mutex_unlock (&priv->lock);
653     return FALSE;
654   }
655 unsupported_profile:
656   {
657     GST_DEBUG ("unsupported profile %d", transport->profile);
658     g_mutex_unlock (&priv->lock);
659     return FALSE;
660   }
661 unsupported_ltrans:
662   {
663     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
664     g_mutex_unlock (&priv->lock);
665     return FALSE;
666   }
667 }
668
669 /**
670  * gst_rtsp_stream_set_profiles:
671  * @stream: a #GstRTSPStream
672  * @profiles: the new profiles
673  *
674  * Configure the allowed profiles for @stream.
675  */
676 void
677 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   priv->profiles = profiles;
687   g_mutex_unlock (&priv->lock);
688 }
689
690 /**
691  * gst_rtsp_stream_get_profiles:
692  * @stream: a #GstRTSPStream
693  *
694  * Get the allowed profiles of @stream.
695  *
696  * Returns: a #GstRTSPProfile
697  */
698 GstRTSPProfile
699 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
700 {
701   GstRTSPStreamPrivate *priv;
702   GstRTSPProfile res;
703
704   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
705
706   priv = stream->priv;
707
708   g_mutex_lock (&priv->lock);
709   res = priv->profiles;
710   g_mutex_unlock (&priv->lock);
711
712   return res;
713 }
714
715 /**
716  * gst_rtsp_stream_set_protocols:
717  * @stream: a #GstRTSPStream
718  * @protocols: the new flags
719  *
720  * Configure the allowed lower transport for @stream.
721  */
722 void
723 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
724     GstRTSPLowerTrans protocols)
725 {
726   GstRTSPStreamPrivate *priv;
727
728   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
729
730   priv = stream->priv;
731
732   g_mutex_lock (&priv->lock);
733   priv->protocols = protocols;
734   g_mutex_unlock (&priv->lock);
735 }
736
737 /**
738  * gst_rtsp_stream_get_protocols:
739  * @stream: a #GstRTSPStream
740  *
741  * Get the allowed protocols of @stream.
742  *
743  * Returns: a #GstRTSPLowerTrans
744  */
745 GstRTSPLowerTrans
746 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
747 {
748   GstRTSPStreamPrivate *priv;
749   GstRTSPLowerTrans res;
750
751   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
752       GST_RTSP_LOWER_TRANS_UNKNOWN);
753
754   priv = stream->priv;
755
756   g_mutex_lock (&priv->lock);
757   res = priv->protocols;
758   g_mutex_unlock (&priv->lock);
759
760   return res;
761 }
762
763 /**
764  * gst_rtsp_stream_set_address_pool:
765  * @stream: a #GstRTSPStream
766  * @pool: (transfer none): a #GstRTSPAddressPool
767  *
768  * configure @pool to be used as the address pool of @stream.
769  */
770 void
771 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
772     GstRTSPAddressPool * pool)
773 {
774   GstRTSPStreamPrivate *priv;
775   GstRTSPAddressPool *old;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   GST_LOG_OBJECT (stream, "set address pool %p", pool);
782
783   g_mutex_lock (&priv->lock);
784   if ((old = priv->pool) != pool)
785     priv->pool = pool ? g_object_ref (pool) : NULL;
786   else
787     old = NULL;
788   g_mutex_unlock (&priv->lock);
789
790   if (old)
791     g_object_unref (old);
792 }
793
794 /**
795  * gst_rtsp_stream_get_address_pool:
796  * @stream: a #GstRTSPStream
797  *
798  * Get the #GstRTSPAddressPool used as the address pool of @stream.
799  *
800  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
801  * usage.
802  */
803 GstRTSPAddressPool *
804 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
805 {
806   GstRTSPStreamPrivate *priv;
807   GstRTSPAddressPool *result;
808
809   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
810
811   priv = stream->priv;
812
813   g_mutex_lock (&priv->lock);
814   if ((result = priv->pool))
815     g_object_ref (result);
816   g_mutex_unlock (&priv->lock);
817
818   return result;
819 }
820
821 /**
822  * gst_rtsp_stream_get_multicast_address:
823  * @stream: a #GstRTSPStream
824  * @family: the #GSocketFamily
825  *
826  * Get the multicast address of @stream for @family.
827  *
828  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
829  * or %NULL when no address could be allocated. gst_rtsp_address_free()
830  * after usage.
831  */
832 GstRTSPAddress *
833 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
834     GSocketFamily family)
835 {
836   GstRTSPStreamPrivate *priv;
837   GstRTSPAddress *result;
838   GstRTSPAddress **addrp;
839   GstRTSPAddressFlags flags;
840
841   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
842
843   priv = stream->priv;
844
845   if (family == G_SOCKET_FAMILY_IPV6) {
846     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
847     addrp = &priv->addr_v6;
848   } else {
849     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
850     addrp = &priv->addr_v4;
851   }
852
853   g_mutex_lock (&priv->lock);
854   if (*addrp == NULL) {
855     if (priv->pool == NULL)
856       goto no_pool;
857
858     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
859
860     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
861     if (*addrp == NULL)
862       goto no_address;
863   }
864   result = gst_rtsp_address_copy (*addrp);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868
869   /* ERRORS */
870 no_pool:
871   {
872     GST_ERROR_OBJECT (stream, "no address pool specified");
873     g_mutex_unlock (&priv->lock);
874     return NULL;
875   }
876 no_address:
877   {
878     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
879     g_mutex_unlock (&priv->lock);
880     return NULL;
881   }
882 }
883
884 /**
885  * gst_rtsp_stream_reserve_address:
886  * @stream: a #GstRTSPStream
887  * @address: an address
888  * @port: a port
889  * @n_ports: n_ports
890  * @ttl: a TTL
891  *
892  * Reserve @address and @port as the address and port of @stream.
893  *
894  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
895  * the address could be reserved. gst_rtsp_address_free() after usage.
896  */
897 GstRTSPAddress *
898 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
899     const gchar * address, guint port, guint n_ports, guint ttl)
900 {
901   GstRTSPStreamPrivate *priv;
902   GstRTSPAddress *result;
903   GInetAddress *addr;
904   GSocketFamily family;
905   GstRTSPAddress **addrp;
906
907   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
908   g_return_val_if_fail (address != NULL, NULL);
909   g_return_val_if_fail (port > 0, NULL);
910   g_return_val_if_fail (n_ports > 0, NULL);
911   g_return_val_if_fail (ttl > 0, NULL);
912
913   priv = stream->priv;
914
915   addr = g_inet_address_new_from_string (address);
916   if (!addr) {
917     GST_ERROR ("failed to get inet addr from %s", address);
918     family = G_SOCKET_FAMILY_IPV4;
919   } else {
920     family = g_inet_address_get_family (addr);
921     g_object_unref (addr);
922   }
923
924   if (family == G_SOCKET_FAMILY_IPV6)
925     addrp = &priv->addr_v6;
926   else
927     addrp = &priv->addr_v4;
928
929   g_mutex_lock (&priv->lock);
930   if (*addrp == NULL) {
931     GstRTSPAddressPoolResult res;
932
933     if (priv->pool == NULL)
934       goto no_pool;
935
936     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
937         port, n_ports, ttl, addrp);
938     if (res != GST_RTSP_ADDRESS_POOL_OK)
939       goto no_address;
940   } else {
941     if (strcmp ((*addrp)->address, address) ||
942         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
943         (*addrp)->ttl != ttl)
944       goto different_address;
945   }
946   result = gst_rtsp_address_copy (*addrp);
947   g_mutex_unlock (&priv->lock);
948
949   return result;
950
951   /* ERRORS */
952 no_pool:
953   {
954     GST_ERROR_OBJECT (stream, "no address pool specified");
955     g_mutex_unlock (&priv->lock);
956     return NULL;
957   }
958 no_address:
959   {
960     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
961         address);
962     g_mutex_unlock (&priv->lock);
963     return NULL;
964   }
965 different_address:
966   {
967     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
968         " reserved", address);
969     g_mutex_unlock (&priv->lock);
970     return NULL;
971   }
972 }
973
974 static gboolean
975 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
976     GSocketFamily family, GstElement * udpsrc_out[2],
977     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
978     GstRTSPAddress ** server_addr_out)
979 {
980   GstStateChangeReturn ret;
981   GstElement *udpsrc0, *udpsrc1;
982   GstElement *udpsink0, *udpsink1;
983   GSocket *rtp_socket = NULL;
984   GSocket *rtcp_socket;
985   gint tmp_rtp, tmp_rtcp;
986   guint count;
987   gint rtpport, rtcpport;
988   GList *rejected_addresses = NULL;
989   GstRTSPAddress *addr = NULL;
990   GInetAddress *inetaddr = NULL;
991   GSocketAddress *rtp_sockaddr = NULL;
992   GSocketAddress *rtcp_sockaddr = NULL;
993   const gchar *multisink_socket;
994
995   if (family == G_SOCKET_FAMILY_IPV6)
996     multisink_socket = "socket-v6";
997   else
998     multisink_socket = "socket";
999
1000   udpsrc0 = NULL;
1001   udpsrc1 = NULL;
1002   udpsink0 = NULL;
1003   udpsink1 = NULL;
1004   count = 0;
1005
1006   /* Start with random port */
1007   tmp_rtp = 0;
1008
1009   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1010       G_SOCKET_PROTOCOL_UDP, NULL);
1011   if (!rtcp_socket)
1012     goto no_udp_protocol;
1013
1014   if (*server_addr_out)
1015     gst_rtsp_address_free (*server_addr_out);
1016
1017   /* try to allocate 2 UDP ports, the RTP port should be an even
1018    * number and the RTCP port should be the next (uneven) port */
1019 again:
1020
1021   if (rtp_socket == NULL) {
1022     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1023         G_SOCKET_PROTOCOL_UDP, NULL);
1024     if (!rtp_socket)
1025       goto no_udp_protocol;
1026   }
1027
1028   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1029     GstRTSPAddressFlags flags;
1030
1031     if (addr)
1032       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1033
1034     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1035     if (family == G_SOCKET_FAMILY_IPV6)
1036       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1037     else
1038       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1039
1040     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1041
1042     if (addr == NULL)
1043       goto no_ports;
1044
1045     tmp_rtp = addr->port;
1046
1047     g_clear_object (&inetaddr);
1048     inetaddr = g_inet_address_new_from_string (addr->address);
1049   } else {
1050     if (tmp_rtp != 0) {
1051       tmp_rtp += 2;
1052       if (++count > 20)
1053         goto no_ports;
1054     }
1055
1056     if (inetaddr == NULL)
1057       inetaddr = g_inet_address_new_any (family);
1058   }
1059
1060   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1061   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1062     g_object_unref (rtp_sockaddr);
1063     goto again;
1064   }
1065   g_object_unref (rtp_sockaddr);
1066
1067   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1068   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1069     g_clear_object (&rtp_sockaddr);
1070     goto socket_error;
1071   }
1072
1073   tmp_rtp =
1074       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1075   g_object_unref (rtp_sockaddr);
1076
1077   /* check if port is even */
1078   if ((tmp_rtp & 1) != 0) {
1079     /* port not even, close and allocate another */
1080     tmp_rtp++;
1081     g_clear_object (&rtp_socket);
1082     goto again;
1083   }
1084
1085   /* set port */
1086   tmp_rtcp = tmp_rtp + 1;
1087
1088   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1089   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1090     g_object_unref (rtcp_sockaddr);
1091     g_clear_object (&rtp_socket);
1092     goto again;
1093   }
1094   g_object_unref (rtcp_sockaddr);
1095
1096   g_clear_object (&inetaddr);
1097
1098   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1099   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1100
1101   if (udpsrc0 == NULL || udpsrc1 == NULL)
1102     goto no_udp_protocol;
1103
1104   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1105   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1106
1107   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1108   if (ret == GST_STATE_CHANGE_FAILURE)
1109     goto element_error;
1110   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1111   if (ret == GST_STATE_CHANGE_FAILURE)
1112     goto element_error;
1113
1114   /* all fine, do port check */
1115   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1116   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1117
1118   /* this should not happen... */
1119   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1120     goto port_error;
1121
1122   if (udpsink_out[0])
1123     udpsink0 = udpsink_out[0];
1124   else
1125     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1126
1127   if (!udpsink0)
1128     goto no_udp_protocol;
1129
1130   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1132
1133   if (udpsink_out[1])
1134     udpsink1 = udpsink_out[1];
1135   else
1136     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1137
1138   if (!udpsink1)
1139     goto no_udp_protocol;
1140
1141   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1142   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1143   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1144
1145   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1149   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1152   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1153
1154   /* we keep these elements, we will further configure them when the
1155    * client told us to really use the UDP ports. */
1156   udpsrc_out[0] = udpsrc0;
1157   udpsrc_out[1] = udpsrc1;
1158   udpsink_out[0] = udpsink0;
1159   udpsink_out[1] = udpsink1;
1160
1161   server_port_out->min = rtpport;
1162   server_port_out->max = rtcpport;
1163
1164   *server_addr_out = addr;
1165   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1166
1167   g_object_unref (rtp_socket);
1168   g_object_unref (rtcp_socket);
1169
1170   return TRUE;
1171
1172   /* ERRORS */
1173 no_udp_protocol:
1174   {
1175     goto cleanup;
1176   }
1177 no_ports:
1178   {
1179     goto cleanup;
1180   }
1181 port_error:
1182   {
1183     goto cleanup;
1184   }
1185 socket_error:
1186   {
1187     goto cleanup;
1188   }
1189 element_error:
1190   {
1191     goto cleanup;
1192   }
1193 cleanup:
1194   {
1195     if (udpsrc0) {
1196       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1197       gst_object_unref (udpsrc0);
1198     }
1199     if (udpsrc1) {
1200       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1201       gst_object_unref (udpsrc1);
1202     }
1203     if (udpsink0) {
1204       gst_element_set_state (udpsink0, GST_STATE_NULL);
1205       gst_object_unref (udpsink0);
1206     }
1207     if (inetaddr)
1208       g_object_unref (inetaddr);
1209     g_list_free_full (rejected_addresses,
1210         (GDestroyNotify) gst_rtsp_address_free);
1211     if (addr)
1212       gst_rtsp_address_free (addr);
1213     if (rtp_socket)
1214       g_object_unref (rtp_socket);
1215     if (rtcp_socket)
1216       g_object_unref (rtcp_socket);
1217     return FALSE;
1218   }
1219 }
1220
1221 /* must be called with lock */
1222 static gboolean
1223 alloc_ports (GstRTSPStream * stream)
1224 {
1225   GstRTSPStreamPrivate *priv = stream->priv;
1226
1227   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1228       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1229       &priv->server_port_v4, &priv->server_addr_v4);
1230
1231   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1232       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1233       &priv->server_port_v6, &priv->server_addr_v6);
1234
1235   return priv->have_ipv4 || priv->have_ipv6;
1236 }
1237
1238 /**
1239  * gst_rtsp_stream_get_server_port:
1240  * @stream: a #GstRTSPStream
1241  * @server_port: (out): result server port
1242  * @family: the port family to get
1243  *
1244  * Fill @server_port with the port pair used by the server. This function can
1245  * only be called when @stream has been joined.
1246  */
1247 void
1248 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1249     GstRTSPRange * server_port, GSocketFamily family)
1250 {
1251   GstRTSPStreamPrivate *priv;
1252
1253   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1254   priv = stream->priv;
1255   g_return_if_fail (priv->is_joined);
1256
1257   g_mutex_lock (&priv->lock);
1258   if (family == G_SOCKET_FAMILY_IPV4) {
1259     if (server_port)
1260       *server_port = priv->server_port_v4;
1261   } else {
1262     if (server_port)
1263       *server_port = priv->server_port_v6;
1264   }
1265   g_mutex_unlock (&priv->lock);
1266 }
1267
1268 /**
1269  * gst_rtsp_stream_get_rtpsession:
1270  * @stream: a #GstRTSPStream
1271  *
1272  * Get the RTP session of this stream.
1273  *
1274  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1275  */
1276 GObject *
1277 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1278 {
1279   GstRTSPStreamPrivate *priv;
1280   GObject *session;
1281
1282   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1283
1284   priv = stream->priv;
1285
1286   g_mutex_lock (&priv->lock);
1287   if ((session = priv->session))
1288     g_object_ref (session);
1289   g_mutex_unlock (&priv->lock);
1290
1291   return session;
1292 }
1293
1294 /**
1295  * gst_rtsp_stream_get_ssrc:
1296  * @stream: a #GstRTSPStream
1297  * @ssrc: (out): result ssrc
1298  *
1299  * Get the SSRC used by the RTP session of this stream. This function can only
1300  * be called when @stream has been joined.
1301  */
1302 void
1303 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1304 {
1305   GstRTSPStreamPrivate *priv;
1306
1307   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1308   priv = stream->priv;
1309   g_return_if_fail (priv->is_joined);
1310
1311   g_mutex_lock (&priv->lock);
1312   if (ssrc && priv->session)
1313     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1314   g_mutex_unlock (&priv->lock);
1315 }
1316
1317 /**
1318  * gst_rtsp_stream_set_retransmission_time:
1319  * @stream: a #GstRTSPStream
1320  * @time: a #GstClockTime
1321  *
1322  * Set the amount of time to store retransmission packets.
1323  */
1324 void
1325 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1326     GstClockTime time)
1327 {
1328   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1329
1330   g_mutex_lock (&stream->priv->lock);
1331   stream->priv->rtx_time = time;
1332   if (stream->priv->rtxsend)
1333     g_object_set (stream->priv->rtxsend, "max-size-time",
1334         GST_TIME_AS_MSECONDS (time), NULL);
1335   g_mutex_unlock (&stream->priv->lock);
1336 }
1337
1338 /**
1339  * gst_rtsp_media_get_retransmission_time:
1340  * @media: a #GstRTSPMedia
1341  *
1342  * Get the amount of time to store retransmission data.
1343  *
1344  * Returns: the amount of time to store retransmission data.
1345  */
1346 GstClockTime
1347 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1348 {
1349   GstClockTime ret;
1350
1351   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1352
1353   g_mutex_lock (&stream->priv->lock);
1354   ret = stream->priv->rtx_time;
1355   g_mutex_unlock (&stream->priv->lock);
1356
1357   return ret;
1358 }
1359
1360 void
1361 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1362 {
1363   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1364
1365   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1366
1367   g_mutex_lock (&stream->priv->lock);
1368   stream->priv->rtx_pt = rtx_pt;
1369   if (stream->priv->rtxsend) {
1370     guint pt = gst_rtsp_stream_get_pt (stream);
1371     gchar *pt_s = g_strdup_printf ("%d", pt);
1372     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1373         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1374     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1375     g_free (pt_s);
1376     gst_structure_free (rtx_pt_map);
1377   }
1378   g_mutex_unlock (&stream->priv->lock);
1379 }
1380
1381 guint
1382 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1383 {
1384   guint rtx_pt;
1385
1386   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1387
1388   g_mutex_lock (&stream->priv->lock);
1389   rtx_pt = stream->priv->rtx_pt;
1390   g_mutex_unlock (&stream->priv->lock);
1391
1392   return rtx_pt;
1393 }
1394
1395 /* executed from streaming thread */
1396 static void
1397 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1398 {
1399   GstRTSPStreamPrivate *priv = stream->priv;
1400   GstCaps *newcaps, *oldcaps;
1401
1402   newcaps = gst_pad_get_current_caps (pad);
1403
1404   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1405       newcaps);
1406
1407   g_mutex_lock (&priv->lock);
1408   oldcaps = priv->caps;
1409   priv->caps = newcaps;
1410   g_mutex_unlock (&priv->lock);
1411
1412   if (oldcaps)
1413     gst_caps_unref (oldcaps);
1414 }
1415
1416 static void
1417 dump_structure (const GstStructure * s)
1418 {
1419   gchar *sstr;
1420
1421   sstr = gst_structure_to_string (s);
1422   GST_INFO ("structure: %s", sstr);
1423   g_free (sstr);
1424 }
1425
1426 static GstRTSPStreamTransport *
1427 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1428 {
1429   GstRTSPStreamPrivate *priv = stream->priv;
1430   GList *walk;
1431   GstRTSPStreamTransport *result = NULL;
1432   const gchar *tmp;
1433   gchar *dest;
1434   guint port;
1435
1436   if (rtcp_from == NULL)
1437     return NULL;
1438
1439   tmp = g_strrstr (rtcp_from, ":");
1440   if (tmp == NULL)
1441     return NULL;
1442
1443   port = atoi (tmp + 1);
1444   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1445
1446   g_mutex_lock (&priv->lock);
1447   GST_INFO ("finding %s:%d in %d transports", dest, port,
1448       g_list_length (priv->transports));
1449
1450   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1451     GstRTSPStreamTransport *trans = walk->data;
1452     const GstRTSPTransport *tr;
1453     gint min, max;
1454
1455     tr = gst_rtsp_stream_transport_get_transport (trans);
1456
1457     min = tr->client_port.min;
1458     max = tr->client_port.max;
1459
1460     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1461       result = trans;
1462       break;
1463     }
1464   }
1465   if (result)
1466     g_object_ref (result);
1467   g_mutex_unlock (&priv->lock);
1468
1469   g_free (dest);
1470
1471   return result;
1472 }
1473
1474 static GstRTSPStreamTransport *
1475 check_transport (GObject * source, GstRTSPStream * stream)
1476 {
1477   GstStructure *stats;
1478   GstRTSPStreamTransport *trans;
1479
1480   /* see if we have a stream to match with the origin of the RTCP packet */
1481   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1482   if (trans == NULL) {
1483     g_object_get (source, "stats", &stats, NULL);
1484     if (stats) {
1485       const gchar *rtcp_from;
1486
1487       dump_structure (stats);
1488
1489       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1490       if ((trans = find_transport (stream, rtcp_from))) {
1491         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1492             source);
1493         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1494             g_object_unref);
1495       }
1496       gst_structure_free (stats);
1497     }
1498   }
1499   return trans;
1500 }
1501
1502
1503 static void
1504 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1505 {
1506   GstRTSPStreamTransport *trans;
1507
1508   GST_INFO ("%p: new source %p", stream, source);
1509
1510   trans = check_transport (source, stream);
1511
1512   if (trans)
1513     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1514 }
1515
1516 static void
1517 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1518 {
1519   GST_INFO ("%p: new SDES %p", stream, source);
1520 }
1521
1522 static void
1523 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1524 {
1525   GstRTSPStreamTransport *trans;
1526
1527   trans = check_transport (source, stream);
1528
1529   if (trans) {
1530     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1531     gst_rtsp_stream_transport_keep_alive (trans);
1532   }
1533 #ifdef DUMP_STATS
1534   {
1535     GstStructure *stats;
1536     g_object_get (source, "stats", &stats, NULL);
1537     if (stats) {
1538       dump_structure (stats);
1539       gst_structure_free (stats);
1540     }
1541   }
1542 #endif
1543 }
1544
1545 static void
1546 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1547 {
1548   GST_INFO ("%p: source %p bye", stream, source);
1549 }
1550
1551 static void
1552 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1553 {
1554   GstRTSPStreamTransport *trans;
1555
1556   GST_INFO ("%p: source %p bye timeout", stream, source);
1557
1558   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1559     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1560     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1561   }
1562 }
1563
1564 static void
1565 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1566 {
1567   GstRTSPStreamTransport *trans;
1568
1569   GST_INFO ("%p: source %p timeout", stream, source);
1570
1571   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1572     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1573     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1574   }
1575 }
1576
1577 static void
1578 clear_tr_cache (GstRTSPStreamPrivate * priv)
1579 {
1580   g_list_foreach (priv->tr_cache, (GFunc) g_object_unref, NULL);
1581   g_list_free (priv->tr_cache);
1582   priv->tr_cache = NULL;
1583 }
1584
1585 static GstFlowReturn
1586 handle_new_sample (GstAppSink * sink, gpointer user_data)
1587 {
1588   GstRTSPStreamPrivate *priv;
1589   GList *walk;
1590   GstSample *sample;
1591   GstBuffer *buffer;
1592   GstRTSPStream *stream;
1593   gboolean is_rtp;
1594
1595   sample = gst_app_sink_pull_sample (sink);
1596   if (!sample)
1597     return GST_FLOW_OK;
1598
1599   stream = (GstRTSPStream *) user_data;
1600   priv = stream->priv;
1601   buffer = gst_sample_get_buffer (sample);
1602
1603   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1604
1605   g_mutex_lock (&priv->lock);
1606   if (priv->tr_cache_cookie != priv->transports_cookie) {
1607     clear_tr_cache (priv);
1608     for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1609       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1610       priv->tr_cache = g_list_prepend (priv->tr_cache, g_object_ref (tr));
1611     }
1612     priv->tr_cache_cookie = priv->transports_cookie;
1613   }
1614   g_mutex_unlock (&priv->lock);
1615
1616   for (walk = priv->tr_cache; walk; walk = g_list_next (walk)) {
1617     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1618
1619     if (is_rtp) {
1620       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1621     } else {
1622       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1623     }
1624   }
1625   gst_sample_unref (sample);
1626
1627   return GST_FLOW_OK;
1628 }
1629
1630 static GstAppSinkCallbacks sink_cb = {
1631   NULL,                         /* not interested in EOS */
1632   NULL,                         /* not interested in preroll samples */
1633   handle_new_sample,
1634 };
1635
1636 static GstElement *
1637 get_rtp_encoder (GstRTSPStream * stream, guint session)
1638 {
1639   GstRTSPStreamPrivate *priv = stream->priv;
1640
1641   if (priv->srtpenc == NULL) {
1642     gchar *name;
1643
1644     name = g_strdup_printf ("srtpenc_%u", session);
1645     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1646     g_free (name);
1647
1648     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1649   }
1650   return gst_object_ref (priv->srtpenc);
1651 }
1652
1653 static GstElement *
1654 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1655 {
1656   GstRTSPStreamPrivate *priv = stream->priv;
1657   GstElement *oldenc, *enc;
1658   GstPad *pad;
1659   gchar *name;
1660
1661   if (priv->idx != session)
1662     return NULL;
1663
1664   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1665
1666   oldenc = priv->srtpenc;
1667   enc = get_rtp_encoder (stream, session);
1668   name = g_strdup_printf ("rtp_sink_%d", session);
1669   pad = gst_element_get_request_pad (enc, name);
1670   g_free (name);
1671   gst_object_unref (pad);
1672
1673   if (oldenc == NULL)
1674     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1675         enc);
1676
1677   return enc;
1678 }
1679
1680 static GstElement *
1681 request_rtcp_encoder (GstElement * rtpbin, guint session,
1682     GstRTSPStream * stream)
1683 {
1684   GstRTSPStreamPrivate *priv = stream->priv;
1685   GstElement *oldenc, *enc;
1686   GstPad *pad;
1687   gchar *name;
1688
1689   if (priv->idx != session)
1690     return NULL;
1691
1692   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1693
1694   oldenc = priv->srtpenc;
1695   enc = get_rtp_encoder (stream, session);
1696   name = g_strdup_printf ("rtcp_sink_%d", session);
1697   pad = gst_element_get_request_pad (enc, name);
1698   g_free (name);
1699   gst_object_unref (pad);
1700
1701   if (oldenc == NULL)
1702     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1703         enc);
1704
1705   return enc;
1706 }
1707
1708 static GstCaps *
1709 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1710 {
1711   GstRTSPStreamPrivate *priv = stream->priv;
1712   GstCaps *caps;
1713
1714   GST_DEBUG ("request key %08x", ssrc);
1715
1716   g_mutex_lock (&priv->lock);
1717   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1718     gst_caps_ref (caps);
1719   g_mutex_unlock (&priv->lock);
1720
1721   return caps;
1722 }
1723
1724 static GstElement *
1725 request_rtcp_decoder (GstElement * rtpbin, guint session,
1726     GstRTSPStream * stream)
1727 {
1728   GstRTSPStreamPrivate *priv = stream->priv;
1729
1730   if (priv->idx != session)
1731     return NULL;
1732
1733   if (priv->srtpdec == NULL) {
1734     gchar *name;
1735
1736     name = g_strdup_printf ("srtpdec_%u", session);
1737     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1738     g_free (name);
1739
1740     g_signal_connect (priv->srtpdec, "request-key",
1741         (GCallback) request_key, stream);
1742   }
1743   return gst_object_ref (priv->srtpdec);
1744 }
1745
1746 static GstElement *
1747 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1748 {
1749   GstElement *bin;
1750   GstPad *pad;
1751   GstStructure *pt_map;
1752   gchar *name;
1753   guint pt, rtx_pt;
1754   gchar *pt_s;
1755
1756   pt = gst_rtsp_stream_get_pt (stream);
1757   pt_s = g_strdup_printf ("%u", pt);
1758   rtx_pt = stream->priv->rtx_pt;
1759
1760   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1761
1762   bin = gst_bin_new (NULL);
1763   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1764   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1765       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1766   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1767       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1768   g_free (pt_s);
1769   gst_structure_free (pt_map);
1770   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1771
1772   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1773   name = g_strdup_printf ("src_%u", sessid);
1774   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1775   g_free (name);
1776   gst_object_unref (pad);
1777
1778   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1779   name = g_strdup_printf ("sink_%u", sessid);
1780   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1781   g_free (name);
1782   gst_object_unref (pad);
1783
1784   return bin;
1785 }
1786
1787 /**
1788  * gst_rtsp_stream_join_bin:
1789  * @stream: a #GstRTSPStream
1790  * @bin: (transfer none): a #GstBin to join
1791  * @rtpbin: (transfer none): a rtpbin element in @bin
1792  * @state: the target state of the new elements
1793  *
1794  * Join the #GstBin @bin that contains the element @rtpbin.
1795  *
1796  * @stream will link to @rtpbin, which must be inside @bin. The elements
1797  * added to @bin will be set to the state given in @state.
1798  *
1799  * Returns: %TRUE on success.
1800  */
1801 gboolean
1802 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1803     GstElement * rtpbin, GstState state)
1804 {
1805   GstRTSPStreamPrivate *priv;
1806   gint i;
1807   guint idx;
1808   gchar *name;
1809   GstPad *pad, *sinkpad, *selpad;
1810   GstPadLinkReturn ret;
1811
1812   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1813   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1814   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1815
1816   priv = stream->priv;
1817
1818   g_mutex_lock (&priv->lock);
1819   if (priv->is_joined)
1820     goto was_joined;
1821
1822   /* create a session with the same index as the stream */
1823   idx = priv->idx;
1824
1825   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1826
1827   if (!alloc_ports (stream))
1828     goto no_ports;
1829
1830   /* update the dscp qos field in the sinks */
1831   update_dscp_qos (stream);
1832
1833   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1834       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1835     /* For SRTP */
1836     g_signal_connect (rtpbin, "request-rtp-encoder",
1837         (GCallback) request_rtp_encoder, stream);
1838     g_signal_connect (rtpbin, "request-rtcp-encoder",
1839         (GCallback) request_rtcp_encoder, stream);
1840     g_signal_connect (rtpbin, "request-rtcp-decoder",
1841         (GCallback) request_rtcp_decoder, stream);
1842   }
1843
1844   if (priv->rtx_time > 0) {
1845     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
1846     g_signal_connect (rtpbin, "request-aux-sender",
1847         (GCallback) request_aux_sender, stream);
1848   }
1849
1850   /* get a pad for sending RTP */
1851   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1852   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1853   g_free (name);
1854   /* link the RTP pad to the session manager, it should not really fail unless
1855    * this is not really an RTP pad */
1856   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1857   if (ret != GST_PAD_LINK_OK)
1858     goto link_failed;
1859
1860   /* get pads from the RTP session element for sending and receiving
1861    * RTP/RTCP*/
1862   name = g_strdup_printf ("send_rtp_src_%u", idx);
1863   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1864   g_free (name);
1865   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1866   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1867   g_free (name);
1868   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1869   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1870   g_free (name);
1871   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1872   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1873   g_free (name);
1874
1875   /* get the session */
1876   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1877
1878   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1879       stream);
1880   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1881       stream);
1882   g_signal_connect (priv->session, "on-ssrc-active",
1883       (GCallback) on_ssrc_active, stream);
1884   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1885       stream);
1886   g_signal_connect (priv->session, "on-bye-timeout",
1887       (GCallback) on_bye_timeout, stream);
1888   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1889       stream);
1890
1891   for (i = 0; i < 2; i++) {
1892     GstPad *teepad, *queuepad;
1893     /* For the sender we create this bit of pipeline for both
1894      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1895      * we need to add a queue before appsink to make the pipeline
1896      * not block. For the TCP case, we want to pump data to the
1897      * client as fast as possible anyway.
1898      *
1899      * .--------.      .-----.    .---------.
1900      * | rtpbin |      | tee |    | udpsink |
1901      * |       send->sink   src->sink       |
1902      * '--------'      |     |    '---------'
1903      *                 |     |    .---------.    .---------.
1904      *                 |     |    |  queue  |    | appsink |
1905      *                 |    src->sink      src->sink       |
1906      *                 '-----'    '---------'    '---------'
1907      *
1908      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1909      * udpsink directly to the session.
1910      */
1911     /* add udpsink */
1912     gst_bin_add (bin, priv->udpsink[i]);
1913     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1914
1915     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1916       /* make tee for RTP/RTCP */
1917       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1918       gst_bin_add (bin, priv->tee[i]);
1919
1920       /* and link to rtpbin send pad */
1921       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1922       gst_pad_link (priv->send_src[i], pad);
1923       gst_object_unref (pad);
1924
1925       /* link tee to udpsink */
1926       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1927       gst_pad_link (teepad, sinkpad);
1928       gst_object_unref (teepad);
1929
1930       /* make queue */
1931       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1932       gst_bin_add (bin, priv->appqueue[i]);
1933       /* and link to tee */
1934       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1935       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1936       gst_pad_link (teepad, pad);
1937       gst_object_unref (pad);
1938       gst_object_unref (teepad);
1939
1940       /* make appsink */
1941       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1942       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1943       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1944       gst_bin_add (bin, priv->appsink[i]);
1945       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1946           &sink_cb, stream, NULL);
1947       /* and link to queue */
1948       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1949       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1950       gst_pad_link (queuepad, pad);
1951       gst_object_unref (pad);
1952       gst_object_unref (queuepad);
1953     } else {
1954       /* else only udpsink needed, link it to the session */
1955       gst_pad_link (priv->send_src[i], sinkpad);
1956     }
1957     gst_object_unref (sinkpad);
1958
1959     /* For the receiver we create this bit of pipeline for both
1960      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1961      * and it is all funneled into the rtpbin receive pad.
1962      *
1963      * .--------.     .--------.    .--------.
1964      * | udpsrc |     | funnel |    | rtpbin |
1965      * |       src->sink      src->sink      |
1966      * '--------'     |        |    '--------'
1967      * .--------.     |        |
1968      * | appsrc |     |        |
1969      * |       src->sink       |
1970      * '--------'     '--------'
1971      */
1972     /* make funnel for the RTP/RTCP receivers */
1973     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1974     gst_bin_add (bin, priv->funnel[i]);
1975
1976     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1977     gst_pad_link (pad, priv->recv_sink[i]);
1978     gst_object_unref (pad);
1979
1980     if (priv->udpsrc_v4[i]) {
1981       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1982        * values */
1983       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1984       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1985       /* add udpsrc */
1986       gst_bin_add (bin, priv->udpsrc_v4[i]);
1987
1988       /* and link to the funnel v4 */
1989       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1990       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1991       gst_pad_link (pad, selpad);
1992       gst_object_unref (pad);
1993       gst_object_unref (selpad);
1994     }
1995
1996     if (priv->udpsrc_v6[i]) {
1997       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1998       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1999       gst_bin_add (bin, priv->udpsrc_v6[i]);
2000
2001       /* and link to the funnel v6 */
2002       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2003       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2004       gst_pad_link (pad, selpad);
2005       gst_object_unref (pad);
2006       gst_object_unref (selpad);
2007     }
2008
2009     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2010       /* make and add appsrc */
2011       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2012       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2013       gst_bin_add (bin, priv->appsrc[i]);
2014       /* and link to the funnel */
2015       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2016       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2017       gst_pad_link (pad, selpad);
2018       gst_object_unref (pad);
2019       gst_object_unref (selpad);
2020     }
2021
2022     /* check if we need to set to a special state */
2023     if (state != GST_STATE_NULL) {
2024       if (priv->udpsink[i])
2025         gst_element_set_state (priv->udpsink[i], state);
2026       if (priv->appsink[i])
2027         gst_element_set_state (priv->appsink[i], state);
2028       if (priv->appqueue[i])
2029         gst_element_set_state (priv->appqueue[i], state);
2030       if (priv->tee[i])
2031         gst_element_set_state (priv->tee[i], state);
2032       if (priv->funnel[i])
2033         gst_element_set_state (priv->funnel[i], state);
2034       if (priv->appsrc[i])
2035         gst_element_set_state (priv->appsrc[i], state);
2036     }
2037   }
2038
2039   /* be notified of caps changes */
2040   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2041       (GCallback) caps_notify, stream);
2042
2043   priv->is_joined = TRUE;
2044   g_mutex_unlock (&priv->lock);
2045
2046   return TRUE;
2047
2048   /* ERRORS */
2049 was_joined:
2050   {
2051     g_mutex_unlock (&priv->lock);
2052     return TRUE;
2053   }
2054 no_ports:
2055   {
2056     g_mutex_unlock (&priv->lock);
2057     GST_WARNING ("failed to allocate ports %u", idx);
2058     return FALSE;
2059   }
2060 link_failed:
2061   {
2062     GST_WARNING ("failed to link stream %u", idx);
2063     gst_object_unref (priv->send_rtp_sink);
2064     priv->send_rtp_sink = NULL;
2065     g_mutex_unlock (&priv->lock);
2066     return FALSE;
2067   }
2068 }
2069
2070 /**
2071  * gst_rtsp_stream_leave_bin:
2072  * @stream: a #GstRTSPStream
2073  * @bin: (transfer none): a #GstBin
2074  * @rtpbin: (transfer none): a rtpbin #GstElement
2075  *
2076  * Remove the elements of @stream from @bin.
2077  *
2078  * Return: %TRUE on success.
2079  */
2080 gboolean
2081 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2082     GstElement * rtpbin)
2083 {
2084   GstRTSPStreamPrivate *priv;
2085   gint i;
2086   GList *l;
2087
2088   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2089   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2090   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2091
2092   priv = stream->priv;
2093
2094   g_mutex_lock (&priv->lock);
2095   if (!priv->is_joined)
2096     goto was_not_joined;
2097
2098   /* all transports must be removed by now */
2099   if (priv->transports != NULL)
2100     goto transports_not_removed;
2101
2102   clear_tr_cache (priv);
2103
2104   GST_INFO ("stream %p leaving bin", stream);
2105
2106   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2107   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2108   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2109   gst_object_unref (priv->send_rtp_sink);
2110   priv->send_rtp_sink = NULL;
2111
2112   for (i = 0; i < 2; i++) {
2113     if (priv->udpsink[i])
2114       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2115     if (priv->appsink[i])
2116       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2117     if (priv->appqueue[i])
2118       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2119     if (priv->tee[i])
2120       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2121     if (priv->funnel[i])
2122       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2123     if (priv->appsrc[i])
2124       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2125     if (priv->udpsrc_v4[i]) {
2126       /* and set udpsrc to NULL now before removing */
2127       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2128       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2129       /* removing them should also nicely release the request
2130        * pads when they finalize */
2131       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2132     }
2133     if (priv->udpsrc_v6[i]) {
2134       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2135       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2136       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2137     }
2138
2139     for (l = priv->transport_sources; l; l = l->next) {
2140       GstRTSPMulticastTransportSource *s = l->data;
2141
2142       if (!s->udpsrc[i])
2143         continue;
2144
2145       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2146       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2147       gst_bin_remove (bin, s->udpsrc[i]);
2148     }
2149
2150     if (priv->udpsink[i])
2151       gst_bin_remove (bin, priv->udpsink[i]);
2152     if (priv->appsrc[i])
2153       gst_bin_remove (bin, priv->appsrc[i]);
2154     if (priv->appsink[i])
2155       gst_bin_remove (bin, priv->appsink[i]);
2156     if (priv->appqueue[i])
2157       gst_bin_remove (bin, priv->appqueue[i]);
2158     if (priv->tee[i])
2159       gst_bin_remove (bin, priv->tee[i]);
2160     if (priv->funnel[i])
2161       gst_bin_remove (bin, priv->funnel[i]);
2162
2163     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2164     gst_object_unref (priv->recv_sink[i]);
2165     priv->recv_sink[i] = NULL;
2166
2167     priv->udpsrc_v4[i] = NULL;
2168     priv->udpsrc_v6[i] = NULL;
2169     priv->udpsink[i] = NULL;
2170     priv->appsrc[i] = NULL;
2171     priv->appsink[i] = NULL;
2172     priv->appqueue[i] = NULL;
2173     priv->tee[i] = NULL;
2174     priv->funnel[i] = NULL;
2175   }
2176
2177   for (l = priv->transport_sources; l; l = l->next) {
2178     GstRTSPMulticastTransportSource *s = l->data;
2179     g_slice_free (GstRTSPMulticastTransportSource, s);
2180   }
2181   g_list_free (priv->transport_sources);
2182   priv->transport_sources = NULL;
2183
2184   gst_object_unref (priv->send_src[0]);
2185   priv->send_src[0] = NULL;
2186
2187   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2188   gst_object_unref (priv->send_src[1]);
2189   priv->send_src[1] = NULL;
2190
2191   g_object_unref (priv->session);
2192   priv->session = NULL;
2193   if (priv->caps)
2194     gst_caps_unref (priv->caps);
2195   priv->caps = NULL;
2196
2197   if (priv->srtpenc)
2198     gst_object_unref (priv->srtpenc);
2199   if (priv->srtpdec)
2200     gst_object_unref (priv->srtpdec);
2201
2202   priv->is_joined = FALSE;
2203   g_mutex_unlock (&priv->lock);
2204
2205   return TRUE;
2206
2207 was_not_joined:
2208   {
2209     g_mutex_unlock (&priv->lock);
2210     return TRUE;
2211   }
2212 transports_not_removed:
2213   {
2214     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2215     g_mutex_unlock (&priv->lock);
2216     return FALSE;
2217   }
2218 }
2219
2220 /**
2221  * gst_rtsp_stream_get_rtpinfo:
2222  * @stream: a #GstRTSPStream
2223  * @rtptime: (allow-none): result RTP timestamp
2224  * @seq: (allow-none): result RTP seqnum
2225  * @clock_rate: (allow-none): the clock rate
2226  * @running_time: (allow-none): result running-time
2227  *
2228  * Retrieve the current rtptime, seq and running-time. This is used to
2229  * construct a RTPInfo reply header.
2230  *
2231  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2232  */
2233 gboolean
2234 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2235     guint * rtptime, guint * seq, guint * clock_rate,
2236     GstClockTime * running_time)
2237 {
2238   GstRTSPStreamPrivate *priv;
2239   GstStructure *stats;
2240   GObjectClass *payobjclass;
2241
2242   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2243
2244   priv = stream->priv;
2245
2246   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2247
2248   g_mutex_lock (&priv->lock);
2249
2250   if (g_object_class_find_property (payobjclass, "stats")) {
2251     g_object_get (priv->payloader, "stats", &stats, NULL);
2252     if (stats == NULL)
2253       goto no_stats;
2254
2255     if (seq)
2256       gst_structure_get_uint (stats, "seqnum", seq);
2257
2258     if (rtptime)
2259       gst_structure_get_uint (stats, "timestamp", rtptime);
2260
2261     if (running_time)
2262       gst_structure_get_clock_time (stats, "running-time", running_time);
2263
2264     if (clock_rate) {
2265       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2266       if (*clock_rate == 0 && running_time)
2267         *running_time = GST_CLOCK_TIME_NONE;
2268     }
2269     gst_structure_free (stats);
2270   } else {
2271     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2272         !g_object_class_find_property (payobjclass, "timestamp"))
2273       goto no_stats;
2274
2275     if (seq)
2276       g_object_get (priv->payloader, "seqnum", seq, NULL);
2277
2278     if (rtptime)
2279       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2280
2281     if (running_time)
2282       *running_time = GST_CLOCK_TIME_NONE;
2283   }
2284   g_mutex_unlock (&priv->lock);
2285
2286   return TRUE;
2287
2288   /* ERRORS */
2289 no_stats:
2290   {
2291     GST_WARNING ("Could not get payloader stats");
2292     g_mutex_unlock (&priv->lock);
2293     return FALSE;
2294   }
2295 }
2296
2297 /**
2298  * gst_rtsp_stream_get_caps:
2299  * @stream: a #GstRTSPStream
2300  *
2301  * Retrieve the current caps of @stream.
2302  *
2303  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2304  * after usage.
2305  */
2306 GstCaps *
2307 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2308 {
2309   GstRTSPStreamPrivate *priv;
2310   GstCaps *result;
2311
2312   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2313
2314   priv = stream->priv;
2315
2316   g_mutex_lock (&priv->lock);
2317   if ((result = priv->caps))
2318     gst_caps_ref (result);
2319   g_mutex_unlock (&priv->lock);
2320
2321   return result;
2322 }
2323
2324 /**
2325  * gst_rtsp_stream_recv_rtp:
2326  * @stream: a #GstRTSPStream
2327  * @buffer: (transfer full): a #GstBuffer
2328  *
2329  * Handle an RTP buffer for the stream. This method is usually called when a
2330  * message has been received from a client using the TCP transport.
2331  *
2332  * This function takes ownership of @buffer.
2333  *
2334  * Returns: a GstFlowReturn.
2335  */
2336 GstFlowReturn
2337 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2338 {
2339   GstRTSPStreamPrivate *priv;
2340   GstFlowReturn ret;
2341   GstElement *element;
2342
2343   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2344   priv = stream->priv;
2345   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2346   g_return_val_if_fail (priv->is_joined, FALSE);
2347
2348   g_mutex_lock (&priv->lock);
2349   if (priv->appsrc[0])
2350     element = gst_object_ref (priv->appsrc[0]);
2351   else
2352     element = NULL;
2353   g_mutex_unlock (&priv->lock);
2354
2355   if (element) {
2356     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2357     gst_object_unref (element);
2358   } else {
2359     ret = GST_FLOW_OK;
2360   }
2361   return ret;
2362 }
2363
2364 /**
2365  * gst_rtsp_stream_recv_rtcp:
2366  * @stream: a #GstRTSPStream
2367  * @buffer: (transfer full): a #GstBuffer
2368  *
2369  * Handle an RTCP buffer for the stream. This method is usually called when a
2370  * message has been received from a client using the TCP transport.
2371  *
2372  * This function takes ownership of @buffer.
2373  *
2374  * Returns: a GstFlowReturn.
2375  */
2376 GstFlowReturn
2377 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2378 {
2379   GstRTSPStreamPrivate *priv;
2380   GstFlowReturn ret;
2381   GstElement *element;
2382
2383   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2384   priv = stream->priv;
2385   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2386
2387   if (!priv->is_joined) {
2388     gst_buffer_unref (buffer);
2389     return GST_FLOW_NOT_LINKED;
2390   }
2391   g_mutex_lock (&priv->lock);
2392   if (priv->appsrc[1])
2393     element = gst_object_ref (priv->appsrc[1]);
2394   else
2395     element = NULL;
2396   g_mutex_unlock (&priv->lock);
2397
2398   if (element) {
2399     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2400     gst_object_unref (element);
2401   } else {
2402     ret = GST_FLOW_OK;
2403     gst_buffer_unref (buffer);
2404   }
2405   return ret;
2406 }
2407
2408 /* must be called with lock */
2409 static gboolean
2410 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2411     gboolean add)
2412 {
2413   GstRTSPStreamPrivate *priv = stream->priv;
2414   const GstRTSPTransport *tr;
2415
2416   tr = gst_rtsp_stream_transport_get_transport (trans);
2417
2418   switch (tr->lower_transport) {
2419     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2420     {
2421       GstRTSPMulticastTransportSource *source;
2422       GstBin *bin;
2423
2424       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2425
2426       if (add) {
2427         gchar *host;
2428         gint i;
2429         GstPad *selpad, *pad;
2430
2431         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2432         source->transport = trans;
2433
2434         for (i = 0; i < 2; i++) {
2435           host =
2436               g_strdup_printf ("udp://%s:%d", tr->destination,
2437               (i == 0) ? tr->port.min : tr->port.max);
2438           source->udpsrc[i] =
2439               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2440           g_free (host);
2441
2442           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2443            * values */
2444           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2445           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2446           /* add udpsrc */
2447           gst_bin_add (bin, source->udpsrc[i]);
2448
2449           /* and link to the funnel v4 */
2450           source->selpad[i] = selpad =
2451               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2452           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2453           gst_pad_link (pad, selpad);
2454           gst_object_unref (pad);
2455           gst_object_unref (selpad);
2456         }
2457         gst_object_unref (bin);
2458
2459         priv->transport_sources =
2460             g_list_prepend (priv->transport_sources, source);
2461       } else {
2462         GList *l;
2463
2464         for (l = priv->transport_sources; l; l = l->next) {
2465           source = l->data;
2466
2467           if (source->transport == trans) {
2468             priv->transport_sources =
2469                 g_list_delete_link (priv->transport_sources, l);
2470             break;
2471           }
2472         }
2473
2474         if (l != NULL) {
2475           gint i;
2476
2477           for (i = 0; i < 2; i++) {
2478             /* Will automatically unlink everything */
2479             gst_bin_remove (bin,
2480                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2481
2482             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2483             gst_object_unref (source->udpsrc[i]);
2484
2485             gst_element_release_request_pad (priv->funnel[i],
2486                 source->selpad[i]);
2487           }
2488
2489           g_slice_free (GstRTSPMulticastTransportSource, source);
2490         }
2491       }
2492
2493       /* fall through for the generic case */
2494     }
2495     case GST_RTSP_LOWER_TRANS_UDP:
2496     {
2497       gchar *dest;
2498       gint min, max;
2499       guint ttl = 0;
2500
2501       dest = tr->destination;
2502       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2503         min = tr->port.min;
2504         max = tr->port.max;
2505         ttl = tr->ttl;
2506       } else {
2507         min = tr->client_port.min;
2508         max = tr->client_port.max;
2509       }
2510
2511       if (add) {
2512         if (ttl > 0) {
2513           GST_INFO ("setting ttl-mc %d", ttl);
2514           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2515           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2516         }
2517         GST_INFO ("adding %s:%d-%d", dest, min, max);
2518         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2519         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2520         priv->transports = g_list_prepend (priv->transports, trans);
2521       } else {
2522         GST_INFO ("removing %s:%d-%d", dest, min, max);
2523         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2524         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2525         priv->transports = g_list_remove (priv->transports, trans);
2526       }
2527       priv->transports_cookie++;
2528       break;
2529     }
2530     case GST_RTSP_LOWER_TRANS_TCP:
2531       if (add) {
2532         GST_INFO ("adding TCP %s", tr->destination);
2533         priv->transports = g_list_prepend (priv->transports, trans);
2534       } else {
2535         GST_INFO ("removing TCP %s", tr->destination);
2536         priv->transports = g_list_remove (priv->transports, trans);
2537       }
2538       priv->transports_cookie++;
2539       break;
2540     default:
2541       goto unknown_transport;
2542   }
2543   return TRUE;
2544
2545   /* ERRORS */
2546 unknown_transport:
2547   {
2548     GST_INFO ("Unknown transport %d", tr->lower_transport);
2549     return FALSE;
2550   }
2551 }
2552
2553
2554 /**
2555  * gst_rtsp_stream_add_transport:
2556  * @stream: a #GstRTSPStream
2557  * @trans: (transfer none): a #GstRTSPStreamTransport
2558  *
2559  * Add the transport in @trans to @stream. The media of @stream will
2560  * then also be send to the values configured in @trans.
2561  *
2562  * @stream must be joined to a bin.
2563  *
2564  * @trans must contain a valid #GstRTSPTransport.
2565  *
2566  * Returns: %TRUE if @trans was added
2567  */
2568 gboolean
2569 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2570     GstRTSPStreamTransport * trans)
2571 {
2572   GstRTSPStreamPrivate *priv;
2573   gboolean res;
2574
2575   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2576   priv = stream->priv;
2577   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2578   g_return_val_if_fail (priv->is_joined, FALSE);
2579
2580   g_mutex_lock (&priv->lock);
2581   res = update_transport (stream, trans, TRUE);
2582   g_mutex_unlock (&priv->lock);
2583
2584   return res;
2585 }
2586
2587 /**
2588  * gst_rtsp_stream_remove_transport:
2589  * @stream: a #GstRTSPStream
2590  * @trans: (transfer none): a #GstRTSPStreamTransport
2591  *
2592  * Remove the transport in @trans from @stream. The media of @stream will
2593  * not be sent to the values configured in @trans.
2594  *
2595  * @stream must be joined to a bin.
2596  *
2597  * @trans must contain a valid #GstRTSPTransport.
2598  *
2599  * Returns: %TRUE if @trans was removed
2600  */
2601 gboolean
2602 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2603     GstRTSPStreamTransport * trans)
2604 {
2605   GstRTSPStreamPrivate *priv;
2606   gboolean res;
2607
2608   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2609   priv = stream->priv;
2610   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2611   g_return_val_if_fail (priv->is_joined, FALSE);
2612
2613   g_mutex_lock (&priv->lock);
2614   res = update_transport (stream, trans, FALSE);
2615   g_mutex_unlock (&priv->lock);
2616
2617   return res;
2618 }
2619
2620 /**
2621  * gst_rtsp_stream_update_crypto:
2622  * @stream: a #GstRTSPStream
2623  * @ssrc: the SSRC
2624  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2625  *
2626  * Update the new crypto information for @ssrc in @stream. If information
2627  * for @ssrc did not exist, it will be added. If information
2628  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2629  * be removed from @stream.
2630  *
2631  * Returns: %TRUE if @crypto could be updated
2632  */
2633 gboolean
2634 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2635     guint ssrc, GstCaps * crypto)
2636 {
2637   GstRTSPStreamPrivate *priv;
2638
2639   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2640   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2641
2642   priv = stream->priv;
2643
2644   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2645
2646   g_mutex_lock (&priv->lock);
2647   if (crypto)
2648     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2649         gst_caps_ref (crypto));
2650   else
2651     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2652   g_mutex_unlock (&priv->lock);
2653
2654   return TRUE;
2655 }
2656
2657 /**
2658  * gst_rtsp_stream_get_rtp_socket:
2659  * @stream: a #GstRTSPStream
2660  * @family: the socket family
2661  *
2662  * Get the RTP socket from @stream for a @family.
2663  *
2664  * @stream must be joined to a bin.
2665  *
2666  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2667  * socket could be allocated for @family. Unref after usage
2668  */
2669 GSocket *
2670 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2671 {
2672   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2673   GSocket *socket;
2674   const gchar *name;
2675
2676   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2677   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2678       family == G_SOCKET_FAMILY_IPV6, NULL);
2679   g_return_val_if_fail (priv->udpsink[0], NULL);
2680
2681   if (family == G_SOCKET_FAMILY_IPV6)
2682     name = "socket-v6";
2683   else
2684     name = "socket";
2685
2686   g_object_get (priv->udpsink[0], name, &socket, NULL);
2687
2688   return socket;
2689 }
2690
2691 /**
2692  * gst_rtsp_stream_get_rtcp_socket:
2693  * @stream: a #GstRTSPStream
2694  * @family: the socket family
2695  *
2696  * Get the RTCP socket from @stream for a @family.
2697  *
2698  * @stream must be joined to a bin.
2699  *
2700  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2701  * socket could be allocated for @family. Unref after usage
2702  */
2703 GSocket *
2704 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2705 {
2706   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2707   GSocket *socket;
2708   const gchar *name;
2709
2710   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2711   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2712       family == G_SOCKET_FAMILY_IPV6, NULL);
2713   g_return_val_if_fail (priv->udpsink[1], NULL);
2714
2715   if (family == G_SOCKET_FAMILY_IPV6)
2716     name = "socket-v6";
2717   else
2718     name = "socket";
2719
2720   g_object_get (priv->udpsink[1], name, &socket, NULL);
2721
2722   return socket;
2723 }
2724
2725 /**
2726  * gst_rtsp_stream_set_seqnum:
2727  * @stream: a #GstRTSPStream
2728  * @seqnum: a new sequence number
2729  *
2730  * Configure the sequence number in the payloader of @stream to @seqnum.
2731  */
2732 void
2733 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2734 {
2735   GstRTSPStreamPrivate *priv;
2736
2737   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2738
2739   priv = stream->priv;
2740
2741   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2742 }
2743
2744 /**
2745  * gst_rtsp_stream_get_seqnum:
2746  * @stream: a #GstRTSPStream
2747  *
2748  * Get the configured sequence number in the payloader of @stream.
2749  *
2750  * Returns: the sequence number of the payloader.
2751  */
2752 guint16
2753 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2754 {
2755   GstRTSPStreamPrivate *priv;
2756   guint seqnum;
2757
2758   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2759
2760   priv = stream->priv;
2761
2762   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2763
2764   return seqnum;
2765 }
2766
2767 /**
2768  * gst_rtsp_stream_transport_filter:
2769  * @stream: a #GstRTSPStream
2770  * @func: (scope call) (allow-none): a callback
2771  * @user_data: (closure): user data passed to @func
2772  *
2773  * Call @func for each transport managed by @stream. The result value of @func
2774  * determines what happens to the transport. @func will be called with @stream
2775  * locked so no further actions on @stream can be performed from @func.
2776  *
2777  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2778  * @stream.
2779  *
2780  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2781  *
2782  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2783  * will also be added with an additional ref to the result #GList of this
2784  * function..
2785  *
2786  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2787  *
2788  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2789  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2790  * element in the #GList should be unreffed before the list is freed.
2791  */
2792 GList *
2793 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2794     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2795 {
2796   GstRTSPStreamPrivate *priv;
2797   GList *result, *walk, *next;
2798   GHashTable *visited;
2799   guint cookie;
2800
2801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2802
2803   priv = stream->priv;
2804
2805   result = NULL;
2806   if (func)
2807     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2808
2809   g_mutex_lock (&priv->lock);
2810 restart:
2811   cookie = priv->transports_cookie;
2812   for (walk = priv->transports; walk; walk = next) {
2813     GstRTSPStreamTransport *trans = walk->data;
2814     GstRTSPFilterResult res;
2815     gboolean changed;
2816
2817     next = g_list_next (walk);
2818
2819     if (func) {
2820       /* only visit each transport once */
2821       if (g_hash_table_contains (visited, trans))
2822         continue;
2823
2824       g_hash_table_add (visited, g_object_ref (trans));
2825       g_mutex_unlock (&priv->lock);
2826
2827       res = func (stream, trans, user_data);
2828
2829       g_mutex_lock (&priv->lock);
2830     } else
2831       res = GST_RTSP_FILTER_REF;
2832
2833     changed = (cookie != priv->transports_cookie);
2834
2835     switch (res) {
2836       case GST_RTSP_FILTER_REMOVE:
2837         update_transport (stream, trans, FALSE);
2838         break;
2839       case GST_RTSP_FILTER_REF:
2840         result = g_list_prepend (result, g_object_ref (trans));
2841         break;
2842       case GST_RTSP_FILTER_KEEP:
2843       default:
2844         break;
2845     }
2846     if (changed)
2847       goto restart;
2848   }
2849   g_mutex_unlock (&priv->lock);
2850
2851   if (func)
2852     g_hash_table_unref (visited);
2853
2854   return result;
2855 }
2856
2857 static GstPadProbeReturn
2858 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2859 {
2860   GstRTSPStreamPrivate *priv;
2861   GstRTSPStream *stream;
2862
2863   stream = user_data;
2864   priv = stream->priv;
2865
2866   GST_DEBUG_OBJECT (pad, "now blocking");
2867
2868   g_mutex_lock (&priv->lock);
2869   priv->blocking = TRUE;
2870   g_mutex_unlock (&priv->lock);
2871
2872   gst_element_post_message (priv->payloader,
2873       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2874           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2875
2876   return GST_PAD_PROBE_OK;
2877 }
2878
2879 /**
2880  * gst_rtsp_stream_set_blocked:
2881  * @stream: a #GstRTSPStream
2882  * @blocked: boolean indicating we should block or unblock
2883  *
2884  * Blocks or unblocks the dataflow on @stream.
2885  *
2886  * Returns: %TRUE on success
2887  */
2888 gboolean
2889 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2890 {
2891   GstRTSPStreamPrivate *priv;
2892
2893   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2894
2895   priv = stream->priv;
2896
2897   g_mutex_lock (&priv->lock);
2898   if (blocked) {
2899     priv->blocking = FALSE;
2900     if (priv->blocked_id == 0) {
2901       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2902           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2903           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2904           g_object_ref (stream), g_object_unref);
2905     }
2906   } else {
2907     if (priv->blocked_id != 0) {
2908       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2909       priv->blocked_id = 0;
2910       priv->blocking = FALSE;
2911     }
2912   }
2913   g_mutex_unlock (&priv->lock);
2914
2915   return TRUE;
2916 }
2917
2918 /**
2919  * gst_rtsp_stream_is_blocking:
2920  * @stream: a #GstRTSPStream
2921  *
2922  * Check if @stream is blocking on a #GstBuffer.
2923  *
2924  * Returns: %TRUE if @stream is blocking
2925  */
2926 gboolean
2927 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2928 {
2929   GstRTSPStreamPrivate *priv;
2930   gboolean result;
2931
2932   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2933
2934   priv = stream->priv;
2935
2936   g_mutex_lock (&priv->lock);
2937   result = priv->blocking;
2938   g_mutex_unlock (&priv->lock);
2939
2940   return result;
2941 }
2942
2943 /**
2944  * gst_rtsp_stream_query_position:
2945  * @stream: a #GstRTSPStream
2946  *
2947  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
2948  * the RTP parts of the pipeline and not the RTCP parts.
2949  *
2950  * Returns: %TRUE if the position could be queried
2951  */
2952 gboolean
2953 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
2954 {
2955   GstRTSPStreamPrivate *priv;
2956   GstElement *sink;
2957   gboolean ret;
2958
2959   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2960
2961   priv = stream->priv;
2962
2963   g_mutex_lock (&priv->lock);
2964   if ((sink = priv->udpsink[0]))
2965     gst_object_ref (sink);
2966   g_mutex_unlock (&priv->lock);
2967
2968   if (!sink)
2969     return FALSE;
2970
2971   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
2972   gst_object_unref (sink);
2973
2974   return ret;
2975 }
2976
2977 /**
2978  * gst_rtsp_stream_query_stop:
2979  * @stream: a #GstRTSPStream
2980  *
2981  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
2982  * the RTP parts of the pipeline and not the RTCP parts.
2983  *
2984  * Returns: %TRUE if the stop could be queried
2985  */
2986 gboolean
2987 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
2988 {
2989   GstRTSPStreamPrivate *priv;
2990   GstElement *sink;
2991   GstQuery *query;
2992   gboolean ret;
2993
2994   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2995
2996   priv = stream->priv;
2997
2998   g_mutex_lock (&priv->lock);
2999   if ((sink = priv->udpsink[0]))
3000     gst_object_ref (sink);
3001   g_mutex_unlock (&priv->lock);
3002
3003   if (!sink)
3004     return FALSE;
3005
3006   query = gst_query_new_segment (GST_FORMAT_TIME);
3007   if ((ret = gst_element_query (sink, query))) {
3008     GstFormat format;
3009
3010     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3011     if (format != GST_FORMAT_TIME)
3012       *stop = -1;
3013   }
3014   gst_query_unref (query);
3015   gst_object_unref (sink);
3016
3017   return ret;
3018
3019 }