rtsp-stream: RTCP and RTP transport cache cookies seperated
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 typedef struct
62 {
63   GstRTSPStreamTransport *transport;
64
65   /* RTP and RTCP source */
66   GstElement *udpsrc[2];
67   GstPad *selpad[2];
68 } GstRTSPMulticastTransportSource;
69
70 struct _GstRTSPStreamPrivate
71 {
72   GMutex lock;
73   guint idx;
74   GstPad *srcpad;
75   GstElement *payloader;
76   guint buffer_size;
77   gboolean is_joined;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_sink[2];
86   GstPad *send_src[2];
87
88   /* the RTPSession object */
89   GObject *session;
90
91   /* SRTP encoder/decoder */
92   GstElement *srtpenc;
93   GstElement *srtpdec;
94   GHashTable *keys;
95
96   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
97    * sockets */
98   GstElement *udpsrc_v4[2];
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
101    * sockets */
102   GstElement *udpsrc_v6[2];
103
104   GstElement *udpsink[2];
105
106   /* for TCP transport */
107   GstElement *appsrc[2];
108   GstElement *appqueue[2];
109   GstElement *appsink[2];
110
111   GstElement *tee[2];
112   GstElement *funnel[2];
113
114   /* retransmission */
115   GstElement *rtxsend;
116   guint rtx_pt;
117   GstClockTime rtx_time;
118
119   /* server ports for sending/receiving over ipv4 */
120   GstRTSPRange server_port_v4;
121   GstRTSPAddress *server_addr_v4;
122   gboolean have_ipv4;
123
124   /* server ports for sending/receiving over ipv6 */
125   GstRTSPRange server_port_v6;
126   GstRTSPAddress *server_addr_v6;
127   gboolean have_ipv6;
128
129   /* multicast addresses */
130   GstRTSPAddressPool *pool;
131   GstRTSPAddress *addr_v4;
132   GstRTSPAddress *addr_v6;
133
134   /* the caps of the stream */
135   gulong caps_sig;
136   GstCaps *caps;
137
138   /* transports we stream to */
139   guint n_active;
140   GList *transports;
141   guint transports_cookie;
142   GList *tr_cache_rtp;
143   GList *tr_cache_rtcp;
144   guint tr_cache_cookie_rtp;
145   guint tr_cache_cookie_rtcp;
146
147
148   /* UDP sources for UDP multicast transports */
149   GList *transport_sources;
150
151   gint dscp_qos;
152
153   /* stream blocking */
154   gulong blocked_id;
155   gboolean blocking;
156 };
157
158 #define DEFAULT_CONTROL         NULL
159 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
160 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
161                                         GST_RTSP_LOWER_TRANS_TCP
162
163 enum
164 {
165   PROP_0,
166   PROP_CONTROL,
167   PROP_PROFILES,
168   PROP_PROTOCOLS,
169   PROP_LAST
170 };
171
172 enum
173 {
174   SIGNAL_NEW_RTP_ENCODER,
175   SIGNAL_NEW_RTCP_ENCODER,
176   SIGNAL_LAST
177 };
178
179 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
180 #define GST_CAT_DEFAULT rtsp_stream_debug
181
182 static GQuark ssrc_stream_map_key;
183
184 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
185     GValue * value, GParamSpec * pspec);
186 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
187     const GValue * value, GParamSpec * pspec);
188
189 static void gst_rtsp_stream_finalize (GObject * obj);
190
191 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
192
193 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
194
195 static void
196 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
197 {
198   GObjectClass *gobject_class;
199
200   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
201
202   gobject_class = G_OBJECT_CLASS (klass);
203
204   gobject_class->get_property = gst_rtsp_stream_get_property;
205   gobject_class->set_property = gst_rtsp_stream_set_property;
206   gobject_class->finalize = gst_rtsp_stream_finalize;
207
208   g_object_class_install_property (gobject_class, PROP_CONTROL,
209       g_param_spec_string ("control", "Control",
210           "The control string for this stream", DEFAULT_CONTROL,
211           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
212
213   g_object_class_install_property (gobject_class, PROP_PROFILES,
214       g_param_spec_flags ("profiles", "Profiles",
215           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
216           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217
218   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
219       g_param_spec_flags ("protocols", "Protocols",
220           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
221           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222
223   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
224       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
225       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
226       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
227
228   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
229       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
230       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
231       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
232
233   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
234
235   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
236 }
237
238 static void
239 gst_rtsp_stream_init (GstRTSPStream * stream)
240 {
241   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
242
243   GST_DEBUG ("new stream %p", stream);
244
245   stream->priv = priv;
246
247   priv->dscp_qos = -1;
248   priv->control = g_strdup (DEFAULT_CONTROL);
249   priv->profiles = DEFAULT_PROFILES;
250   priv->protocols = DEFAULT_PROTOCOLS;
251
252   g_mutex_init (&priv->lock);
253
254   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
255       NULL, (GDestroyNotify) gst_caps_unref);
256 }
257
258 static void
259 gst_rtsp_stream_finalize (GObject * obj)
260 {
261   GstRTSPStream *stream;
262   GstRTSPStreamPrivate *priv;
263
264   stream = GST_RTSP_STREAM (obj);
265   priv = stream->priv;
266
267   GST_DEBUG ("finalize stream %p", stream);
268
269   /* we really need to be unjoined now */
270   g_return_if_fail (!priv->is_joined);
271
272   if (priv->addr_v4)
273     gst_rtsp_address_free (priv->addr_v4);
274   if (priv->addr_v6)
275     gst_rtsp_address_free (priv->addr_v6);
276   if (priv->server_addr_v4)
277     gst_rtsp_address_free (priv->server_addr_v4);
278   if (priv->server_addr_v6)
279     gst_rtsp_address_free (priv->server_addr_v6);
280   if (priv->pool)
281     g_object_unref (priv->pool);
282   if (priv->rtxsend)
283     g_object_unref (priv->rtxsend);
284
285   gst_object_unref (priv->payloader);
286   gst_object_unref (priv->srcpad);
287   g_free (priv->control);
288   g_mutex_clear (&priv->lock);
289
290   g_hash_table_unref (priv->keys);
291
292   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
293 }
294
295 static void
296 gst_rtsp_stream_get_property (GObject * object, guint propid,
297     GValue * value, GParamSpec * pspec)
298 {
299   GstRTSPStream *stream = GST_RTSP_STREAM (object);
300
301   switch (propid) {
302     case PROP_CONTROL:
303       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
304       break;
305     case PROP_PROFILES:
306       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
307       break;
308     case PROP_PROTOCOLS:
309       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
310       break;
311     default:
312       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
313   }
314 }
315
316 static void
317 gst_rtsp_stream_set_property (GObject * object, guint propid,
318     const GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
325       break;
326     case PROP_PROFILES:
327       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
328       break;
329     case PROP_PROTOCOLS:
330       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 /**
338  * gst_rtsp_stream_new:
339  * @idx: an index
340  * @srcpad: a #GstPad
341  * @payloader: a #GstElement
342  *
343  * Create a new media stream with index @idx that handles RTP data on
344  * @srcpad and has a payloader element @payloader.
345  *
346  * Returns: (transfer full): a new #GstRTSPStream
347  */
348 GstRTSPStream *
349 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
350 {
351   GstRTSPStreamPrivate *priv;
352   GstRTSPStream *stream;
353
354   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
355   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
356   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
357
358   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
359   priv = stream->priv;
360   priv->idx = idx;
361   priv->payloader = gst_object_ref (payloader);
362   priv->srcpad = gst_object_ref (srcpad);
363
364   return stream;
365 }
366
367 /**
368  * gst_rtsp_stream_get_index:
369  * @stream: a #GstRTSPStream
370  *
371  * Get the stream index.
372  *
373  * Return: the stream index.
374  */
375 guint
376 gst_rtsp_stream_get_index (GstRTSPStream * stream)
377 {
378   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
379
380   return stream->priv->idx;
381 }
382
383 /**
384  * gst_rtsp_stream_get_pt:
385  * @stream: a #GstRTSPStream
386  *
387  * Get the stream payload type.
388  *
389  * Return: the stream payload type.
390  */
391 guint
392 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
393 {
394   GstRTSPStreamPrivate *priv;
395   guint pt;
396
397   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
398
399   priv = stream->priv;
400
401   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
402
403   return pt;
404 }
405
406 /**
407  * gst_rtsp_stream_get_srcpad:
408  * @stream: a #GstRTSPStream
409  *
410  * Get the srcpad associated with @stream.
411  *
412  * Returns: (transfer full): the srcpad. Unref after usage.
413  */
414 GstPad *
415 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
416 {
417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
418
419   return gst_object_ref (stream->priv->srcpad);
420 }
421
422 /**
423  * gst_rtsp_stream_get_control:
424  * @stream: a #GstRTSPStream
425  *
426  * Get the control string to identify this stream.
427  *
428  * Returns: (transfer full): the control string. g_free() after usage.
429  */
430 gchar *
431 gst_rtsp_stream_get_control (GstRTSPStream * stream)
432 {
433   GstRTSPStreamPrivate *priv;
434   gchar *result;
435
436   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
437
438   priv = stream->priv;
439
440   g_mutex_lock (&priv->lock);
441   if ((result = g_strdup (priv->control)) == NULL)
442     result = g_strdup_printf ("stream=%u", priv->idx);
443   g_mutex_unlock (&priv->lock);
444
445   return result;
446 }
447
448 /**
449  * gst_rtsp_stream_set_control:
450  * @stream: a #GstRTSPStream
451  * @control: a control string
452  *
453  * Set the control string in @stream.
454  */
455 void
456 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
457 {
458   GstRTSPStreamPrivate *priv;
459
460   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
461
462   priv = stream->priv;
463
464   g_mutex_lock (&priv->lock);
465   g_free (priv->control);
466   priv->control = g_strdup (control);
467   g_mutex_unlock (&priv->lock);
468 }
469
470 /**
471  * gst_rtsp_stream_has_control:
472  * @stream: a #GstRTSPStream
473  * @control: a control string
474  *
475  * Check if @stream has the control string @control.
476  *
477  * Returns: %TRUE is @stream has @control as the control string
478  */
479 gboolean
480 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
481 {
482   GstRTSPStreamPrivate *priv;
483   gboolean res;
484
485   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
486
487   priv = stream->priv;
488
489   g_mutex_lock (&priv->lock);
490   if (priv->control)
491     res = (g_strcmp0 (priv->control, control) == 0);
492   else {
493     guint streamid;
494
495     if (sscanf (control, "stream=%u", &streamid) > 0)
496       res = (streamid == priv->idx);
497     else
498       res = FALSE;
499   }
500   g_mutex_unlock (&priv->lock);
501
502   return res;
503 }
504
505 /**
506  * gst_rtsp_stream_set_mtu:
507  * @stream: a #GstRTSPStream
508  * @mtu: a new MTU
509  *
510  * Configure the mtu in the payloader of @stream to @mtu.
511  */
512 void
513 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
514 {
515   GstRTSPStreamPrivate *priv;
516
517   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
518
519   priv = stream->priv;
520
521   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
522
523   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
524 }
525
526 /**
527  * gst_rtsp_stream_get_mtu:
528  * @stream: a #GstRTSPStream
529  *
530  * Get the configured MTU in the payloader of @stream.
531  *
532  * Returns: the MTU of the payloader.
533  */
534 guint
535 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
536 {
537   GstRTSPStreamPrivate *priv;
538   guint mtu;
539
540   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
541
542   priv = stream->priv;
543
544   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
545
546   return mtu;
547 }
548
549 /* Update the dscp qos property on the udp sinks */
550 static void
551 update_dscp_qos (GstRTSPStream * stream)
552 {
553   GstRTSPStreamPrivate *priv;
554
555   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
556
557   priv = stream->priv;
558
559   if (priv->udpsink[0]) {
560     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
561         NULL);
562   }
563
564   if (priv->udpsink[1]) {
565     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
566         NULL);
567   }
568 }
569
570 /**
571  * gst_rtsp_stream_set_dscp_qos:
572  * @stream: a #GstRTSPStream
573  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
574  *
575  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
576  */
577 void
578 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
579 {
580   GstRTSPStreamPrivate *priv;
581
582   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
583
584   priv = stream->priv;
585
586   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
587
588   if (dscp_qos < -1 || dscp_qos > 63) {
589     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
590     return;
591   }
592
593   priv->dscp_qos = dscp_qos;
594
595   update_dscp_qos (stream);
596 }
597
598 /**
599  * gst_rtsp_stream_get_dscp_qos:
600  * @stream: a #GstRTSPStream
601  *
602  * Get the configured DSCP QoS in of the outgoing sockets.
603  *
604  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
605  */
606 gint
607 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
608 {
609   GstRTSPStreamPrivate *priv;
610
611   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
612
613   priv = stream->priv;
614
615   return priv->dscp_qos;
616 }
617
618 /**
619  * gst_rtsp_stream_is_transport_supported:
620  * @stream: a #GstRTSPStream
621  * @transport: (transfer none): a #GstRTSPTransport
622  *
623  * Check if @transport can be handled by stream
624  *
625  * Returns: %TRUE if @transport can be handled by @stream.
626  */
627 gboolean
628 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
629     GstRTSPTransport * transport)
630 {
631   GstRTSPStreamPrivate *priv;
632
633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
634
635   priv = stream->priv;
636
637   g_mutex_lock (&priv->lock);
638   if (transport->trans != GST_RTSP_TRANS_RTP)
639     goto unsupported_transmode;
640
641   if (!(transport->profile & priv->profiles))
642     goto unsupported_profile;
643
644   if (!(transport->lower_transport & priv->protocols))
645     goto unsupported_ltrans;
646
647   g_mutex_unlock (&priv->lock);
648
649   return TRUE;
650
651   /* ERRORS */
652 unsupported_transmode:
653   {
654     GST_DEBUG ("unsupported transport mode %d", transport->trans);
655     g_mutex_unlock (&priv->lock);
656     return FALSE;
657   }
658 unsupported_profile:
659   {
660     GST_DEBUG ("unsupported profile %d", transport->profile);
661     g_mutex_unlock (&priv->lock);
662     return FALSE;
663   }
664 unsupported_ltrans:
665   {
666     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
667     g_mutex_unlock (&priv->lock);
668     return FALSE;
669   }
670 }
671
672 /**
673  * gst_rtsp_stream_set_profiles:
674  * @stream: a #GstRTSPStream
675  * @profiles: the new profiles
676  *
677  * Configure the allowed profiles for @stream.
678  */
679 void
680 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
681 {
682   GstRTSPStreamPrivate *priv;
683
684   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
685
686   priv = stream->priv;
687
688   g_mutex_lock (&priv->lock);
689   priv->profiles = profiles;
690   g_mutex_unlock (&priv->lock);
691 }
692
693 /**
694  * gst_rtsp_stream_get_profiles:
695  * @stream: a #GstRTSPStream
696  *
697  * Get the allowed profiles of @stream.
698  *
699  * Returns: a #GstRTSPProfile
700  */
701 GstRTSPProfile
702 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
703 {
704   GstRTSPStreamPrivate *priv;
705   GstRTSPProfile res;
706
707   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
708
709   priv = stream->priv;
710
711   g_mutex_lock (&priv->lock);
712   res = priv->profiles;
713   g_mutex_unlock (&priv->lock);
714
715   return res;
716 }
717
718 /**
719  * gst_rtsp_stream_set_protocols:
720  * @stream: a #GstRTSPStream
721  * @protocols: the new flags
722  *
723  * Configure the allowed lower transport for @stream.
724  */
725 void
726 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
727     GstRTSPLowerTrans protocols)
728 {
729   GstRTSPStreamPrivate *priv;
730
731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
732
733   priv = stream->priv;
734
735   g_mutex_lock (&priv->lock);
736   priv->protocols = protocols;
737   g_mutex_unlock (&priv->lock);
738 }
739
740 /**
741  * gst_rtsp_stream_get_protocols:
742  * @stream: a #GstRTSPStream
743  *
744  * Get the allowed protocols of @stream.
745  *
746  * Returns: a #GstRTSPLowerTrans
747  */
748 GstRTSPLowerTrans
749 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
750 {
751   GstRTSPStreamPrivate *priv;
752   GstRTSPLowerTrans res;
753
754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
755       GST_RTSP_LOWER_TRANS_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->protocols;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_address_pool:
768  * @stream: a #GstRTSPStream
769  * @pool: (transfer none): a #GstRTSPAddressPool
770  *
771  * configure @pool to be used as the address pool of @stream.
772  */
773 void
774 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
775     GstRTSPAddressPool * pool)
776 {
777   GstRTSPStreamPrivate *priv;
778   GstRTSPAddressPool *old;
779
780   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
781
782   priv = stream->priv;
783
784   GST_LOG_OBJECT (stream, "set address pool %p", pool);
785
786   g_mutex_lock (&priv->lock);
787   if ((old = priv->pool) != pool)
788     priv->pool = pool ? g_object_ref (pool) : NULL;
789   else
790     old = NULL;
791   g_mutex_unlock (&priv->lock);
792
793   if (old)
794     g_object_unref (old);
795 }
796
797 /**
798  * gst_rtsp_stream_get_address_pool:
799  * @stream: a #GstRTSPStream
800  *
801  * Get the #GstRTSPAddressPool used as the address pool of @stream.
802  *
803  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
804  * usage.
805  */
806 GstRTSPAddressPool *
807 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
808 {
809   GstRTSPStreamPrivate *priv;
810   GstRTSPAddressPool *result;
811
812   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
813
814   priv = stream->priv;
815
816   g_mutex_lock (&priv->lock);
817   if ((result = priv->pool))
818     g_object_ref (result);
819   g_mutex_unlock (&priv->lock);
820
821   return result;
822 }
823
824 /**
825  * gst_rtsp_stream_get_multicast_address:
826  * @stream: a #GstRTSPStream
827  * @family: the #GSocketFamily
828  *
829  * Get the multicast address of @stream for @family.
830  *
831  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
832  * or %NULL when no address could be allocated. gst_rtsp_address_free()
833  * after usage.
834  */
835 GstRTSPAddress *
836 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
837     GSocketFamily family)
838 {
839   GstRTSPStreamPrivate *priv;
840   GstRTSPAddress *result;
841   GstRTSPAddress **addrp;
842   GstRTSPAddressFlags flags;
843
844   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
845
846   priv = stream->priv;
847
848   if (family == G_SOCKET_FAMILY_IPV6) {
849     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
850     addrp = &priv->addr_v6;
851   } else {
852     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
853     addrp = &priv->addr_v4;
854   }
855
856   g_mutex_lock (&priv->lock);
857   if (*addrp == NULL) {
858     if (priv->pool == NULL)
859       goto no_pool;
860
861     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
862
863     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
864     if (*addrp == NULL)
865       goto no_address;
866   }
867   result = gst_rtsp_address_copy (*addrp);
868   g_mutex_unlock (&priv->lock);
869
870   return result;
871
872   /* ERRORS */
873 no_pool:
874   {
875     GST_ERROR_OBJECT (stream, "no address pool specified");
876     g_mutex_unlock (&priv->lock);
877     return NULL;
878   }
879 no_address:
880   {
881     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
882     g_mutex_unlock (&priv->lock);
883     return NULL;
884   }
885 }
886
887 /**
888  * gst_rtsp_stream_reserve_address:
889  * @stream: a #GstRTSPStream
890  * @address: an address
891  * @port: a port
892  * @n_ports: n_ports
893  * @ttl: a TTL
894  *
895  * Reserve @address and @port as the address and port of @stream.
896  *
897  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
898  * the address could be reserved. gst_rtsp_address_free() after usage.
899  */
900 GstRTSPAddress *
901 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
902     const gchar * address, guint port, guint n_ports, guint ttl)
903 {
904   GstRTSPStreamPrivate *priv;
905   GstRTSPAddress *result;
906   GInetAddress *addr;
907   GSocketFamily family;
908   GstRTSPAddress **addrp;
909
910   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
911   g_return_val_if_fail (address != NULL, NULL);
912   g_return_val_if_fail (port > 0, NULL);
913   g_return_val_if_fail (n_ports > 0, NULL);
914   g_return_val_if_fail (ttl > 0, NULL);
915
916   priv = stream->priv;
917
918   addr = g_inet_address_new_from_string (address);
919   if (!addr) {
920     GST_ERROR ("failed to get inet addr from %s", address);
921     family = G_SOCKET_FAMILY_IPV4;
922   } else {
923     family = g_inet_address_get_family (addr);
924     g_object_unref (addr);
925   }
926
927   if (family == G_SOCKET_FAMILY_IPV6)
928     addrp = &priv->addr_v6;
929   else
930     addrp = &priv->addr_v4;
931
932   g_mutex_lock (&priv->lock);
933   if (*addrp == NULL) {
934     GstRTSPAddressPoolResult res;
935
936     if (priv->pool == NULL)
937       goto no_pool;
938
939     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
940         port, n_ports, ttl, addrp);
941     if (res != GST_RTSP_ADDRESS_POOL_OK)
942       goto no_address;
943   } else {
944     if (strcmp ((*addrp)->address, address) ||
945         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
946         (*addrp)->ttl != ttl)
947       goto different_address;
948   }
949   result = gst_rtsp_address_copy (*addrp);
950   g_mutex_unlock (&priv->lock);
951
952   return result;
953
954   /* ERRORS */
955 no_pool:
956   {
957     GST_ERROR_OBJECT (stream, "no address pool specified");
958     g_mutex_unlock (&priv->lock);
959     return NULL;
960   }
961 no_address:
962   {
963     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
964         address);
965     g_mutex_unlock (&priv->lock);
966     return NULL;
967   }
968 different_address:
969   {
970     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
971         " reserved", address);
972     g_mutex_unlock (&priv->lock);
973     return NULL;
974   }
975 }
976
977 static gboolean
978 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
979     GSocketFamily family, GstElement * udpsrc_out[2],
980     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
981     GstRTSPAddress ** server_addr_out)
982 {
983   GstStateChangeReturn ret;
984   GstElement *udpsrc0, *udpsrc1;
985   GstElement *udpsink0, *udpsink1;
986   GSocket *rtp_socket = NULL;
987   GSocket *rtcp_socket;
988   gint tmp_rtp, tmp_rtcp;
989   guint count;
990   gint rtpport, rtcpport;
991   GList *rejected_addresses = NULL;
992   GstRTSPAddress *addr = NULL;
993   GInetAddress *inetaddr = NULL;
994   GSocketAddress *rtp_sockaddr = NULL;
995   GSocketAddress *rtcp_sockaddr = NULL;
996   const gchar *multisink_socket;
997
998   if (family == G_SOCKET_FAMILY_IPV6)
999     multisink_socket = "socket-v6";
1000   else
1001     multisink_socket = "socket";
1002
1003   udpsrc0 = NULL;
1004   udpsrc1 = NULL;
1005   udpsink0 = NULL;
1006   udpsink1 = NULL;
1007   count = 0;
1008
1009   /* Start with random port */
1010   tmp_rtp = 0;
1011
1012   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1013       G_SOCKET_PROTOCOL_UDP, NULL);
1014   if (!rtcp_socket)
1015     goto no_udp_protocol;
1016
1017   if (*server_addr_out)
1018     gst_rtsp_address_free (*server_addr_out);
1019
1020   /* try to allocate 2 UDP ports, the RTP port should be an even
1021    * number and the RTCP port should be the next (uneven) port */
1022 again:
1023
1024   if (rtp_socket == NULL) {
1025     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1026         G_SOCKET_PROTOCOL_UDP, NULL);
1027     if (!rtp_socket)
1028       goto no_udp_protocol;
1029   }
1030
1031   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1032     GstRTSPAddressFlags flags;
1033
1034     if (addr)
1035       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1036
1037     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1038     if (family == G_SOCKET_FAMILY_IPV6)
1039       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1040     else
1041       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1042
1043     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1044
1045     if (addr == NULL)
1046       goto no_ports;
1047
1048     tmp_rtp = addr->port;
1049
1050     g_clear_object (&inetaddr);
1051     inetaddr = g_inet_address_new_from_string (addr->address);
1052   } else {
1053     if (tmp_rtp != 0) {
1054       tmp_rtp += 2;
1055       if (++count > 20)
1056         goto no_ports;
1057     }
1058
1059     if (inetaddr == NULL)
1060       inetaddr = g_inet_address_new_any (family);
1061   }
1062
1063   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1064   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1065     g_object_unref (rtp_sockaddr);
1066     goto again;
1067   }
1068   g_object_unref (rtp_sockaddr);
1069
1070   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1071   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1072     g_clear_object (&rtp_sockaddr);
1073     goto socket_error;
1074   }
1075
1076   tmp_rtp =
1077       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1078   g_object_unref (rtp_sockaddr);
1079
1080   /* check if port is even */
1081   if ((tmp_rtp & 1) != 0) {
1082     /* port not even, close and allocate another */
1083     tmp_rtp++;
1084     g_clear_object (&rtp_socket);
1085     goto again;
1086   }
1087
1088   /* set port */
1089   tmp_rtcp = tmp_rtp + 1;
1090
1091   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1092   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1093     g_object_unref (rtcp_sockaddr);
1094     g_clear_object (&rtp_socket);
1095     goto again;
1096   }
1097   g_object_unref (rtcp_sockaddr);
1098
1099   g_clear_object (&inetaddr);
1100
1101   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1102   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1103
1104   if (udpsrc0 == NULL || udpsrc1 == NULL)
1105     goto no_udp_protocol;
1106
1107   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1108   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1109
1110   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1111   if (ret == GST_STATE_CHANGE_FAILURE)
1112     goto element_error;
1113   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1114   if (ret == GST_STATE_CHANGE_FAILURE)
1115     goto element_error;
1116
1117   /* all fine, do port check */
1118   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1119   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1120
1121   /* this should not happen... */
1122   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1123     goto port_error;
1124
1125   if (udpsink_out[0])
1126     udpsink0 = udpsink_out[0];
1127   else
1128     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1129
1130   if (!udpsink0)
1131     goto no_udp_protocol;
1132
1133   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1134   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1135
1136   if (udpsink_out[1])
1137     udpsink1 = udpsink_out[1];
1138   else
1139     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1140
1141   if (!udpsink1)
1142     goto no_udp_protocol;
1143
1144   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1145   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1146   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1147
1148   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1149   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1150   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1152   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1153   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1154   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1155   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1156
1157   /* we keep these elements, we will further configure them when the
1158    * client told us to really use the UDP ports. */
1159   udpsrc_out[0] = udpsrc0;
1160   udpsrc_out[1] = udpsrc1;
1161   udpsink_out[0] = udpsink0;
1162   udpsink_out[1] = udpsink1;
1163
1164   server_port_out->min = rtpport;
1165   server_port_out->max = rtcpport;
1166
1167   *server_addr_out = addr;
1168   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1169
1170   g_object_unref (rtp_socket);
1171   g_object_unref (rtcp_socket);
1172
1173   return TRUE;
1174
1175   /* ERRORS */
1176 no_udp_protocol:
1177   {
1178     goto cleanup;
1179   }
1180 no_ports:
1181   {
1182     goto cleanup;
1183   }
1184 port_error:
1185   {
1186     goto cleanup;
1187   }
1188 socket_error:
1189   {
1190     goto cleanup;
1191   }
1192 element_error:
1193   {
1194     goto cleanup;
1195   }
1196 cleanup:
1197   {
1198     if (udpsrc0) {
1199       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1200       gst_object_unref (udpsrc0);
1201     }
1202     if (udpsrc1) {
1203       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1204       gst_object_unref (udpsrc1);
1205     }
1206     if (udpsink0) {
1207       gst_element_set_state (udpsink0, GST_STATE_NULL);
1208       gst_object_unref (udpsink0);
1209     }
1210     if (inetaddr)
1211       g_object_unref (inetaddr);
1212     g_list_free_full (rejected_addresses,
1213         (GDestroyNotify) gst_rtsp_address_free);
1214     if (addr)
1215       gst_rtsp_address_free (addr);
1216     if (rtp_socket)
1217       g_object_unref (rtp_socket);
1218     if (rtcp_socket)
1219       g_object_unref (rtcp_socket);
1220     return FALSE;
1221   }
1222 }
1223
1224 /* must be called with lock */
1225 static gboolean
1226 alloc_ports (GstRTSPStream * stream)
1227 {
1228   GstRTSPStreamPrivate *priv = stream->priv;
1229
1230   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1231       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1232       &priv->server_port_v4, &priv->server_addr_v4);
1233
1234   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1235       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1236       &priv->server_port_v6, &priv->server_addr_v6);
1237
1238   return priv->have_ipv4 || priv->have_ipv6;
1239 }
1240
1241 /**
1242  * gst_rtsp_stream_get_server_port:
1243  * @stream: a #GstRTSPStream
1244  * @server_port: (out): result server port
1245  * @family: the port family to get
1246  *
1247  * Fill @server_port with the port pair used by the server. This function can
1248  * only be called when @stream has been joined.
1249  */
1250 void
1251 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1252     GstRTSPRange * server_port, GSocketFamily family)
1253 {
1254   GstRTSPStreamPrivate *priv;
1255
1256   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1257   priv = stream->priv;
1258   g_return_if_fail (priv->is_joined);
1259
1260   g_mutex_lock (&priv->lock);
1261   if (family == G_SOCKET_FAMILY_IPV4) {
1262     if (server_port)
1263       *server_port = priv->server_port_v4;
1264   } else {
1265     if (server_port)
1266       *server_port = priv->server_port_v6;
1267   }
1268   g_mutex_unlock (&priv->lock);
1269 }
1270
1271 /**
1272  * gst_rtsp_stream_get_rtpsession:
1273  * @stream: a #GstRTSPStream
1274  *
1275  * Get the RTP session of this stream.
1276  *
1277  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1278  */
1279 GObject *
1280 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1281 {
1282   GstRTSPStreamPrivate *priv;
1283   GObject *session;
1284
1285   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1286
1287   priv = stream->priv;
1288
1289   g_mutex_lock (&priv->lock);
1290   if ((session = priv->session))
1291     g_object_ref (session);
1292   g_mutex_unlock (&priv->lock);
1293
1294   return session;
1295 }
1296
1297 /**
1298  * gst_rtsp_stream_get_ssrc:
1299  * @stream: a #GstRTSPStream
1300  * @ssrc: (out): result ssrc
1301  *
1302  * Get the SSRC used by the RTP session of this stream. This function can only
1303  * be called when @stream has been joined.
1304  */
1305 void
1306 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1307 {
1308   GstRTSPStreamPrivate *priv;
1309
1310   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1311   priv = stream->priv;
1312   g_return_if_fail (priv->is_joined);
1313
1314   g_mutex_lock (&priv->lock);
1315   if (ssrc && priv->session)
1316     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1317   g_mutex_unlock (&priv->lock);
1318 }
1319
1320 /**
1321  * gst_rtsp_stream_set_retransmission_time:
1322  * @stream: a #GstRTSPStream
1323  * @time: a #GstClockTime
1324  *
1325  * Set the amount of time to store retransmission packets.
1326  */
1327 void
1328 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1329     GstClockTime time)
1330 {
1331   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1332
1333   g_mutex_lock (&stream->priv->lock);
1334   stream->priv->rtx_time = time;
1335   if (stream->priv->rtxsend)
1336     g_object_set (stream->priv->rtxsend, "max-size-time",
1337         GST_TIME_AS_MSECONDS (time), NULL);
1338   g_mutex_unlock (&stream->priv->lock);
1339 }
1340
1341 /**
1342  * gst_rtsp_media_get_retransmission_time:
1343  * @media: a #GstRTSPMedia
1344  *
1345  * Get the amount of time to store retransmission data.
1346  *
1347  * Returns: the amount of time to store retransmission data.
1348  */
1349 GstClockTime
1350 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1351 {
1352   GstClockTime ret;
1353
1354   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1355
1356   g_mutex_lock (&stream->priv->lock);
1357   ret = stream->priv->rtx_time;
1358   g_mutex_unlock (&stream->priv->lock);
1359
1360   return ret;
1361 }
1362
1363 void
1364 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1365 {
1366   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1367
1368   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1369
1370   g_mutex_lock (&stream->priv->lock);
1371   stream->priv->rtx_pt = rtx_pt;
1372   if (stream->priv->rtxsend) {
1373     guint pt = gst_rtsp_stream_get_pt (stream);
1374     gchar *pt_s = g_strdup_printf ("%d", pt);
1375     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1376         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1377     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1378     g_free (pt_s);
1379     gst_structure_free (rtx_pt_map);
1380   }
1381   g_mutex_unlock (&stream->priv->lock);
1382 }
1383
1384 guint
1385 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1386 {
1387   guint rtx_pt;
1388
1389   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1390
1391   g_mutex_lock (&stream->priv->lock);
1392   rtx_pt = stream->priv->rtx_pt;
1393   g_mutex_unlock (&stream->priv->lock);
1394
1395   return rtx_pt;
1396 }
1397
1398 /* executed from streaming thread */
1399 static void
1400 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1401 {
1402   GstRTSPStreamPrivate *priv = stream->priv;
1403   GstCaps *newcaps, *oldcaps;
1404
1405   newcaps = gst_pad_get_current_caps (pad);
1406
1407   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1408       newcaps);
1409
1410   g_mutex_lock (&priv->lock);
1411   oldcaps = priv->caps;
1412   priv->caps = newcaps;
1413   g_mutex_unlock (&priv->lock);
1414
1415   if (oldcaps)
1416     gst_caps_unref (oldcaps);
1417 }
1418
1419 static void
1420 dump_structure (const GstStructure * s)
1421 {
1422   gchar *sstr;
1423
1424   sstr = gst_structure_to_string (s);
1425   GST_INFO ("structure: %s", sstr);
1426   g_free (sstr);
1427 }
1428
1429 static GstRTSPStreamTransport *
1430 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1431 {
1432   GstRTSPStreamPrivate *priv = stream->priv;
1433   GList *walk;
1434   GstRTSPStreamTransport *result = NULL;
1435   const gchar *tmp;
1436   gchar *dest;
1437   guint port;
1438
1439   if (rtcp_from == NULL)
1440     return NULL;
1441
1442   tmp = g_strrstr (rtcp_from, ":");
1443   if (tmp == NULL)
1444     return NULL;
1445
1446   port = atoi (tmp + 1);
1447   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1448
1449   g_mutex_lock (&priv->lock);
1450   GST_INFO ("finding %s:%d in %d transports", dest, port,
1451       g_list_length (priv->transports));
1452
1453   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1454     GstRTSPStreamTransport *trans = walk->data;
1455     const GstRTSPTransport *tr;
1456     gint min, max;
1457
1458     tr = gst_rtsp_stream_transport_get_transport (trans);
1459
1460     min = tr->client_port.min;
1461     max = tr->client_port.max;
1462
1463     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1464       result = trans;
1465       break;
1466     }
1467   }
1468   if (result)
1469     g_object_ref (result);
1470   g_mutex_unlock (&priv->lock);
1471
1472   g_free (dest);
1473
1474   return result;
1475 }
1476
1477 static GstRTSPStreamTransport *
1478 check_transport (GObject * source, GstRTSPStream * stream)
1479 {
1480   GstStructure *stats;
1481   GstRTSPStreamTransport *trans;
1482
1483   /* see if we have a stream to match with the origin of the RTCP packet */
1484   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1485   if (trans == NULL) {
1486     g_object_get (source, "stats", &stats, NULL);
1487     if (stats) {
1488       const gchar *rtcp_from;
1489
1490       dump_structure (stats);
1491
1492       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1493       if ((trans = find_transport (stream, rtcp_from))) {
1494         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1495             source);
1496         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1497             g_object_unref);
1498       }
1499       gst_structure_free (stats);
1500     }
1501   }
1502   return trans;
1503 }
1504
1505
1506 static void
1507 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1508 {
1509   GstRTSPStreamTransport *trans;
1510
1511   GST_INFO ("%p: new source %p", stream, source);
1512
1513   trans = check_transport (source, stream);
1514
1515   if (trans)
1516     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1517 }
1518
1519 static void
1520 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1521 {
1522   GST_INFO ("%p: new SDES %p", stream, source);
1523 }
1524
1525 static void
1526 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1527 {
1528   GstRTSPStreamTransport *trans;
1529
1530   trans = check_transport (source, stream);
1531
1532   if (trans) {
1533     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1534     gst_rtsp_stream_transport_keep_alive (trans);
1535   }
1536 #ifdef DUMP_STATS
1537   {
1538     GstStructure *stats;
1539     g_object_get (source, "stats", &stats, NULL);
1540     if (stats) {
1541       dump_structure (stats);
1542       gst_structure_free (stats);
1543     }
1544   }
1545 #endif
1546 }
1547
1548 static void
1549 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1550 {
1551   GST_INFO ("%p: source %p bye", stream, source);
1552 }
1553
1554 static void
1555 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1556 {
1557   GstRTSPStreamTransport *trans;
1558
1559   GST_INFO ("%p: source %p bye timeout", stream, source);
1560
1561   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1562     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1563     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1564   }
1565 }
1566
1567 static void
1568 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1569 {
1570   GstRTSPStreamTransport *trans;
1571
1572   GST_INFO ("%p: source %p timeout", stream, source);
1573
1574   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1575     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1576     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1577   }
1578 }
1579
1580 static void
1581 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1582 {
1583   if (is_rtp) {
1584     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1585     g_list_free (priv->tr_cache_rtp);
1586     priv->tr_cache_rtp = NULL;
1587   } else {
1588     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1589     g_list_free (priv->tr_cache_rtcp);
1590     priv->tr_cache_rtcp = NULL;
1591   }
1592 }
1593
1594 static GstFlowReturn
1595 handle_new_sample (GstAppSink * sink, gpointer user_data)
1596 {
1597   GstRTSPStreamPrivate *priv;
1598   GList *walk;
1599   GstSample *sample;
1600   GstBuffer *buffer;
1601   GstRTSPStream *stream;
1602   gboolean is_rtp;
1603
1604   sample = gst_app_sink_pull_sample (sink);
1605   if (!sample)
1606     return GST_FLOW_OK;
1607
1608   stream = (GstRTSPStream *) user_data;
1609   priv = stream->priv;
1610   buffer = gst_sample_get_buffer (sample);
1611
1612   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1613
1614   g_mutex_lock (&priv->lock);
1615   if (is_rtp) {
1616     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1617       clear_tr_cache (priv, is_rtp);
1618       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1619         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1620         priv->tr_cache_rtp =
1621             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1622       }
1623       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1624     }
1625   } else {
1626     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1627       clear_tr_cache (priv, is_rtp);
1628       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1629         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1630         priv->tr_cache_rtcp =
1631             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1632       }
1633       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1634     }
1635   }
1636   g_mutex_unlock (&priv->lock);
1637
1638   if (is_rtp) {
1639     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1640       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1641       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1642     }
1643   } else {
1644     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1645       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1646       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1647     }
1648   }
1649   gst_sample_unref (sample);
1650
1651   return GST_FLOW_OK;
1652 }
1653
1654 static GstAppSinkCallbacks sink_cb = {
1655   NULL,                         /* not interested in EOS */
1656   NULL,                         /* not interested in preroll samples */
1657   handle_new_sample,
1658 };
1659
1660 static GstElement *
1661 get_rtp_encoder (GstRTSPStream * stream, guint session)
1662 {
1663   GstRTSPStreamPrivate *priv = stream->priv;
1664
1665   if (priv->srtpenc == NULL) {
1666     gchar *name;
1667
1668     name = g_strdup_printf ("srtpenc_%u", session);
1669     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1670     g_free (name);
1671
1672     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1673   }
1674   return gst_object_ref (priv->srtpenc);
1675 }
1676
1677 static GstElement *
1678 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1679 {
1680   GstRTSPStreamPrivate *priv = stream->priv;
1681   GstElement *oldenc, *enc;
1682   GstPad *pad;
1683   gchar *name;
1684
1685   if (priv->idx != session)
1686     return NULL;
1687
1688   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1689
1690   oldenc = priv->srtpenc;
1691   enc = get_rtp_encoder (stream, session);
1692   name = g_strdup_printf ("rtp_sink_%d", session);
1693   pad = gst_element_get_request_pad (enc, name);
1694   g_free (name);
1695   gst_object_unref (pad);
1696
1697   if (oldenc == NULL)
1698     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1699         enc);
1700
1701   return enc;
1702 }
1703
1704 static GstElement *
1705 request_rtcp_encoder (GstElement * rtpbin, guint session,
1706     GstRTSPStream * stream)
1707 {
1708   GstRTSPStreamPrivate *priv = stream->priv;
1709   GstElement *oldenc, *enc;
1710   GstPad *pad;
1711   gchar *name;
1712
1713   if (priv->idx != session)
1714     return NULL;
1715
1716   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1717
1718   oldenc = priv->srtpenc;
1719   enc = get_rtp_encoder (stream, session);
1720   name = g_strdup_printf ("rtcp_sink_%d", session);
1721   pad = gst_element_get_request_pad (enc, name);
1722   g_free (name);
1723   gst_object_unref (pad);
1724
1725   if (oldenc == NULL)
1726     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1727         enc);
1728
1729   return enc;
1730 }
1731
1732 static GstCaps *
1733 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1734 {
1735   GstRTSPStreamPrivate *priv = stream->priv;
1736   GstCaps *caps;
1737
1738   GST_DEBUG ("request key %08x", ssrc);
1739
1740   g_mutex_lock (&priv->lock);
1741   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1742     gst_caps_ref (caps);
1743   g_mutex_unlock (&priv->lock);
1744
1745   return caps;
1746 }
1747
1748 static GstElement *
1749 request_rtcp_decoder (GstElement * rtpbin, guint session,
1750     GstRTSPStream * stream)
1751 {
1752   GstRTSPStreamPrivate *priv = stream->priv;
1753
1754   if (priv->idx != session)
1755     return NULL;
1756
1757   if (priv->srtpdec == NULL) {
1758     gchar *name;
1759
1760     name = g_strdup_printf ("srtpdec_%u", session);
1761     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1762     g_free (name);
1763
1764     g_signal_connect (priv->srtpdec, "request-key",
1765         (GCallback) request_key, stream);
1766   }
1767   return gst_object_ref (priv->srtpdec);
1768 }
1769
1770 static GstElement *
1771 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1772 {
1773   GstElement *bin;
1774   GstPad *pad;
1775   GstStructure *pt_map;
1776   gchar *name;
1777   guint pt, rtx_pt;
1778   gchar *pt_s;
1779
1780   pt = gst_rtsp_stream_get_pt (stream);
1781   pt_s = g_strdup_printf ("%u", pt);
1782   rtx_pt = stream->priv->rtx_pt;
1783
1784   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1785
1786   bin = gst_bin_new (NULL);
1787   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1788   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1789       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1790   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1791       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1792   g_free (pt_s);
1793   gst_structure_free (pt_map);
1794   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1795
1796   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1797   name = g_strdup_printf ("src_%u", sessid);
1798   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1799   g_free (name);
1800   gst_object_unref (pad);
1801
1802   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1803   name = g_strdup_printf ("sink_%u", sessid);
1804   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1805   g_free (name);
1806   gst_object_unref (pad);
1807
1808   return bin;
1809 }
1810
1811 /**
1812  * gst_rtsp_stream_join_bin:
1813  * @stream: a #GstRTSPStream
1814  * @bin: (transfer none): a #GstBin to join
1815  * @rtpbin: (transfer none): a rtpbin element in @bin
1816  * @state: the target state of the new elements
1817  *
1818  * Join the #GstBin @bin that contains the element @rtpbin.
1819  *
1820  * @stream will link to @rtpbin, which must be inside @bin. The elements
1821  * added to @bin will be set to the state given in @state.
1822  *
1823  * Returns: %TRUE on success.
1824  */
1825 gboolean
1826 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1827     GstElement * rtpbin, GstState state)
1828 {
1829   GstRTSPStreamPrivate *priv;
1830   gint i;
1831   guint idx;
1832   gchar *name;
1833   GstPad *pad, *sinkpad, *selpad;
1834   GstPadLinkReturn ret;
1835
1836   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1837   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1838   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1839
1840   priv = stream->priv;
1841
1842   g_mutex_lock (&priv->lock);
1843   if (priv->is_joined)
1844     goto was_joined;
1845
1846   /* create a session with the same index as the stream */
1847   idx = priv->idx;
1848
1849   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1850
1851   if (!alloc_ports (stream))
1852     goto no_ports;
1853
1854   /* update the dscp qos field in the sinks */
1855   update_dscp_qos (stream);
1856
1857   if (priv->profiles & GST_RTSP_PROFILE_SAVP
1858       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
1859     /* For SRTP */
1860     g_signal_connect (rtpbin, "request-rtp-encoder",
1861         (GCallback) request_rtp_encoder, stream);
1862     g_signal_connect (rtpbin, "request-rtcp-encoder",
1863         (GCallback) request_rtcp_encoder, stream);
1864     g_signal_connect (rtpbin, "request-rtcp-decoder",
1865         (GCallback) request_rtcp_decoder, stream);
1866   }
1867
1868   if (priv->rtx_time > 0) {
1869     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
1870     g_signal_connect (rtpbin, "request-aux-sender",
1871         (GCallback) request_aux_sender, stream);
1872   }
1873
1874   /* get a pad for sending RTP */
1875   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1876   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1877   g_free (name);
1878   /* link the RTP pad to the session manager, it should not really fail unless
1879    * this is not really an RTP pad */
1880   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1881   if (ret != GST_PAD_LINK_OK)
1882     goto link_failed;
1883
1884   /* get pads from the RTP session element for sending and receiving
1885    * RTP/RTCP*/
1886   name = g_strdup_printf ("send_rtp_src_%u", idx);
1887   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1888   g_free (name);
1889   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1890   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1891   g_free (name);
1892   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1893   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1894   g_free (name);
1895   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1896   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1897   g_free (name);
1898
1899   /* get the session */
1900   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1901
1902   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1903       stream);
1904   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1905       stream);
1906   g_signal_connect (priv->session, "on-ssrc-active",
1907       (GCallback) on_ssrc_active, stream);
1908   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1909       stream);
1910   g_signal_connect (priv->session, "on-bye-timeout",
1911       (GCallback) on_bye_timeout, stream);
1912   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1913       stream);
1914
1915   for (i = 0; i < 2; i++) {
1916     GstPad *teepad, *queuepad;
1917     /* For the sender we create this bit of pipeline for both
1918      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1919      * we need to add a queue before appsink to make the pipeline
1920      * not block. For the TCP case, we want to pump data to the
1921      * client as fast as possible anyway.
1922      *
1923      * .--------.      .-----.    .---------.
1924      * | rtpbin |      | tee |    | udpsink |
1925      * |       send->sink   src->sink       |
1926      * '--------'      |     |    '---------'
1927      *                 |     |    .---------.    .---------.
1928      *                 |     |    |  queue  |    | appsink |
1929      *                 |    src->sink      src->sink       |
1930      *                 '-----'    '---------'    '---------'
1931      *
1932      * When only UDP is allowed, we skip the tee, queue and appsink and link the
1933      * udpsink directly to the session.
1934      */
1935     /* add udpsink */
1936     gst_bin_add (bin, priv->udpsink[i]);
1937     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1938
1939     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
1940       /* make tee for RTP/RTCP */
1941       priv->tee[i] = gst_element_factory_make ("tee", NULL);
1942       gst_bin_add (bin, priv->tee[i]);
1943
1944       /* and link to rtpbin send pad */
1945       pad = gst_element_get_static_pad (priv->tee[i], "sink");
1946       gst_pad_link (priv->send_src[i], pad);
1947       gst_object_unref (pad);
1948
1949       /* link tee to udpsink */
1950       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1951       gst_pad_link (teepad, sinkpad);
1952       gst_object_unref (teepad);
1953
1954       /* make queue */
1955       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1956       gst_bin_add (bin, priv->appqueue[i]);
1957       /* and link to tee */
1958       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1959       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1960       gst_pad_link (teepad, pad);
1961       gst_object_unref (pad);
1962       gst_object_unref (teepad);
1963
1964       /* make appsink */
1965       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1966       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1967       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1968       gst_bin_add (bin, priv->appsink[i]);
1969       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1970           &sink_cb, stream, NULL);
1971       /* and link to queue */
1972       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1973       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1974       gst_pad_link (queuepad, pad);
1975       gst_object_unref (pad);
1976       gst_object_unref (queuepad);
1977     } else {
1978       /* else only udpsink needed, link it to the session */
1979       gst_pad_link (priv->send_src[i], sinkpad);
1980     }
1981     gst_object_unref (sinkpad);
1982
1983     /* For the receiver we create this bit of pipeline for both
1984      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1985      * and it is all funneled into the rtpbin receive pad.
1986      *
1987      * .--------.     .--------.    .--------.
1988      * | udpsrc |     | funnel |    | rtpbin |
1989      * |       src->sink      src->sink      |
1990      * '--------'     |        |    '--------'
1991      * .--------.     |        |
1992      * | appsrc |     |        |
1993      * |       src->sink       |
1994      * '--------'     '--------'
1995      */
1996     /* make funnel for the RTP/RTCP receivers */
1997     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1998     gst_bin_add (bin, priv->funnel[i]);
1999
2000     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2001     gst_pad_link (pad, priv->recv_sink[i]);
2002     gst_object_unref (pad);
2003
2004     if (priv->udpsrc_v4[i]) {
2005       /* we set and keep these to playing so that they don't cause NO_PREROLL return
2006        * values */
2007       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2008       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2009       /* add udpsrc */
2010       gst_bin_add (bin, priv->udpsrc_v4[i]);
2011
2012       /* and link to the funnel v4 */
2013       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2014       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2015       gst_pad_link (pad, selpad);
2016       gst_object_unref (pad);
2017       gst_object_unref (selpad);
2018     }
2019
2020     if (priv->udpsrc_v6[i]) {
2021       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2022       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2023       gst_bin_add (bin, priv->udpsrc_v6[i]);
2024
2025       /* and link to the funnel v6 */
2026       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2027       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2028       gst_pad_link (pad, selpad);
2029       gst_object_unref (pad);
2030       gst_object_unref (selpad);
2031     }
2032
2033     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2034       /* make and add appsrc */
2035       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2036       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2037       gst_bin_add (bin, priv->appsrc[i]);
2038       /* and link to the funnel */
2039       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2040       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2041       gst_pad_link (pad, selpad);
2042       gst_object_unref (pad);
2043       gst_object_unref (selpad);
2044     }
2045
2046     /* check if we need to set to a special state */
2047     if (state != GST_STATE_NULL) {
2048       if (priv->udpsink[i])
2049         gst_element_set_state (priv->udpsink[i], state);
2050       if (priv->appsink[i])
2051         gst_element_set_state (priv->appsink[i], state);
2052       if (priv->appqueue[i])
2053         gst_element_set_state (priv->appqueue[i], state);
2054       if (priv->tee[i])
2055         gst_element_set_state (priv->tee[i], state);
2056       if (priv->funnel[i])
2057         gst_element_set_state (priv->funnel[i], state);
2058       if (priv->appsrc[i])
2059         gst_element_set_state (priv->appsrc[i], state);
2060     }
2061   }
2062
2063   /* be notified of caps changes */
2064   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2065       (GCallback) caps_notify, stream);
2066
2067   priv->is_joined = TRUE;
2068   g_mutex_unlock (&priv->lock);
2069
2070   return TRUE;
2071
2072   /* ERRORS */
2073 was_joined:
2074   {
2075     g_mutex_unlock (&priv->lock);
2076     return TRUE;
2077   }
2078 no_ports:
2079   {
2080     g_mutex_unlock (&priv->lock);
2081     GST_WARNING ("failed to allocate ports %u", idx);
2082     return FALSE;
2083   }
2084 link_failed:
2085   {
2086     GST_WARNING ("failed to link stream %u", idx);
2087     gst_object_unref (priv->send_rtp_sink);
2088     priv->send_rtp_sink = NULL;
2089     g_mutex_unlock (&priv->lock);
2090     return FALSE;
2091   }
2092 }
2093
2094 /**
2095  * gst_rtsp_stream_leave_bin:
2096  * @stream: a #GstRTSPStream
2097  * @bin: (transfer none): a #GstBin
2098  * @rtpbin: (transfer none): a rtpbin #GstElement
2099  *
2100  * Remove the elements of @stream from @bin.
2101  *
2102  * Return: %TRUE on success.
2103  */
2104 gboolean
2105 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2106     GstElement * rtpbin)
2107 {
2108   GstRTSPStreamPrivate *priv;
2109   gint i;
2110   GList *l;
2111
2112   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2113   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2114   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2115
2116   priv = stream->priv;
2117
2118   g_mutex_lock (&priv->lock);
2119   if (!priv->is_joined)
2120     goto was_not_joined;
2121
2122   /* all transports must be removed by now */
2123   if (priv->transports != NULL)
2124     goto transports_not_removed;
2125
2126   clear_tr_cache (priv, TRUE);
2127   clear_tr_cache (priv, FALSE);
2128
2129   GST_INFO ("stream %p leaving bin", stream);
2130
2131   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2132   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2133   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2134   gst_object_unref (priv->send_rtp_sink);
2135   priv->send_rtp_sink = NULL;
2136
2137   for (i = 0; i < 2; i++) {
2138     if (priv->udpsink[i])
2139       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2140     if (priv->appsink[i])
2141       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2142     if (priv->appqueue[i])
2143       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2144     if (priv->tee[i])
2145       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2146     if (priv->funnel[i])
2147       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2148     if (priv->appsrc[i])
2149       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2150     if (priv->udpsrc_v4[i]) {
2151       /* and set udpsrc to NULL now before removing */
2152       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2153       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2154       /* removing them should also nicely release the request
2155        * pads when they finalize */
2156       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2157     }
2158     if (priv->udpsrc_v6[i]) {
2159       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2160       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2161       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2162     }
2163
2164     for (l = priv->transport_sources; l; l = l->next) {
2165       GstRTSPMulticastTransportSource *s = l->data;
2166
2167       if (!s->udpsrc[i])
2168         continue;
2169
2170       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2171       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2172       gst_bin_remove (bin, s->udpsrc[i]);
2173     }
2174
2175     if (priv->udpsink[i])
2176       gst_bin_remove (bin, priv->udpsink[i]);
2177     if (priv->appsrc[i])
2178       gst_bin_remove (bin, priv->appsrc[i]);
2179     if (priv->appsink[i])
2180       gst_bin_remove (bin, priv->appsink[i]);
2181     if (priv->appqueue[i])
2182       gst_bin_remove (bin, priv->appqueue[i]);
2183     if (priv->tee[i])
2184       gst_bin_remove (bin, priv->tee[i]);
2185     if (priv->funnel[i])
2186       gst_bin_remove (bin, priv->funnel[i]);
2187
2188     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2189     gst_object_unref (priv->recv_sink[i]);
2190     priv->recv_sink[i] = NULL;
2191
2192     priv->udpsrc_v4[i] = NULL;
2193     priv->udpsrc_v6[i] = NULL;
2194     priv->udpsink[i] = NULL;
2195     priv->appsrc[i] = NULL;
2196     priv->appsink[i] = NULL;
2197     priv->appqueue[i] = NULL;
2198     priv->tee[i] = NULL;
2199     priv->funnel[i] = NULL;
2200   }
2201
2202   for (l = priv->transport_sources; l; l = l->next) {
2203     GstRTSPMulticastTransportSource *s = l->data;
2204     g_slice_free (GstRTSPMulticastTransportSource, s);
2205   }
2206   g_list_free (priv->transport_sources);
2207   priv->transport_sources = NULL;
2208
2209   gst_object_unref (priv->send_src[0]);
2210   priv->send_src[0] = NULL;
2211
2212   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2213   gst_object_unref (priv->send_src[1]);
2214   priv->send_src[1] = NULL;
2215
2216   g_object_unref (priv->session);
2217   priv->session = NULL;
2218   if (priv->caps)
2219     gst_caps_unref (priv->caps);
2220   priv->caps = NULL;
2221
2222   if (priv->srtpenc)
2223     gst_object_unref (priv->srtpenc);
2224   if (priv->srtpdec)
2225     gst_object_unref (priv->srtpdec);
2226
2227   priv->is_joined = FALSE;
2228   g_mutex_unlock (&priv->lock);
2229
2230   return TRUE;
2231
2232 was_not_joined:
2233   {
2234     g_mutex_unlock (&priv->lock);
2235     return TRUE;
2236   }
2237 transports_not_removed:
2238   {
2239     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2240     g_mutex_unlock (&priv->lock);
2241     return FALSE;
2242   }
2243 }
2244
2245 /**
2246  * gst_rtsp_stream_get_rtpinfo:
2247  * @stream: a #GstRTSPStream
2248  * @rtptime: (allow-none): result RTP timestamp
2249  * @seq: (allow-none): result RTP seqnum
2250  * @clock_rate: (allow-none): the clock rate
2251  * @running_time: (allow-none): result running-time
2252  *
2253  * Retrieve the current rtptime, seq and running-time. This is used to
2254  * construct a RTPInfo reply header.
2255  *
2256  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2257  */
2258 gboolean
2259 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2260     guint * rtptime, guint * seq, guint * clock_rate,
2261     GstClockTime * running_time)
2262 {
2263   GstRTSPStreamPrivate *priv;
2264   GstStructure *stats;
2265   GObjectClass *payobjclass;
2266
2267   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2268
2269   priv = stream->priv;
2270
2271   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2272
2273   g_mutex_lock (&priv->lock);
2274
2275   if (g_object_class_find_property (payobjclass, "stats")) {
2276     g_object_get (priv->payloader, "stats", &stats, NULL);
2277     if (stats == NULL)
2278       goto no_stats;
2279
2280     if (seq)
2281       gst_structure_get_uint (stats, "seqnum", seq);
2282
2283     if (rtptime)
2284       gst_structure_get_uint (stats, "timestamp", rtptime);
2285
2286     if (running_time)
2287       gst_structure_get_clock_time (stats, "running-time", running_time);
2288
2289     if (clock_rate) {
2290       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2291       if (*clock_rate == 0 && running_time)
2292         *running_time = GST_CLOCK_TIME_NONE;
2293     }
2294     gst_structure_free (stats);
2295   } else {
2296     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2297         !g_object_class_find_property (payobjclass, "timestamp"))
2298       goto no_stats;
2299
2300     if (seq)
2301       g_object_get (priv->payloader, "seqnum", seq, NULL);
2302
2303     if (rtptime)
2304       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2305
2306     if (running_time)
2307       *running_time = GST_CLOCK_TIME_NONE;
2308   }
2309   g_mutex_unlock (&priv->lock);
2310
2311   return TRUE;
2312
2313   /* ERRORS */
2314 no_stats:
2315   {
2316     GST_WARNING ("Could not get payloader stats");
2317     g_mutex_unlock (&priv->lock);
2318     return FALSE;
2319   }
2320 }
2321
2322 /**
2323  * gst_rtsp_stream_get_caps:
2324  * @stream: a #GstRTSPStream
2325  *
2326  * Retrieve the current caps of @stream.
2327  *
2328  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2329  * after usage.
2330  */
2331 GstCaps *
2332 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2333 {
2334   GstRTSPStreamPrivate *priv;
2335   GstCaps *result;
2336
2337   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2338
2339   priv = stream->priv;
2340
2341   g_mutex_lock (&priv->lock);
2342   if ((result = priv->caps))
2343     gst_caps_ref (result);
2344   g_mutex_unlock (&priv->lock);
2345
2346   return result;
2347 }
2348
2349 /**
2350  * gst_rtsp_stream_recv_rtp:
2351  * @stream: a #GstRTSPStream
2352  * @buffer: (transfer full): a #GstBuffer
2353  *
2354  * Handle an RTP buffer for the stream. This method is usually called when a
2355  * message has been received from a client using the TCP transport.
2356  *
2357  * This function takes ownership of @buffer.
2358  *
2359  * Returns: a GstFlowReturn.
2360  */
2361 GstFlowReturn
2362 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2363 {
2364   GstRTSPStreamPrivate *priv;
2365   GstFlowReturn ret;
2366   GstElement *element;
2367
2368   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2369   priv = stream->priv;
2370   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2371   g_return_val_if_fail (priv->is_joined, FALSE);
2372
2373   g_mutex_lock (&priv->lock);
2374   if (priv->appsrc[0])
2375     element = gst_object_ref (priv->appsrc[0]);
2376   else
2377     element = NULL;
2378   g_mutex_unlock (&priv->lock);
2379
2380   if (element) {
2381     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2382     gst_object_unref (element);
2383   } else {
2384     ret = GST_FLOW_OK;
2385   }
2386   return ret;
2387 }
2388
2389 /**
2390  * gst_rtsp_stream_recv_rtcp:
2391  * @stream: a #GstRTSPStream
2392  * @buffer: (transfer full): a #GstBuffer
2393  *
2394  * Handle an RTCP buffer for the stream. This method is usually called when a
2395  * message has been received from a client using the TCP transport.
2396  *
2397  * This function takes ownership of @buffer.
2398  *
2399  * Returns: a GstFlowReturn.
2400  */
2401 GstFlowReturn
2402 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2403 {
2404   GstRTSPStreamPrivate *priv;
2405   GstFlowReturn ret;
2406   GstElement *element;
2407
2408   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2409   priv = stream->priv;
2410   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2411
2412   if (!priv->is_joined) {
2413     gst_buffer_unref (buffer);
2414     return GST_FLOW_NOT_LINKED;
2415   }
2416   g_mutex_lock (&priv->lock);
2417   if (priv->appsrc[1])
2418     element = gst_object_ref (priv->appsrc[1]);
2419   else
2420     element = NULL;
2421   g_mutex_unlock (&priv->lock);
2422
2423   if (element) {
2424     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2425     gst_object_unref (element);
2426   } else {
2427     ret = GST_FLOW_OK;
2428     gst_buffer_unref (buffer);
2429   }
2430   return ret;
2431 }
2432
2433 /* must be called with lock */
2434 static gboolean
2435 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2436     gboolean add)
2437 {
2438   GstRTSPStreamPrivate *priv = stream->priv;
2439   const GstRTSPTransport *tr;
2440
2441   tr = gst_rtsp_stream_transport_get_transport (trans);
2442
2443   switch (tr->lower_transport) {
2444     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2445     {
2446       GstRTSPMulticastTransportSource *source;
2447       GstBin *bin;
2448
2449       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2450
2451       if (add) {
2452         gchar *host;
2453         gint i;
2454         GstPad *selpad, *pad;
2455
2456         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2457         source->transport = trans;
2458
2459         for (i = 0; i < 2; i++) {
2460           host =
2461               g_strdup_printf ("udp://%s:%d", tr->destination,
2462               (i == 0) ? tr->port.min : tr->port.max);
2463           source->udpsrc[i] =
2464               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2465           g_free (host);
2466
2467           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2468            * values */
2469           gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2470           gst_element_set_locked_state (source->udpsrc[i], TRUE);
2471           /* add udpsrc */
2472           gst_bin_add (bin, source->udpsrc[i]);
2473
2474           /* and link to the funnel v4 */
2475           source->selpad[i] = selpad =
2476               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2477           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2478           gst_pad_link (pad, selpad);
2479           gst_object_unref (pad);
2480           gst_object_unref (selpad);
2481         }
2482         gst_object_unref (bin);
2483
2484         priv->transport_sources =
2485             g_list_prepend (priv->transport_sources, source);
2486       } else {
2487         GList *l;
2488
2489         for (l = priv->transport_sources; l; l = l->next) {
2490           source = l->data;
2491
2492           if (source->transport == trans) {
2493             priv->transport_sources =
2494                 g_list_delete_link (priv->transport_sources, l);
2495             break;
2496           }
2497         }
2498
2499         if (l != NULL) {
2500           gint i;
2501
2502           for (i = 0; i < 2; i++) {
2503             /* Will automatically unlink everything */
2504             gst_bin_remove (bin,
2505                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2506
2507             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2508             gst_object_unref (source->udpsrc[i]);
2509
2510             gst_element_release_request_pad (priv->funnel[i],
2511                 source->selpad[i]);
2512           }
2513
2514           g_slice_free (GstRTSPMulticastTransportSource, source);
2515         }
2516       }
2517
2518       /* fall through for the generic case */
2519     }
2520     case GST_RTSP_LOWER_TRANS_UDP:
2521     {
2522       gchar *dest;
2523       gint min, max;
2524       guint ttl = 0;
2525
2526       dest = tr->destination;
2527       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2528         min = tr->port.min;
2529         max = tr->port.max;
2530         ttl = tr->ttl;
2531       } else {
2532         min = tr->client_port.min;
2533         max = tr->client_port.max;
2534       }
2535
2536       if (add) {
2537         if (ttl > 0) {
2538           GST_INFO ("setting ttl-mc %d", ttl);
2539           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2540           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2541         }
2542         GST_INFO ("adding %s:%d-%d", dest, min, max);
2543         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2544         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2545         priv->transports = g_list_prepend (priv->transports, trans);
2546       } else {
2547         GST_INFO ("removing %s:%d-%d", dest, min, max);
2548         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2549         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2550         priv->transports = g_list_remove (priv->transports, trans);
2551       }
2552       priv->transports_cookie++;
2553       break;
2554     }
2555     case GST_RTSP_LOWER_TRANS_TCP:
2556       if (add) {
2557         GST_INFO ("adding TCP %s", tr->destination);
2558         priv->transports = g_list_prepend (priv->transports, trans);
2559       } else {
2560         GST_INFO ("removing TCP %s", tr->destination);
2561         priv->transports = g_list_remove (priv->transports, trans);
2562       }
2563       priv->transports_cookie++;
2564       break;
2565     default:
2566       goto unknown_transport;
2567   }
2568   return TRUE;
2569
2570   /* ERRORS */
2571 unknown_transport:
2572   {
2573     GST_INFO ("Unknown transport %d", tr->lower_transport);
2574     return FALSE;
2575   }
2576 }
2577
2578
2579 /**
2580  * gst_rtsp_stream_add_transport:
2581  * @stream: a #GstRTSPStream
2582  * @trans: (transfer none): a #GstRTSPStreamTransport
2583  *
2584  * Add the transport in @trans to @stream. The media of @stream will
2585  * then also be send to the values configured in @trans.
2586  *
2587  * @stream must be joined to a bin.
2588  *
2589  * @trans must contain a valid #GstRTSPTransport.
2590  *
2591  * Returns: %TRUE if @trans was added
2592  */
2593 gboolean
2594 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2595     GstRTSPStreamTransport * trans)
2596 {
2597   GstRTSPStreamPrivate *priv;
2598   gboolean res;
2599
2600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2601   priv = stream->priv;
2602   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2603   g_return_val_if_fail (priv->is_joined, FALSE);
2604
2605   g_mutex_lock (&priv->lock);
2606   res = update_transport (stream, trans, TRUE);
2607   g_mutex_unlock (&priv->lock);
2608
2609   return res;
2610 }
2611
2612 /**
2613  * gst_rtsp_stream_remove_transport:
2614  * @stream: a #GstRTSPStream
2615  * @trans: (transfer none): a #GstRTSPStreamTransport
2616  *
2617  * Remove the transport in @trans from @stream. The media of @stream will
2618  * not be sent to the values configured in @trans.
2619  *
2620  * @stream must be joined to a bin.
2621  *
2622  * @trans must contain a valid #GstRTSPTransport.
2623  *
2624  * Returns: %TRUE if @trans was removed
2625  */
2626 gboolean
2627 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2628     GstRTSPStreamTransport * trans)
2629 {
2630   GstRTSPStreamPrivate *priv;
2631   gboolean res;
2632
2633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2634   priv = stream->priv;
2635   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2636   g_return_val_if_fail (priv->is_joined, FALSE);
2637
2638   g_mutex_lock (&priv->lock);
2639   res = update_transport (stream, trans, FALSE);
2640   g_mutex_unlock (&priv->lock);
2641
2642   return res;
2643 }
2644
2645 /**
2646  * gst_rtsp_stream_update_crypto:
2647  * @stream: a #GstRTSPStream
2648  * @ssrc: the SSRC
2649  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2650  *
2651  * Update the new crypto information for @ssrc in @stream. If information
2652  * for @ssrc did not exist, it will be added. If information
2653  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2654  * be removed from @stream.
2655  *
2656  * Returns: %TRUE if @crypto could be updated
2657  */
2658 gboolean
2659 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2660     guint ssrc, GstCaps * crypto)
2661 {
2662   GstRTSPStreamPrivate *priv;
2663
2664   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2665   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2666
2667   priv = stream->priv;
2668
2669   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2670
2671   g_mutex_lock (&priv->lock);
2672   if (crypto)
2673     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2674         gst_caps_ref (crypto));
2675   else
2676     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2677   g_mutex_unlock (&priv->lock);
2678
2679   return TRUE;
2680 }
2681
2682 /**
2683  * gst_rtsp_stream_get_rtp_socket:
2684  * @stream: a #GstRTSPStream
2685  * @family: the socket family
2686  *
2687  * Get the RTP socket from @stream for a @family.
2688  *
2689  * @stream must be joined to a bin.
2690  *
2691  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2692  * socket could be allocated for @family. Unref after usage
2693  */
2694 GSocket *
2695 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2696 {
2697   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2698   GSocket *socket;
2699   const gchar *name;
2700
2701   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2702   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2703       family == G_SOCKET_FAMILY_IPV6, NULL);
2704   g_return_val_if_fail (priv->udpsink[0], NULL);
2705
2706   if (family == G_SOCKET_FAMILY_IPV6)
2707     name = "socket-v6";
2708   else
2709     name = "socket";
2710
2711   g_object_get (priv->udpsink[0], name, &socket, NULL);
2712
2713   return socket;
2714 }
2715
2716 /**
2717  * gst_rtsp_stream_get_rtcp_socket:
2718  * @stream: a #GstRTSPStream
2719  * @family: the socket family
2720  *
2721  * Get the RTCP socket from @stream for a @family.
2722  *
2723  * @stream must be joined to a bin.
2724  *
2725  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2726  * socket could be allocated for @family. Unref after usage
2727  */
2728 GSocket *
2729 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2730 {
2731   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2732   GSocket *socket;
2733   const gchar *name;
2734
2735   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2736   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2737       family == G_SOCKET_FAMILY_IPV6, NULL);
2738   g_return_val_if_fail (priv->udpsink[1], NULL);
2739
2740   if (family == G_SOCKET_FAMILY_IPV6)
2741     name = "socket-v6";
2742   else
2743     name = "socket";
2744
2745   g_object_get (priv->udpsink[1], name, &socket, NULL);
2746
2747   return socket;
2748 }
2749
2750 /**
2751  * gst_rtsp_stream_set_seqnum:
2752  * @stream: a #GstRTSPStream
2753  * @seqnum: a new sequence number
2754  *
2755  * Configure the sequence number in the payloader of @stream to @seqnum.
2756  */
2757 void
2758 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2759 {
2760   GstRTSPStreamPrivate *priv;
2761
2762   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
2763
2764   priv = stream->priv;
2765
2766   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
2767 }
2768
2769 /**
2770  * gst_rtsp_stream_get_seqnum:
2771  * @stream: a #GstRTSPStream
2772  *
2773  * Get the configured sequence number in the payloader of @stream.
2774  *
2775  * Returns: the sequence number of the payloader.
2776  */
2777 guint16
2778 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
2779 {
2780   GstRTSPStreamPrivate *priv;
2781   guint seqnum;
2782
2783   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
2784
2785   priv = stream->priv;
2786
2787   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
2788
2789   return seqnum;
2790 }
2791
2792 /**
2793  * gst_rtsp_stream_transport_filter:
2794  * @stream: a #GstRTSPStream
2795  * @func: (scope call) (allow-none): a callback
2796  * @user_data: (closure): user data passed to @func
2797  *
2798  * Call @func for each transport managed by @stream. The result value of @func
2799  * determines what happens to the transport. @func will be called with @stream
2800  * locked so no further actions on @stream can be performed from @func.
2801  *
2802  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
2803  * @stream.
2804  *
2805  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
2806  *
2807  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
2808  * will also be added with an additional ref to the result #GList of this
2809  * function..
2810  *
2811  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
2812  *
2813  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
2814  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
2815  * element in the #GList should be unreffed before the list is freed.
2816  */
2817 GList *
2818 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
2819     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
2820 {
2821   GstRTSPStreamPrivate *priv;
2822   GList *result, *walk, *next;
2823   GHashTable *visited = NULL;
2824   guint cookie;
2825
2826   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2827
2828   priv = stream->priv;
2829
2830   result = NULL;
2831   if (func)
2832     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
2833
2834   g_mutex_lock (&priv->lock);
2835 restart:
2836   cookie = priv->transports_cookie;
2837   for (walk = priv->transports; walk; walk = next) {
2838     GstRTSPStreamTransport *trans = walk->data;
2839     GstRTSPFilterResult res;
2840     gboolean changed;
2841
2842     next = g_list_next (walk);
2843
2844     if (func) {
2845       /* only visit each transport once */
2846       if (g_hash_table_contains (visited, trans))
2847         continue;
2848
2849       g_hash_table_add (visited, g_object_ref (trans));
2850       g_mutex_unlock (&priv->lock);
2851
2852       res = func (stream, trans, user_data);
2853
2854       g_mutex_lock (&priv->lock);
2855     } else
2856       res = GST_RTSP_FILTER_REF;
2857
2858     changed = (cookie != priv->transports_cookie);
2859
2860     switch (res) {
2861       case GST_RTSP_FILTER_REMOVE:
2862         update_transport (stream, trans, FALSE);
2863         break;
2864       case GST_RTSP_FILTER_REF:
2865         result = g_list_prepend (result, g_object_ref (trans));
2866         break;
2867       case GST_RTSP_FILTER_KEEP:
2868       default:
2869         break;
2870     }
2871     if (changed)
2872       goto restart;
2873   }
2874   g_mutex_unlock (&priv->lock);
2875
2876   if (func)
2877     g_hash_table_unref (visited);
2878
2879   return result;
2880 }
2881
2882 static GstPadProbeReturn
2883 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2884 {
2885   GstRTSPStreamPrivate *priv;
2886   GstRTSPStream *stream;
2887
2888   stream = user_data;
2889   priv = stream->priv;
2890
2891   GST_DEBUG_OBJECT (pad, "now blocking");
2892
2893   g_mutex_lock (&priv->lock);
2894   priv->blocking = TRUE;
2895   g_mutex_unlock (&priv->lock);
2896
2897   gst_element_post_message (priv->payloader,
2898       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
2899           gst_structure_new_empty ("GstRTSPStreamBlocking")));
2900
2901   return GST_PAD_PROBE_OK;
2902 }
2903
2904 /**
2905  * gst_rtsp_stream_set_blocked:
2906  * @stream: a #GstRTSPStream
2907  * @blocked: boolean indicating we should block or unblock
2908  *
2909  * Blocks or unblocks the dataflow on @stream.
2910  *
2911  * Returns: %TRUE on success
2912  */
2913 gboolean
2914 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
2915 {
2916   GstRTSPStreamPrivate *priv;
2917
2918   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2919
2920   priv = stream->priv;
2921
2922   g_mutex_lock (&priv->lock);
2923   if (blocked) {
2924     priv->blocking = FALSE;
2925     if (priv->blocked_id == 0) {
2926       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
2927           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
2928           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
2929           g_object_ref (stream), g_object_unref);
2930     }
2931   } else {
2932     if (priv->blocked_id != 0) {
2933       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
2934       priv->blocked_id = 0;
2935       priv->blocking = FALSE;
2936     }
2937   }
2938   g_mutex_unlock (&priv->lock);
2939
2940   return TRUE;
2941 }
2942
2943 /**
2944  * gst_rtsp_stream_is_blocking:
2945  * @stream: a #GstRTSPStream
2946  *
2947  * Check if @stream is blocking on a #GstBuffer.
2948  *
2949  * Returns: %TRUE if @stream is blocking
2950  */
2951 gboolean
2952 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
2953 {
2954   GstRTSPStreamPrivate *priv;
2955   gboolean result;
2956
2957   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2958
2959   priv = stream->priv;
2960
2961   g_mutex_lock (&priv->lock);
2962   result = priv->blocking;
2963   g_mutex_unlock (&priv->lock);
2964
2965   return result;
2966 }
2967
2968 /**
2969  * gst_rtsp_stream_query_position:
2970  * @stream: a #GstRTSPStream
2971  *
2972  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
2973  * the RTP parts of the pipeline and not the RTCP parts.
2974  *
2975  * Returns: %TRUE if the position could be queried
2976  */
2977 gboolean
2978 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
2979 {
2980   GstRTSPStreamPrivate *priv;
2981   GstElement *sink;
2982   gboolean ret;
2983
2984   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2985
2986   priv = stream->priv;
2987
2988   g_mutex_lock (&priv->lock);
2989   if ((sink = priv->udpsink[0]))
2990     gst_object_ref (sink);
2991   g_mutex_unlock (&priv->lock);
2992
2993   if (!sink)
2994     return FALSE;
2995
2996   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
2997   gst_object_unref (sink);
2998
2999   return ret;
3000 }
3001
3002 /**
3003  * gst_rtsp_stream_query_stop:
3004  * @stream: a #GstRTSPStream
3005  *
3006  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3007  * the RTP parts of the pipeline and not the RTCP parts.
3008  *
3009  * Returns: %TRUE if the stop could be queried
3010  */
3011 gboolean
3012 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3013 {
3014   GstRTSPStreamPrivate *priv;
3015   GstElement *sink;
3016   GstQuery *query;
3017   gboolean ret;
3018
3019   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3020
3021   priv = stream->priv;
3022
3023   g_mutex_lock (&priv->lock);
3024   if ((sink = priv->udpsink[0]))
3025     gst_object_ref (sink);
3026   g_mutex_unlock (&priv->lock);
3027
3028   if (!sink)
3029     return FALSE;
3030
3031   query = gst_query_new_segment (GST_FORMAT_TIME);
3032   if ((ret = gst_element_query (sink, query))) {
3033     GstFormat format;
3034
3035     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3036     if (format != GST_FORMAT_TIME)
3037       *stop = -1;
3038   }
3039   gst_query_unref (query);
3040   gst_object_unref (sink);
3041
3042   return ret;
3043
3044 }