rtsp-stream: added function for RTP/RTCP socket configuration
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static void
1025 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1026     GSocket * rtcp_socket, GSocketFamily family)
1027 {
1028   GstRTSPStreamPrivate *priv = stream->priv;
1029   const gchar *multisink_socket;
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     multisink_socket = "socket-v6";
1033   else
1034     multisink_socket = "socket";
1035
1036   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1037       NULL);
1038   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1039       NULL);
1040 }
1041
1042 /* must be called with lock */
1043 static gboolean
1044 create_and_configure_udpsinks (GstRTSPStream * stream)
1045 {
1046   GstRTSPStreamPrivate *priv = stream->priv;
1047   GstElement *udpsink0, *udpsink1;
1048
1049   udpsink0 = NULL;
1050   udpsink1 = NULL;
1051
1052   if (priv->udpsink[0])
1053     udpsink0 = priv->udpsink[0];
1054   else
1055     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1056
1057   if (!udpsink0)
1058     goto no_udp_protocol;
1059
1060   if (priv->udpsink[1])
1061     udpsink1 = priv->udpsink[1];
1062   else
1063     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1064
1065   if (!udpsink1)
1066     goto no_udp_protocol;
1067
1068   /* configure sinks */
1069
1070   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1071   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1072
1073   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1074   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1075
1076   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1077
1078   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1079   /* Needs to be async for RECORD streams, otherwise we will never go to
1080    * PLAYING because the sinks will wait for data while the udpsrc can't
1081    * provide data with timestamps in PAUSED. */
1082   if (priv->sinkpad)
1083     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1085
1086   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1087   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1088
1089   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1091
1092   /* update the dscp qos field in the sinks */
1093   update_dscp_qos (stream);
1094
1095   priv->udpsink[0] = udpsink0;
1096   priv->udpsink[1] = udpsink1;
1097
1098   return TRUE;
1099
1100   /* ERRORS */
1101 no_udp_protocol:
1102   {
1103     return FALSE;
1104   }
1105 }
1106
1107 static gboolean
1108 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1109     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1110     GstRTSPAddress ** server_addr_out)
1111 {
1112   GstRTSPStreamPrivate *priv = stream->priv;
1113   GstStateChangeReturn ret;
1114   GstElement *udpsrc0, *udpsrc1;
1115   GSocket *rtp_socket = NULL;
1116   GSocket *rtcp_socket;
1117   gint tmp_rtp, tmp_rtcp;
1118   guint count;
1119   gint rtpport, rtcpport;
1120   GList *rejected_addresses = NULL;
1121   GstRTSPAddress *addr = NULL;
1122   GInetAddress *inetaddr = NULL;
1123   GSocketAddress *rtp_sockaddr = NULL;
1124   GSocketAddress *rtcp_sockaddr = NULL;
1125   GstRTSPAddressPool * pool;
1126
1127   pool = priv->pool;
1128   udpsrc0 = NULL;
1129   udpsrc1 = NULL;
1130   count = 0;
1131
1132   /* Start with random port */
1133   tmp_rtp = 0;
1134
1135   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1136       G_SOCKET_PROTOCOL_UDP, NULL);
1137   if (!rtcp_socket)
1138     goto no_udp_protocol;
1139
1140   if (*server_addr_out)
1141     gst_rtsp_address_free (*server_addr_out);
1142
1143   /* try to allocate 2 UDP ports, the RTP port should be an even
1144    * number and the RTCP port should be the next (uneven) port */
1145 again:
1146
1147   if (rtp_socket == NULL) {
1148     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1149         G_SOCKET_PROTOCOL_UDP, NULL);
1150     if (!rtp_socket)
1151       goto no_udp_protocol;
1152   }
1153
1154   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1155     GstRTSPAddressFlags flags;
1156
1157     if (addr)
1158       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1159
1160     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1161     if (family == G_SOCKET_FAMILY_IPV6)
1162       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1163     else
1164       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1165
1166     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1167
1168     if (addr == NULL)
1169       goto no_ports;
1170
1171     tmp_rtp = addr->port;
1172
1173     g_clear_object (&inetaddr);
1174     inetaddr = g_inet_address_new_from_string (addr->address);
1175   } else {
1176     if (tmp_rtp != 0) {
1177       tmp_rtp += 2;
1178       if (++count > 20)
1179         goto no_ports;
1180     }
1181
1182     if (inetaddr == NULL)
1183       inetaddr = g_inet_address_new_any (family);
1184   }
1185
1186   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1187   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1188     g_object_unref (rtp_sockaddr);
1189     goto again;
1190   }
1191   g_object_unref (rtp_sockaddr);
1192
1193   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1194   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1195     g_clear_object (&rtp_sockaddr);
1196     goto socket_error;
1197   }
1198
1199   tmp_rtp =
1200       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1201   g_object_unref (rtp_sockaddr);
1202
1203   /* check if port is even */
1204   if ((tmp_rtp & 1) != 0) {
1205     /* port not even, close and allocate another */
1206     tmp_rtp++;
1207     g_clear_object (&rtp_socket);
1208     goto again;
1209   }
1210
1211   /* set port */
1212   tmp_rtcp = tmp_rtp + 1;
1213
1214   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1215   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1216     g_object_unref (rtcp_sockaddr);
1217     g_clear_object (&rtp_socket);
1218     goto again;
1219   }
1220   g_object_unref (rtcp_sockaddr);
1221
1222   g_clear_object (&inetaddr);
1223
1224   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1225   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1226
1227   if (udpsrc0 == NULL || udpsrc1 == NULL)
1228     goto no_udp_protocol;
1229
1230   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1231   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1232
1233   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1234   if (ret == GST_STATE_CHANGE_FAILURE)
1235     goto element_error;
1236   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1237   if (ret == GST_STATE_CHANGE_FAILURE)
1238     goto element_error;
1239
1240   /* all fine, do port check */
1241   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1242   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1243
1244   /* this should not happen... */
1245   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1246     goto port_error;
1247
1248   if (!create_and_configure_udpsinks (stream))
1249     goto no_udp_protocol;
1250
1251   /* set RTP and RTCP sockets */
1252   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1253
1254   /* we keep these elements, we will further configure them when the
1255    * client told us to really use the UDP ports. */
1256   udpsrc_out[0] = udpsrc0;
1257   udpsrc_out[1] = udpsrc1;
1258
1259   server_port_out->min = rtpport;
1260   server_port_out->max = rtcpport;
1261
1262   *server_addr_out = addr;
1263   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1264
1265   g_object_unref (rtp_socket);
1266   g_object_unref (rtcp_socket);
1267
1268   return TRUE;
1269
1270   /* ERRORS */
1271 no_udp_protocol:
1272   {
1273     goto cleanup;
1274   }
1275 no_ports:
1276   {
1277     goto cleanup;
1278   }
1279 port_error:
1280   {
1281     goto cleanup;
1282   }
1283 socket_error:
1284   {
1285     goto cleanup;
1286   }
1287 element_error:
1288   {
1289     goto cleanup;
1290   }
1291 cleanup:
1292   {
1293     if (udpsrc0) {
1294       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1295       gst_object_unref (udpsrc0);
1296     }
1297     if (udpsrc1) {
1298       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1299       gst_object_unref (udpsrc1);
1300     }
1301     if (inetaddr)
1302       g_object_unref (inetaddr);
1303     g_list_free_full (rejected_addresses,
1304         (GDestroyNotify) gst_rtsp_address_free);
1305     if (addr)
1306       gst_rtsp_address_free (addr);
1307     if (rtp_socket)
1308       g_object_unref (rtp_socket);
1309     if (rtcp_socket)
1310       g_object_unref (rtcp_socket);
1311     return FALSE;
1312   }
1313 }
1314
1315 /* must be called with lock */
1316 static gboolean
1317 alloc_ports (GstRTSPStream * stream)
1318 {
1319   GstRTSPStreamPrivate *priv = stream->priv;
1320
1321   priv->have_ipv4 =
1322       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1323           &priv->server_port_v4, &priv->server_addr_v4);
1324
1325   priv->have_ipv6 =
1326       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1327           &priv->server_port_v6, &priv->server_addr_v6);
1328
1329   return priv->have_ipv4 || priv->have_ipv6;
1330 }
1331
1332 /**
1333  * gst_rtsp_stream_set_client_side:
1334  * @stream: a #GstRTSPStream
1335  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1336  * an RTSP connection.
1337  *
1338  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1339  * streams to an RTSP server via RECORD. This has the practical effect
1340  * of changing which UDP port numbers are used when setting up the local
1341  * side of the stream sending to be either the 'server' or 'client' pair
1342  * of a configured UDP transport.
1343  */
1344 void
1345 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1346 {
1347   GstRTSPStreamPrivate *priv;
1348
1349   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1350   priv = stream->priv;
1351   g_mutex_lock (&priv->lock);
1352   priv->client_side = client_side;
1353   g_mutex_unlock (&priv->lock);
1354 }
1355
1356 /**
1357  * gst_rtsp_stream_set_client_side:
1358  * @stream: a #GstRTSPStream
1359  *
1360  * See gst_rtsp_stream_set_client_side()
1361  *
1362  * Returns: TRUE if this #GstRTSPStream is client-side.
1363  */
1364 gboolean
1365 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1366 {
1367   GstRTSPStreamPrivate *priv;
1368   gboolean ret;
1369
1370   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1371
1372   priv = stream->priv;
1373   g_mutex_lock (&priv->lock);
1374   ret = priv->client_side;
1375   g_mutex_unlock (&priv->lock);
1376
1377   return ret;
1378 }
1379
1380 /**
1381  * gst_rtsp_stream_get_server_port:
1382  * @stream: a #GstRTSPStream
1383  * @server_port: (out): result server port
1384  * @family: the port family to get
1385  *
1386  * Fill @server_port with the port pair used by the server. This function can
1387  * only be called when @stream has been joined.
1388  */
1389 void
1390 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1391     GstRTSPRange * server_port, GSocketFamily family)
1392 {
1393   GstRTSPStreamPrivate *priv;
1394
1395   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1396   priv = stream->priv;
1397   g_return_if_fail (priv->is_joined);
1398
1399   g_mutex_lock (&priv->lock);
1400   if (family == G_SOCKET_FAMILY_IPV4) {
1401     if (server_port)
1402       *server_port = priv->server_port_v4;
1403   } else {
1404     if (server_port)
1405       *server_port = priv->server_port_v6;
1406   }
1407   g_mutex_unlock (&priv->lock);
1408 }
1409
1410 /**
1411  * gst_rtsp_stream_get_rtpsession:
1412  * @stream: a #GstRTSPStream
1413  *
1414  * Get the RTP session of this stream.
1415  *
1416  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1417  */
1418 GObject *
1419 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1420 {
1421   GstRTSPStreamPrivate *priv;
1422   GObject *session;
1423
1424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1425
1426   priv = stream->priv;
1427
1428   g_mutex_lock (&priv->lock);
1429   if ((session = priv->session))
1430     g_object_ref (session);
1431   g_mutex_unlock (&priv->lock);
1432
1433   return session;
1434 }
1435
1436 /**
1437  * gst_rtsp_stream_get_ssrc:
1438  * @stream: a #GstRTSPStream
1439  * @ssrc: (out): result ssrc
1440  *
1441  * Get the SSRC used by the RTP session of this stream. This function can only
1442  * be called when @stream has been joined.
1443  */
1444 void
1445 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1446 {
1447   GstRTSPStreamPrivate *priv;
1448
1449   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1450   priv = stream->priv;
1451   g_return_if_fail (priv->is_joined);
1452
1453   g_mutex_lock (&priv->lock);
1454   if (ssrc && priv->session)
1455     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1456   g_mutex_unlock (&priv->lock);
1457 }
1458
1459 /**
1460  * gst_rtsp_stream_set_retransmission_time:
1461  * @stream: a #GstRTSPStream
1462  * @time: a #GstClockTime
1463  *
1464  * Set the amount of time to store retransmission packets.
1465  */
1466 void
1467 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1468     GstClockTime time)
1469 {
1470   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1471
1472   g_mutex_lock (&stream->priv->lock);
1473   stream->priv->rtx_time = time;
1474   if (stream->priv->rtxsend)
1475     g_object_set (stream->priv->rtxsend, "max-size-time",
1476         GST_TIME_AS_MSECONDS (time), NULL);
1477   g_mutex_unlock (&stream->priv->lock);
1478 }
1479
1480 /**
1481  * gst_rtsp_stream_get_retransmission_time:
1482  * @stream: a #GstRTSPStream
1483  *
1484  * Get the amount of time to store retransmission data.
1485  *
1486  * Returns: the amount of time to store retransmission data.
1487  */
1488 GstClockTime
1489 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1490 {
1491   GstClockTime ret;
1492
1493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1494
1495   g_mutex_lock (&stream->priv->lock);
1496   ret = stream->priv->rtx_time;
1497   g_mutex_unlock (&stream->priv->lock);
1498
1499   return ret;
1500 }
1501
1502 /**
1503  * gst_rtsp_stream_set_retransmission_pt:
1504  * @stream: a #GstRTSPStream
1505  * @rtx_pt: a #guint
1506  *
1507  * Set the payload type (pt) for retransmission of this stream.
1508  */
1509 void
1510 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1511 {
1512   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1513
1514   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1515
1516   g_mutex_lock (&stream->priv->lock);
1517   stream->priv->rtx_pt = rtx_pt;
1518   if (stream->priv->rtxsend) {
1519     guint pt = gst_rtsp_stream_get_pt (stream);
1520     gchar *pt_s = g_strdup_printf ("%d", pt);
1521     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1522         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1523     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1524     g_free (pt_s);
1525     gst_structure_free (rtx_pt_map);
1526   }
1527   g_mutex_unlock (&stream->priv->lock);
1528 }
1529
1530 /**
1531  * gst_rtsp_stream_get_retransmission_pt:
1532  * @stream: a #GstRTSPStream
1533  *
1534  * Get the payload-type used for retransmission of this stream
1535  *
1536  * Returns: The retransmission PT.
1537  */
1538 guint
1539 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1540 {
1541   guint rtx_pt;
1542
1543   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1544
1545   g_mutex_lock (&stream->priv->lock);
1546   rtx_pt = stream->priv->rtx_pt;
1547   g_mutex_unlock (&stream->priv->lock);
1548
1549   return rtx_pt;
1550 }
1551
1552 /**
1553  * gst_rtsp_stream_set_buffer_size:
1554  * @stream: a #GstRTSPStream
1555  * @size: the buffer size
1556  *
1557  * Set the size of the UDP transmission buffer (in bytes)
1558  * Needs to be set before the stream is joined to a bin.
1559  *
1560  * Since: 1.6
1561  */
1562 void
1563 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1564 {
1565   g_mutex_lock (&stream->priv->lock);
1566   stream->priv->buffer_size = size;
1567   g_mutex_unlock (&stream->priv->lock);
1568 }
1569
1570 /**
1571  * gst_rtsp_stream_get_buffer_size:
1572  * @stream: a #GstRTSPStream
1573  *
1574  * Get the size of the UDP transmission buffer (in bytes)
1575  *
1576  * Returns: the size of the UDP TX buffer
1577  *
1578  * Since: 1.6
1579  */
1580 guint
1581 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1582 {
1583   guint buffer_size;
1584
1585   g_mutex_lock (&stream->priv->lock);
1586   buffer_size = stream->priv->buffer_size;
1587   g_mutex_unlock (&stream->priv->lock);
1588
1589   return buffer_size;
1590 }
1591
1592 /* executed from streaming thread */
1593 static void
1594 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1595 {
1596   GstRTSPStreamPrivate *priv = stream->priv;
1597   GstCaps *newcaps, *oldcaps;
1598
1599   newcaps = gst_pad_get_current_caps (pad);
1600
1601   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1602       newcaps);
1603
1604   g_mutex_lock (&priv->lock);
1605   oldcaps = priv->caps;
1606   priv->caps = newcaps;
1607   g_mutex_unlock (&priv->lock);
1608
1609   if (oldcaps)
1610     gst_caps_unref (oldcaps);
1611 }
1612
1613 static void
1614 dump_structure (const GstStructure * s)
1615 {
1616   gchar *sstr;
1617
1618   sstr = gst_structure_to_string (s);
1619   GST_INFO ("structure: %s", sstr);
1620   g_free (sstr);
1621 }
1622
1623 static GstRTSPStreamTransport *
1624 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1625 {
1626   GstRTSPStreamPrivate *priv = stream->priv;
1627   GList *walk;
1628   GstRTSPStreamTransport *result = NULL;
1629   const gchar *tmp;
1630   gchar *dest;
1631   guint port;
1632
1633   if (rtcp_from == NULL)
1634     return NULL;
1635
1636   tmp = g_strrstr (rtcp_from, ":");
1637   if (tmp == NULL)
1638     return NULL;
1639
1640   port = atoi (tmp + 1);
1641   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1642
1643   g_mutex_lock (&priv->lock);
1644   GST_INFO ("finding %s:%d in %d transports", dest, port,
1645       g_list_length (priv->transports));
1646
1647   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1648     GstRTSPStreamTransport *trans = walk->data;
1649     const GstRTSPTransport *tr;
1650     gint min, max;
1651
1652     tr = gst_rtsp_stream_transport_get_transport (trans);
1653
1654     if (priv->client_side) {
1655       /* In client side mode the 'destination' is the RTSP server, so send
1656        * to those ports */
1657       min = tr->server_port.min;
1658       max = tr->server_port.max;
1659     } else {
1660       min = tr->client_port.min;
1661       max = tr->client_port.max;
1662     }
1663
1664     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1665       result = trans;
1666       break;
1667     }
1668   }
1669   if (result)
1670     g_object_ref (result);
1671   g_mutex_unlock (&priv->lock);
1672
1673   g_free (dest);
1674
1675   return result;
1676 }
1677
1678 static GstRTSPStreamTransport *
1679 check_transport (GObject * source, GstRTSPStream * stream)
1680 {
1681   GstStructure *stats;
1682   GstRTSPStreamTransport *trans;
1683
1684   /* see if we have a stream to match with the origin of the RTCP packet */
1685   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1686   if (trans == NULL) {
1687     g_object_get (source, "stats", &stats, NULL);
1688     if (stats) {
1689       const gchar *rtcp_from;
1690
1691       dump_structure (stats);
1692
1693       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1694       if ((trans = find_transport (stream, rtcp_from))) {
1695         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1696             source);
1697         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1698             g_object_unref);
1699       }
1700       gst_structure_free (stats);
1701     }
1702   }
1703   return trans;
1704 }
1705
1706
1707 static void
1708 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1709 {
1710   GstRTSPStreamTransport *trans;
1711
1712   GST_INFO ("%p: new source %p", stream, source);
1713
1714   trans = check_transport (source, stream);
1715
1716   if (trans)
1717     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1718 }
1719
1720 static void
1721 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1722 {
1723   GST_INFO ("%p: new SDES %p", stream, source);
1724 }
1725
1726 static void
1727 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1728 {
1729   GstRTSPStreamTransport *trans;
1730
1731   trans = check_transport (source, stream);
1732
1733   if (trans) {
1734     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1735     gst_rtsp_stream_transport_keep_alive (trans);
1736   }
1737 #ifdef DUMP_STATS
1738   {
1739     GstStructure *stats;
1740     g_object_get (source, "stats", &stats, NULL);
1741     if (stats) {
1742       dump_structure (stats);
1743       gst_structure_free (stats);
1744     }
1745   }
1746 #endif
1747 }
1748
1749 static void
1750 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1751 {
1752   GST_INFO ("%p: source %p bye", stream, source);
1753 }
1754
1755 static void
1756 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1757 {
1758   GstRTSPStreamTransport *trans;
1759
1760   GST_INFO ("%p: source %p bye timeout", stream, source);
1761
1762   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1763     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1764     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1765   }
1766 }
1767
1768 static void
1769 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1770 {
1771   GstRTSPStreamTransport *trans;
1772
1773   GST_INFO ("%p: source %p timeout", stream, source);
1774
1775   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1776     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1777     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1778   }
1779 }
1780
1781 static void
1782 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1783 {
1784   GST_INFO ("%p: new sender source %p", stream, source);
1785 #ifndef DUMP_STATS
1786   {
1787     GstStructure *stats;
1788     g_object_get (source, "stats", &stats, NULL);
1789     if (stats) {
1790       dump_structure (stats);
1791       gst_structure_free (stats);
1792     }
1793   }
1794 #endif
1795 }
1796
1797 static void
1798 on_sender_ssrc_active (GObject * session, GObject * source,
1799     GstRTSPStream * stream)
1800 {
1801 #ifndef DUMP_STATS
1802   {
1803     GstStructure *stats;
1804     g_object_get (source, "stats", &stats, NULL);
1805     if (stats) {
1806       dump_structure (stats);
1807       gst_structure_free (stats);
1808     }
1809   }
1810 #endif
1811 }
1812
1813 static void
1814 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1815 {
1816   if (is_rtp) {
1817     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1818     g_list_free (priv->tr_cache_rtp);
1819     priv->tr_cache_rtp = NULL;
1820   } else {
1821     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1822     g_list_free (priv->tr_cache_rtcp);
1823     priv->tr_cache_rtcp = NULL;
1824   }
1825 }
1826
1827 static GstFlowReturn
1828 handle_new_sample (GstAppSink * sink, gpointer user_data)
1829 {
1830   GstRTSPStreamPrivate *priv;
1831   GList *walk;
1832   GstSample *sample;
1833   GstBuffer *buffer;
1834   GstRTSPStream *stream;
1835   gboolean is_rtp;
1836
1837   sample = gst_app_sink_pull_sample (sink);
1838   if (!sample)
1839     return GST_FLOW_OK;
1840
1841   stream = (GstRTSPStream *) user_data;
1842   priv = stream->priv;
1843   buffer = gst_sample_get_buffer (sample);
1844
1845   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1846
1847   g_mutex_lock (&priv->lock);
1848   if (is_rtp) {
1849     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1850       clear_tr_cache (priv, is_rtp);
1851       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1852         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1853         priv->tr_cache_rtp =
1854             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1855       }
1856       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1857     }
1858   } else {
1859     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1860       clear_tr_cache (priv, is_rtp);
1861       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1862         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1863         priv->tr_cache_rtcp =
1864             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1865       }
1866       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1867     }
1868   }
1869   g_mutex_unlock (&priv->lock);
1870
1871   if (is_rtp) {
1872     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1873       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1874       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1875     }
1876   } else {
1877     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1878       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1879       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1880     }
1881   }
1882   gst_sample_unref (sample);
1883
1884   return GST_FLOW_OK;
1885 }
1886
1887 static GstAppSinkCallbacks sink_cb = {
1888   NULL,                         /* not interested in EOS */
1889   NULL,                         /* not interested in preroll samples */
1890   handle_new_sample,
1891 };
1892
1893 static GstElement *
1894 get_rtp_encoder (GstRTSPStream * stream, guint session)
1895 {
1896   GstRTSPStreamPrivate *priv = stream->priv;
1897
1898   if (priv->srtpenc == NULL) {
1899     gchar *name;
1900
1901     name = g_strdup_printf ("srtpenc_%u", session);
1902     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1903     g_free (name);
1904
1905     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1906   }
1907   return gst_object_ref (priv->srtpenc);
1908 }
1909
1910 static GstElement *
1911 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1912 {
1913   GstRTSPStreamPrivate *priv = stream->priv;
1914   GstElement *oldenc, *enc;
1915   GstPad *pad;
1916   gchar *name;
1917
1918   if (priv->idx != session)
1919     return NULL;
1920
1921   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1922
1923   oldenc = priv->srtpenc;
1924   enc = get_rtp_encoder (stream, session);
1925   name = g_strdup_printf ("rtp_sink_%d", session);
1926   pad = gst_element_get_request_pad (enc, name);
1927   g_free (name);
1928   gst_object_unref (pad);
1929
1930   if (oldenc == NULL)
1931     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1932         enc);
1933
1934   return enc;
1935 }
1936
1937 static GstElement *
1938 request_rtcp_encoder (GstElement * rtpbin, guint session,
1939     GstRTSPStream * stream)
1940 {
1941   GstRTSPStreamPrivate *priv = stream->priv;
1942   GstElement *oldenc, *enc;
1943   GstPad *pad;
1944   gchar *name;
1945
1946   if (priv->idx != session)
1947     return NULL;
1948
1949   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1950
1951   oldenc = priv->srtpenc;
1952   enc = get_rtp_encoder (stream, session);
1953   name = g_strdup_printf ("rtcp_sink_%d", session);
1954   pad = gst_element_get_request_pad (enc, name);
1955   g_free (name);
1956   gst_object_unref (pad);
1957
1958   if (oldenc == NULL)
1959     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1960         enc);
1961
1962   return enc;
1963 }
1964
1965 static GstCaps *
1966 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1967 {
1968   GstRTSPStreamPrivate *priv = stream->priv;
1969   GstCaps *caps;
1970
1971   GST_DEBUG ("request key %08x", ssrc);
1972
1973   g_mutex_lock (&priv->lock);
1974   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1975     gst_caps_ref (caps);
1976   g_mutex_unlock (&priv->lock);
1977
1978   return caps;
1979 }
1980
1981 static GstElement *
1982 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1983     GstRTSPStream * stream)
1984 {
1985   GstRTSPStreamPrivate *priv = stream->priv;
1986
1987   if (priv->idx != session)
1988     return NULL;
1989
1990   if (priv->srtpdec == NULL) {
1991     gchar *name;
1992
1993     name = g_strdup_printf ("srtpdec_%u", session);
1994     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1995     g_free (name);
1996
1997     g_signal_connect (priv->srtpdec, "request-key",
1998         (GCallback) request_key, stream);
1999   }
2000   return gst_object_ref (priv->srtpdec);
2001 }
2002
2003 /**
2004  * gst_rtsp_stream_request_aux_sender:
2005  * @stream: a #GstRTSPStream
2006  * @sessid: the session id
2007  *
2008  * Creating a rtxsend bin
2009  *
2010  * Returns: (transfer full): a #GstElement.
2011  *
2012  * Since: 1.6
2013  */
2014 GstElement *
2015 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2016 {
2017   GstElement *bin;
2018   GstPad *pad;
2019   GstStructure *pt_map;
2020   gchar *name;
2021   guint pt, rtx_pt;
2022   gchar *pt_s;
2023
2024   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2025
2026   pt = gst_rtsp_stream_get_pt (stream);
2027   pt_s = g_strdup_printf ("%u", pt);
2028   rtx_pt = stream->priv->rtx_pt;
2029
2030   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2031
2032   bin = gst_bin_new (NULL);
2033   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2034   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2035       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2036   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2037       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2038   g_free (pt_s);
2039   gst_structure_free (pt_map);
2040   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2041
2042   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2043   name = g_strdup_printf ("src_%u", sessid);
2044   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2045   g_free (name);
2046   gst_object_unref (pad);
2047
2048   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2049   name = g_strdup_printf ("sink_%u", sessid);
2050   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2051   g_free (name);
2052   gst_object_unref (pad);
2053
2054   return bin;
2055 }
2056
2057 /**
2058  * gst_rtsp_stream_set_pt_map:
2059  * @stream: a #GstRTSPStream
2060  * @pt: the pt
2061  * @caps: a #GstCaps
2062  *
2063  * Configure a pt map between @pt and @caps.
2064  */
2065 void
2066 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2067 {
2068   GstRTSPStreamPrivate *priv = stream->priv;
2069
2070   g_mutex_lock (&priv->lock);
2071   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2072   g_mutex_unlock (&priv->lock);
2073 }
2074
2075 static GstCaps *
2076 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2077     GstRTSPStream * stream)
2078 {
2079   GstRTSPStreamPrivate *priv = stream->priv;
2080   GstCaps *caps = NULL;
2081
2082   g_mutex_lock (&priv->lock);
2083
2084   if (priv->idx == session) {
2085     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2086     if (caps) {
2087       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2088       gst_caps_ref (caps);
2089     } else {
2090       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2091     }
2092   }
2093
2094   g_mutex_unlock (&priv->lock);
2095
2096   return caps;
2097 }
2098
2099 static void
2100 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2101 {
2102   GstRTSPStreamPrivate *priv = stream->priv;
2103   gchar *name;
2104   GstPadLinkReturn ret;
2105   guint sessid;
2106
2107   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2108       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2109
2110   name = gst_pad_get_name (pad);
2111   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2112     g_free (name);
2113     return;
2114   }
2115   g_free (name);
2116
2117   if (priv->idx != sessid)
2118     return;
2119
2120   if (gst_pad_is_linked (priv->sinkpad)) {
2121     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2122         GST_DEBUG_PAD_NAME (priv->sinkpad));
2123     return;
2124   }
2125
2126   /* link the RTP pad to the session manager, it should not really fail unless
2127    * this is not really an RTP pad */
2128   ret = gst_pad_link (pad, priv->sinkpad);
2129   if (ret != GST_PAD_LINK_OK)
2130     goto link_failed;
2131   priv->recv_rtp_src = gst_object_ref (pad);
2132
2133   return;
2134
2135 /* ERRORS */
2136 link_failed:
2137   {
2138     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2139         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2140   }
2141 }
2142
2143 static void
2144 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2145     GstRTSPStream * stream)
2146 {
2147   /* TODO: What to do here other than this? */
2148   GST_DEBUG ("Stream %p: Got EOS", stream);
2149   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2150 }
2151
2152 /* must be called with lock */
2153 static void
2154 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2155     GstState state)
2156 {
2157   GstRTSPStreamPrivate *priv;
2158   GstPad *pad, *sinkpad = NULL;
2159   gboolean is_tcp = FALSE, is_udp = FALSE;
2160   gint i;
2161
2162   priv = stream->priv;
2163
2164   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2165   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2166       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2167
2168   for (i = 0; i < 2; i++) {
2169     GstPad *teepad, *queuepad;
2170     /* For the sender we create this bit of pipeline for both
2171      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2172      * we need to add a queue before appsink and udpsink to make
2173      * the pipeline not block. For the TCP case, we want to pump
2174      * client as fast as possible anyway. This pipeline is used
2175      * when both TCP and UDP are present.
2176      *
2177      * .--------.      .-----.    .---------.    .---------.
2178      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2179      * |       send->sink   src->sink      src->sink       |
2180      * '--------'      |     |    '---------'    '---------'
2181      *                 |     |    .---------.    .---------.
2182      *                 |     |    |  queue  |    | appsink |
2183      *                 |    src->sink      src->sink       |
2184      *                 '-----'    '---------'    '---------'
2185      *
2186      * When only UDP or only TCP is allowed, we skip the tee and queue
2187      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2188      * the session.
2189      */
2190     /* Only link the RTP send src if we're going to send RTP, link
2191      * the RTCP send src always */
2192     if (priv->srcpad || i == 1) {
2193       if (is_udp) {
2194         /* add udpsink */
2195         gst_bin_add (bin, priv->udpsink[i]);
2196         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2197       }
2198
2199       if (is_tcp) {
2200         /* make appsink */
2201         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2202         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2203         gst_bin_add (bin, priv->appsink[i]);
2204         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2205             &sink_cb, stream, NULL);
2206       }
2207
2208       if (is_udp && is_tcp) {
2209         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2210
2211         /* make tee for RTP/RTCP */
2212         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2213         gst_bin_add (bin, priv->tee[i]);
2214
2215         /* and link to rtpbin send pad */
2216         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2217         gst_pad_link (priv->send_src[i], pad);
2218         gst_object_unref (pad);
2219
2220         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2221         g_object_set (priv->udpqueue[i], "max-size-buffers",
2222             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2223             NULL);
2224         gst_bin_add (bin, priv->udpqueue[i]);
2225         /* link tee to udpqueue */
2226         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2227         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2228         gst_pad_link (teepad, pad);
2229         gst_object_unref (pad);
2230         gst_object_unref (teepad);
2231
2232         /* link udpqueue to udpsink */
2233         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2234         gst_pad_link (queuepad, sinkpad);
2235         gst_object_unref (queuepad);
2236         gst_object_unref (sinkpad);
2237
2238         /* make appqueue */
2239         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2240         g_object_set (priv->appqueue[i], "max-size-buffers",
2241             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2242             NULL);
2243         gst_bin_add (bin, priv->appqueue[i]);
2244         /* and link tee to appqueue */
2245         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2246         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2247         gst_pad_link (teepad, pad);
2248         gst_object_unref (pad);
2249         gst_object_unref (teepad);
2250
2251         /* and link appqueue to appsink */
2252         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2253         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2254         gst_pad_link (queuepad, pad);
2255         gst_object_unref (pad);
2256         gst_object_unref (queuepad);
2257       } else if (is_tcp) {
2258         /* only appsink needed, link it to the session */
2259         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2260         gst_pad_link (priv->send_src[i], pad);
2261         gst_object_unref (pad);
2262
2263         /* when its only TCP, we need to set sync and preroll to FALSE
2264          * for the sink to avoid deadlock. And this is only needed for
2265          * sink used for RTCP data, not the RTP data. */
2266         if (i == 1)
2267           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2268       } else {
2269         /* else only udpsink needed, link it to the session */
2270         gst_pad_link (priv->send_src[i], sinkpad);
2271         gst_object_unref (sinkpad);
2272       }
2273     }
2274
2275     /* check if we need to set to a special state */
2276     if (state != GST_STATE_NULL) {
2277       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2278         gst_element_set_state (priv->udpsink[i], state);
2279       if (priv->appsink[i] && (priv->srcpad || i == 1))
2280         gst_element_set_state (priv->appsink[i], state);
2281       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2282         gst_element_set_state (priv->appqueue[i], state);
2283       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2284         gst_element_set_state (priv->udpqueue[i], state);
2285       if (priv->tee[i] && (priv->srcpad || i == 1))
2286         gst_element_set_state (priv->tee[i], state);
2287     }
2288   }
2289 }
2290
2291 /* must be called with lock */
2292 static void
2293 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2294     GstState state)
2295 {
2296   GstRTSPStreamPrivate *priv;
2297   GstPad *pad, *selpad;
2298   gboolean is_tcp;
2299   gint i;
2300
2301   priv = stream->priv;
2302
2303   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2304
2305   for (i = 0; i < 2; i++) {
2306     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2307      * RTCP sink always */
2308     if (priv->sinkpad || i == 1) {
2309       /* For the receiver we create this bit of pipeline for both
2310        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2311        * and it is all funneled into the rtpbin receive pad.
2312        *
2313        * .--------.     .--------.    .--------.
2314        * | udpsrc |     | funnel |    | rtpbin |
2315        * |       src->sink      src->sink      |
2316        * '--------'     |        |    '--------'
2317        * .--------.     |        |
2318        * | appsrc |     |        |
2319        * |       src->sink       |
2320        * '--------'     '--------'
2321        */
2322       /* make funnel for the RTP/RTCP receivers */
2323       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2324       gst_bin_add (bin, priv->funnel[i]);
2325
2326       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2327       gst_pad_link (pad, priv->recv_sink[i]);
2328       gst_object_unref (pad);
2329
2330       if (priv->udpsrc_v4[i]) {
2331         if (priv->srcpad) {
2332           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2333            * values. This is only relevant for PLAY pipelines */
2334           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2335           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2336         }
2337         /* add udpsrc */
2338         gst_bin_add (bin, priv->udpsrc_v4[i]);
2339
2340         /* and link to the funnel v4 */
2341         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2342         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2343         gst_pad_link (pad, selpad);
2344         gst_object_unref (pad);
2345         gst_object_unref (selpad);
2346       }
2347
2348       if (priv->udpsrc_v6[i]) {
2349         if (priv->srcpad) {
2350           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2351           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2352         }
2353         gst_bin_add (bin, priv->udpsrc_v6[i]);
2354
2355         /* and link to the funnel v6 */
2356         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2357         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2358         gst_pad_link (pad, selpad);
2359         gst_object_unref (pad);
2360         gst_object_unref (selpad);
2361       }
2362
2363       if (is_tcp) {
2364         /* make and add appsrc */
2365         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2366         priv->appsrc_base_time[i] = -1;
2367         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2368         gst_bin_add (bin, priv->appsrc[i]);
2369         /* and link to the funnel */
2370         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2371         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2372         gst_pad_link (pad, selpad);
2373         gst_object_unref (pad);
2374         gst_object_unref (selpad);
2375       }
2376     }
2377
2378     /* check if we need to set to a special state */
2379     if (state != GST_STATE_NULL) {
2380       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2381         gst_element_set_state (priv->funnel[i], state);
2382       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2383         gst_element_set_state (priv->appsrc[i], state);
2384     }
2385   }
2386 }
2387
2388 /**
2389  * gst_rtsp_stream_join_bin:
2390  * @stream: a #GstRTSPStream
2391  * @bin: (transfer none): a #GstBin to join
2392  * @rtpbin: (transfer none): a rtpbin element in @bin
2393  * @state: the target state of the new elements
2394  *
2395  * Join the #GstBin @bin that contains the element @rtpbin.
2396  *
2397  * @stream will link to @rtpbin, which must be inside @bin. The elements
2398  * added to @bin will be set to the state given in @state.
2399  *
2400  * Returns: %TRUE on success.
2401  */
2402 gboolean
2403 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2404     GstElement * rtpbin, GstState state)
2405 {
2406   GstRTSPStreamPrivate *priv;
2407   guint idx;
2408   gchar *name;
2409   GstPadLinkReturn ret;
2410   gboolean is_udp;
2411
2412   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2413   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2414   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2415
2416   priv = stream->priv;
2417
2418   g_mutex_lock (&priv->lock);
2419   if (priv->is_joined)
2420     goto was_joined;
2421
2422   /* create a session with the same index as the stream */
2423   idx = priv->idx;
2424
2425   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2426
2427   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2428       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2429
2430   if (is_udp && !alloc_ports (stream))
2431     goto no_ports;
2432
2433   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2434       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2435     /* For SRTP */
2436     g_signal_connect (rtpbin, "request-rtp-encoder",
2437         (GCallback) request_rtp_encoder, stream);
2438     g_signal_connect (rtpbin, "request-rtcp-encoder",
2439         (GCallback) request_rtcp_encoder, stream);
2440     g_signal_connect (rtpbin, "request-rtp-decoder",
2441         (GCallback) request_rtp_rtcp_decoder, stream);
2442     g_signal_connect (rtpbin, "request-rtcp-decoder",
2443         (GCallback) request_rtp_rtcp_decoder, stream);
2444   }
2445
2446   if (priv->sinkpad) {
2447     g_signal_connect (rtpbin, "request-pt-map",
2448         (GCallback) request_pt_map, stream);
2449   }
2450
2451   /* get pads from the RTP session element for sending and receiving
2452    * RTP/RTCP*/
2453   if (priv->srcpad) {
2454     /* get a pad for sending RTP */
2455     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2456     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2457     g_free (name);
2458
2459     /* link the RTP pad to the session manager, it should not really fail unless
2460      * this is not really an RTP pad */
2461     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2462     if (ret != GST_PAD_LINK_OK)
2463       goto link_failed;
2464
2465     name = g_strdup_printf ("send_rtp_src_%u", idx);
2466     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2467     g_free (name);
2468   } else {
2469     /* Need to connect our sinkpad from here */
2470     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2471     /* EOS */
2472     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2473
2474     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2475     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2476     g_free (name);
2477   }
2478
2479   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2480   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2481   g_free (name);
2482   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2483   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2484   g_free (name);
2485
2486   /* get the session */
2487   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2488
2489   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2490       stream);
2491   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2492       stream);
2493   g_signal_connect (priv->session, "on-ssrc-active",
2494       (GCallback) on_ssrc_active, stream);
2495   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2496       stream);
2497   g_signal_connect (priv->session, "on-bye-timeout",
2498       (GCallback) on_bye_timeout, stream);
2499   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2500       stream);
2501
2502   /* signal for sender ssrc */
2503   g_signal_connect (priv->session, "on-new-sender-ssrc",
2504       (GCallback) on_new_sender_ssrc, stream);
2505   g_signal_connect (priv->session, "on-sender-ssrc-active",
2506       (GCallback) on_sender_ssrc_active, stream);
2507
2508   create_sender_part (stream, bin, state);
2509
2510   create_receiver_part (stream, bin, state);
2511
2512   if (priv->srcpad) {
2513     /* be notified of caps changes */
2514     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2515         (GCallback) caps_notify, stream);
2516   }
2517
2518   priv->is_joined = TRUE;
2519   g_mutex_unlock (&priv->lock);
2520
2521   return TRUE;
2522
2523   /* ERRORS */
2524 was_joined:
2525   {
2526     g_mutex_unlock (&priv->lock);
2527     return TRUE;
2528   }
2529 no_ports:
2530   {
2531     g_mutex_unlock (&priv->lock);
2532     GST_WARNING ("failed to allocate ports %u", idx);
2533     return FALSE;
2534   }
2535 link_failed:
2536   {
2537     GST_WARNING ("failed to link stream %u", idx);
2538     gst_object_unref (priv->send_rtp_sink);
2539     priv->send_rtp_sink = NULL;
2540     g_mutex_unlock (&priv->lock);
2541     return FALSE;
2542   }
2543 }
2544
2545 /**
2546  * gst_rtsp_stream_leave_bin:
2547  * @stream: a #GstRTSPStream
2548  * @bin: (transfer none): a #GstBin
2549  * @rtpbin: (transfer none): a rtpbin #GstElement
2550  *
2551  * Remove the elements of @stream from @bin.
2552  *
2553  * Return: %TRUE on success.
2554  */
2555 gboolean
2556 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2557     GstElement * rtpbin)
2558 {
2559   GstRTSPStreamPrivate *priv;
2560   gint i;
2561   GList *l;
2562   gboolean is_tcp, is_udp;
2563
2564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2565   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2566   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2567
2568   priv = stream->priv;
2569
2570   g_mutex_lock (&priv->lock);
2571   if (!priv->is_joined)
2572     goto was_not_joined;
2573
2574   /* all transports must be removed by now */
2575   if (priv->transports != NULL)
2576     goto transports_not_removed;
2577
2578   clear_tr_cache (priv, TRUE);
2579   clear_tr_cache (priv, FALSE);
2580
2581   GST_INFO ("stream %p leaving bin", stream);
2582
2583   if (priv->srcpad) {
2584     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2585
2586     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2587     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2588     gst_object_unref (priv->send_rtp_sink);
2589     priv->send_rtp_sink = NULL;
2590   } else if (priv->recv_rtp_src) {
2591     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2592     gst_object_unref (priv->recv_rtp_src);
2593     priv->recv_rtp_src = NULL;
2594   }
2595
2596   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2597
2598   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2599       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2600
2601
2602   for (i = 0; i < 2; i++) {
2603     if (priv->udpsink[i])
2604       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2605     if (priv->appsink[i])
2606       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2607     if (priv->appqueue[i])
2608       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2609     if (priv->udpqueue[i])
2610       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2611     if (priv->tee[i])
2612       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2613     if (priv->funnel[i])
2614       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2615     if (priv->appsrc[i])
2616       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2617
2618     if (priv->udpsrc_v4[i]) {
2619       if (priv->sinkpad || i == 1) {
2620         /* and set udpsrc to NULL now before removing */
2621         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2622         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2623         /* removing them should also nicely release the request
2624          * pads when they finalize */
2625         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2626       } else {
2627         /* we need to set the state to NULL before unref */
2628         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2629         gst_object_unref (priv->udpsrc_v4[i]);
2630       }
2631     }
2632
2633     if (priv->udpsrc_v6[i]) {
2634       if (priv->sinkpad || i == 1) {
2635         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2636         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2637         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2638       } else {
2639         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2640         gst_object_unref (priv->udpsrc_v6[i]);
2641       }
2642     }
2643
2644     for (l = priv->transport_sources; l; l = l->next) {
2645       GstRTSPMulticastTransportSource *s = l->data;
2646
2647       if (!s->udpsrc[i])
2648         continue;
2649
2650       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2651       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2652       gst_bin_remove (bin, s->udpsrc[i]);
2653     }
2654
2655     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2656       gst_bin_remove (bin, priv->udpsink[i]);
2657     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2658       gst_bin_remove (bin, priv->appsrc[i]);
2659     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2660       gst_bin_remove (bin, priv->appsink[i]);
2661     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2662       gst_bin_remove (bin, priv->appqueue[i]);
2663     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2664       gst_bin_remove (bin, priv->udpqueue[i]);
2665     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2666       gst_bin_remove (bin, priv->tee[i]);
2667     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2668       gst_bin_remove (bin, priv->funnel[i]);
2669
2670     if (priv->sinkpad || i == 1) {
2671       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2672       gst_object_unref (priv->recv_sink[i]);
2673       priv->recv_sink[i] = NULL;
2674     }
2675
2676     priv->udpsrc_v4[i] = NULL;
2677     priv->udpsrc_v6[i] = NULL;
2678     priv->udpsink[i] = NULL;
2679     priv->appsrc[i] = NULL;
2680     priv->appsink[i] = NULL;
2681     priv->appqueue[i] = NULL;
2682     priv->udpqueue[i] = NULL;
2683     priv->tee[i] = NULL;
2684     priv->funnel[i] = NULL;
2685   }
2686
2687   for (l = priv->transport_sources; l; l = l->next) {
2688     GstRTSPMulticastTransportSource *s = l->data;
2689     g_slice_free (GstRTSPMulticastTransportSource, s);
2690   }
2691   g_list_free (priv->transport_sources);
2692   priv->transport_sources = NULL;
2693
2694   if (priv->srcpad) {
2695     gst_object_unref (priv->send_src[0]);
2696     priv->send_src[0] = NULL;
2697   }
2698
2699   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2700   gst_object_unref (priv->send_src[1]);
2701   priv->send_src[1] = NULL;
2702
2703   g_object_unref (priv->session);
2704   priv->session = NULL;
2705   if (priv->caps)
2706     gst_caps_unref (priv->caps);
2707   priv->caps = NULL;
2708
2709   if (priv->srtpenc)
2710     gst_object_unref (priv->srtpenc);
2711   if (priv->srtpdec)
2712     gst_object_unref (priv->srtpdec);
2713
2714   priv->is_joined = FALSE;
2715   g_mutex_unlock (&priv->lock);
2716
2717   return TRUE;
2718
2719 was_not_joined:
2720   {
2721     g_mutex_unlock (&priv->lock);
2722     return TRUE;
2723   }
2724 transports_not_removed:
2725   {
2726     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2727     g_mutex_unlock (&priv->lock);
2728     return FALSE;
2729   }
2730 }
2731
2732 /**
2733  * gst_rtsp_stream_get_rtpinfo:
2734  * @stream: a #GstRTSPStream
2735  * @rtptime: (allow-none): result RTP timestamp
2736  * @seq: (allow-none): result RTP seqnum
2737  * @clock_rate: (allow-none): the clock rate
2738  * @running_time: (allow-none): result running-time
2739  *
2740  * Retrieve the current rtptime, seq and running-time. This is used to
2741  * construct a RTPInfo reply header.
2742  *
2743  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2744  */
2745 gboolean
2746 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2747     guint * rtptime, guint * seq, guint * clock_rate,
2748     GstClockTime * running_time)
2749 {
2750   GstRTSPStreamPrivate *priv;
2751   GstStructure *stats;
2752   GObjectClass *payobjclass;
2753
2754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2755
2756   priv = stream->priv;
2757
2758   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2759
2760   g_mutex_lock (&priv->lock);
2761
2762   /* First try to extract the information from the last buffer on the sinks.
2763    * This will have a more accurate sequence number and timestamp, as between
2764    * the payloader and the sink there can be some queues
2765    */
2766   if (priv->udpsink[0] || priv->appsink[0]) {
2767     GstSample *last_sample;
2768
2769     if (priv->udpsink[0])
2770       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2771     else
2772       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2773
2774     if (last_sample) {
2775       GstCaps *caps;
2776       GstBuffer *buffer;
2777       GstSegment *segment;
2778       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2779
2780       caps = gst_sample_get_caps (last_sample);
2781       buffer = gst_sample_get_buffer (last_sample);
2782       segment = gst_sample_get_segment (last_sample);
2783
2784       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2785         if (seq) {
2786           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2787         }
2788
2789         if (rtptime) {
2790           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2791         }
2792
2793         gst_rtp_buffer_unmap (&rtp_buffer);
2794
2795         if (running_time) {
2796           *running_time =
2797               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2798               GST_BUFFER_TIMESTAMP (buffer));
2799         }
2800
2801         if (clock_rate) {
2802           GstStructure *s = gst_caps_get_structure (caps, 0);
2803
2804           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2805
2806           if (*clock_rate == 0 && running_time)
2807             *running_time = GST_CLOCK_TIME_NONE;
2808         }
2809         gst_sample_unref (last_sample);
2810
2811         goto done;
2812       } else {
2813         gst_sample_unref (last_sample);
2814       }
2815     }
2816   }
2817
2818   if (g_object_class_find_property (payobjclass, "stats")) {
2819     g_object_get (priv->payloader, "stats", &stats, NULL);
2820     if (stats == NULL)
2821       goto no_stats;
2822
2823     if (seq)
2824       gst_structure_get_uint (stats, "seqnum", seq);
2825
2826     if (rtptime)
2827       gst_structure_get_uint (stats, "timestamp", rtptime);
2828
2829     if (running_time)
2830       gst_structure_get_clock_time (stats, "running-time", running_time);
2831
2832     if (clock_rate) {
2833       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2834       if (*clock_rate == 0 && running_time)
2835         *running_time = GST_CLOCK_TIME_NONE;
2836     }
2837     gst_structure_free (stats);
2838   } else {
2839     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2840         !g_object_class_find_property (payobjclass, "timestamp"))
2841       goto no_stats;
2842
2843     if (seq)
2844       g_object_get (priv->payloader, "seqnum", seq, NULL);
2845
2846     if (rtptime)
2847       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2848
2849     if (running_time)
2850       *running_time = GST_CLOCK_TIME_NONE;
2851   }
2852
2853 done:
2854   g_mutex_unlock (&priv->lock);
2855
2856   return TRUE;
2857
2858   /* ERRORS */
2859 no_stats:
2860   {
2861     GST_WARNING ("Could not get payloader stats");
2862     g_mutex_unlock (&priv->lock);
2863     return FALSE;
2864   }
2865 }
2866
2867 /**
2868  * gst_rtsp_stream_get_caps:
2869  * @stream: a #GstRTSPStream
2870  *
2871  * Retrieve the current caps of @stream.
2872  *
2873  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2874  * after usage.
2875  */
2876 GstCaps *
2877 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2878 {
2879   GstRTSPStreamPrivate *priv;
2880   GstCaps *result;
2881
2882   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2883
2884   priv = stream->priv;
2885
2886   g_mutex_lock (&priv->lock);
2887   if ((result = priv->caps))
2888     gst_caps_ref (result);
2889   g_mutex_unlock (&priv->lock);
2890
2891   return result;
2892 }
2893
2894 /**
2895  * gst_rtsp_stream_recv_rtp:
2896  * @stream: a #GstRTSPStream
2897  * @buffer: (transfer full): a #GstBuffer
2898  *
2899  * Handle an RTP buffer for the stream. This method is usually called when a
2900  * message has been received from a client using the TCP transport.
2901  *
2902  * This function takes ownership of @buffer.
2903  *
2904  * Returns: a GstFlowReturn.
2905  */
2906 GstFlowReturn
2907 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2908 {
2909   GstRTSPStreamPrivate *priv;
2910   GstFlowReturn ret;
2911   GstElement *element;
2912
2913   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2914   priv = stream->priv;
2915   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2916   g_return_val_if_fail (priv->is_joined, FALSE);
2917
2918   g_mutex_lock (&priv->lock);
2919   if (priv->appsrc[0])
2920     element = gst_object_ref (priv->appsrc[0]);
2921   else
2922     element = NULL;
2923   g_mutex_unlock (&priv->lock);
2924
2925   if (element) {
2926     if (priv->appsrc_base_time[0] == -1) {
2927       /* Take current running_time. This timestamp will be put on
2928        * the first buffer of each stream because we are a live source and so we
2929        * timestamp with the running_time. When we are dealing with TCP, we also
2930        * only timestamp the first buffer (using the DISCONT flag) because a server
2931        * typically bursts data, for which we don't want to compensate by speeding
2932        * up the media. The other timestamps will be interpollated from this one
2933        * using the RTP timestamps. */
2934       GST_OBJECT_LOCK (element);
2935       if (GST_ELEMENT_CLOCK (element)) {
2936         GstClockTime now;
2937         GstClockTime base_time;
2938
2939         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2940         base_time = GST_ELEMENT_CAST (element)->base_time;
2941
2942         priv->appsrc_base_time[0] = now - base_time;
2943         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2944         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2945             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2946             GST_TIME_ARGS (base_time));
2947       }
2948       GST_OBJECT_UNLOCK (element);
2949     }
2950
2951     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2952     gst_object_unref (element);
2953   } else {
2954     ret = GST_FLOW_OK;
2955   }
2956   return ret;
2957 }
2958
2959 /**
2960  * gst_rtsp_stream_recv_rtcp:
2961  * @stream: a #GstRTSPStream
2962  * @buffer: (transfer full): a #GstBuffer
2963  *
2964  * Handle an RTCP buffer for the stream. This method is usually called when a
2965  * message has been received from a client using the TCP transport.
2966  *
2967  * This function takes ownership of @buffer.
2968  *
2969  * Returns: a GstFlowReturn.
2970  */
2971 GstFlowReturn
2972 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2973 {
2974   GstRTSPStreamPrivate *priv;
2975   GstFlowReturn ret;
2976   GstElement *element;
2977
2978   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2979   priv = stream->priv;
2980   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2981
2982   if (!priv->is_joined) {
2983     gst_buffer_unref (buffer);
2984     return GST_FLOW_NOT_LINKED;
2985   }
2986   g_mutex_lock (&priv->lock);
2987   if (priv->appsrc[1])
2988     element = gst_object_ref (priv->appsrc[1]);
2989   else
2990     element = NULL;
2991   g_mutex_unlock (&priv->lock);
2992
2993   if (element) {
2994     if (priv->appsrc_base_time[1] == -1) {
2995       /* Take current running_time. This timestamp will be put on
2996        * the first buffer of each stream because we are a live source and so we
2997        * timestamp with the running_time. When we are dealing with TCP, we also
2998        * only timestamp the first buffer (using the DISCONT flag) because a server
2999        * typically bursts data, for which we don't want to compensate by speeding
3000        * up the media. The other timestamps will be interpollated from this one
3001        * using the RTP timestamps. */
3002       GST_OBJECT_LOCK (element);
3003       if (GST_ELEMENT_CLOCK (element)) {
3004         GstClockTime now;
3005         GstClockTime base_time;
3006
3007         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3008         base_time = GST_ELEMENT_CAST (element)->base_time;
3009
3010         priv->appsrc_base_time[1] = now - base_time;
3011         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3012         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3013             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3014             GST_TIME_ARGS (base_time));
3015       }
3016       GST_OBJECT_UNLOCK (element);
3017     }
3018
3019     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3020     gst_object_unref (element);
3021   } else {
3022     ret = GST_FLOW_OK;
3023     gst_buffer_unref (buffer);
3024   }
3025   return ret;
3026 }
3027
3028 /* must be called with lock */
3029 static gboolean
3030 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3031     gboolean add)
3032 {
3033   GstRTSPStreamPrivate *priv = stream->priv;
3034   const GstRTSPTransport *tr;
3035
3036   tr = gst_rtsp_stream_transport_get_transport (trans);
3037
3038   switch (tr->lower_transport) {
3039     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3040     {
3041       GstRTSPMulticastTransportSource *source;
3042       GstBin *bin;
3043
3044       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
3045
3046       if (add) {
3047         gchar *host;
3048         gint i;
3049         GstPad *selpad, *pad;
3050
3051         source = g_slice_new0 (GstRTSPMulticastTransportSource);
3052         source->transport = trans;
3053
3054         for (i = 0; i < 2; i++) {
3055           host =
3056               g_strdup_printf ("udp://%s:%d", tr->destination,
3057               (i == 0) ? tr->port.min : tr->port.max);
3058           source->udpsrc[i] =
3059               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
3060           g_free (host);
3061           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
3062
3063           if (priv->srcpad) {
3064             /* we set and keep these to playing so that they don't cause NO_PREROLL return
3065              * values. This is only relevant for PLAY pipelines */
3066             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
3067             gst_element_set_locked_state (source->udpsrc[i], TRUE);
3068           }
3069           /* add udpsrc */
3070           gst_bin_add (bin, source->udpsrc[i]);
3071
3072           /* and link to the funnel v4 */
3073           if (priv->sinkpad || i == 1) {
3074             source->selpad[i] = selpad =
3075                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3076             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3077             gst_pad_link (pad, selpad);
3078             gst_object_unref (pad);
3079             gst_object_unref (selpad);
3080           }
3081         }
3082
3083         priv->transport_sources =
3084             g_list_prepend (priv->transport_sources, source);
3085       } else {
3086         GList *l;
3087
3088         for (l = priv->transport_sources; l; l = l->next) {
3089           source = l->data;
3090
3091           if (source->transport == trans) {
3092             priv->transport_sources =
3093                 g_list_delete_link (priv->transport_sources, l);
3094             break;
3095           }
3096         }
3097
3098         if (l != NULL) {
3099           gint i;
3100
3101           for (i = 0; i < 2; i++) {
3102             /* Will automatically unlink everything */
3103             gst_bin_remove (bin,
3104                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3105
3106             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3107             gst_object_unref (source->udpsrc[i]);
3108
3109             if (priv->sinkpad || i == 1) {
3110               gst_element_release_request_pad (priv->funnel[i],
3111                   source->selpad[i]);
3112             }
3113           }
3114
3115           g_slice_free (GstRTSPMulticastTransportSource, source);
3116         }
3117       }
3118
3119       gst_object_unref (bin);
3120
3121       /* fall through for the generic case */
3122     }
3123     case GST_RTSP_LOWER_TRANS_UDP:
3124     {
3125       gchar *dest;
3126       gint min, max;
3127       guint ttl = 0;
3128
3129       dest = tr->destination;
3130       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3131         min = tr->port.min;
3132         max = tr->port.max;
3133         ttl = tr->ttl;
3134       } else if (priv->client_side) {
3135         /* In client side mode the 'destination' is the RTSP server, so send
3136          * to those ports */
3137         min = tr->server_port.min;
3138         max = tr->server_port.max;
3139       } else {
3140         min = tr->client_port.min;
3141         max = tr->client_port.max;
3142       }
3143
3144       if (add) {
3145         if (ttl > 0) {
3146           GST_INFO ("setting ttl-mc %d", ttl);
3147           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3148           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3149         }
3150         GST_INFO ("adding %s:%d-%d", dest, min, max);
3151         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3152         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3153         priv->transports = g_list_prepend (priv->transports, trans);
3154       } else {
3155         GST_INFO ("removing %s:%d-%d", dest, min, max);
3156         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3157         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3158         priv->transports = g_list_remove (priv->transports, trans);
3159       }
3160       priv->transports_cookie++;
3161       break;
3162     }
3163     case GST_RTSP_LOWER_TRANS_TCP:
3164       if (add) {
3165         GST_INFO ("adding TCP %s", tr->destination);
3166         priv->transports = g_list_prepend (priv->transports, trans);
3167       } else {
3168         GST_INFO ("removing TCP %s", tr->destination);
3169         priv->transports = g_list_remove (priv->transports, trans);
3170       }
3171       priv->transports_cookie++;
3172       break;
3173     default:
3174       goto unknown_transport;
3175   }
3176   return TRUE;
3177
3178   /* ERRORS */
3179 unknown_transport:
3180   {
3181     GST_INFO ("Unknown transport %d", tr->lower_transport);
3182     return FALSE;
3183   }
3184 }
3185
3186
3187 /**
3188  * gst_rtsp_stream_add_transport:
3189  * @stream: a #GstRTSPStream
3190  * @trans: (transfer none): a #GstRTSPStreamTransport
3191  *
3192  * Add the transport in @trans to @stream. The media of @stream will
3193  * then also be send to the values configured in @trans.
3194  *
3195  * @stream must be joined to a bin.
3196  *
3197  * @trans must contain a valid #GstRTSPTransport.
3198  *
3199  * Returns: %TRUE if @trans was added
3200  */
3201 gboolean
3202 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3203     GstRTSPStreamTransport * trans)
3204 {
3205   GstRTSPStreamPrivate *priv;
3206   gboolean res;
3207
3208   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3209   priv = stream->priv;
3210   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3211   g_return_val_if_fail (priv->is_joined, FALSE);
3212
3213   g_mutex_lock (&priv->lock);
3214   res = update_transport (stream, trans, TRUE);
3215   g_mutex_unlock (&priv->lock);
3216
3217   return res;
3218 }
3219
3220 /**
3221  * gst_rtsp_stream_remove_transport:
3222  * @stream: a #GstRTSPStream
3223  * @trans: (transfer none): a #GstRTSPStreamTransport
3224  *
3225  * Remove the transport in @trans from @stream. The media of @stream will
3226  * not be sent to the values configured in @trans.
3227  *
3228  * @stream must be joined to a bin.
3229  *
3230  * @trans must contain a valid #GstRTSPTransport.
3231  *
3232  * Returns: %TRUE if @trans was removed
3233  */
3234 gboolean
3235 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3236     GstRTSPStreamTransport * trans)
3237 {
3238   GstRTSPStreamPrivate *priv;
3239   gboolean res;
3240
3241   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3242   priv = stream->priv;
3243   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3244   g_return_val_if_fail (priv->is_joined, FALSE);
3245
3246   g_mutex_lock (&priv->lock);
3247   res = update_transport (stream, trans, FALSE);
3248   g_mutex_unlock (&priv->lock);
3249
3250   return res;
3251 }
3252
3253 /**
3254  * gst_rtsp_stream_update_crypto:
3255  * @stream: a #GstRTSPStream
3256  * @ssrc: the SSRC
3257  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3258  *
3259  * Update the new crypto information for @ssrc in @stream. If information
3260  * for @ssrc did not exist, it will be added. If information
3261  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3262  * be removed from @stream.
3263  *
3264  * Returns: %TRUE if @crypto could be updated
3265  */
3266 gboolean
3267 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3268     guint ssrc, GstCaps * crypto)
3269 {
3270   GstRTSPStreamPrivate *priv;
3271
3272   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3273   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3274
3275   priv = stream->priv;
3276
3277   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3278
3279   g_mutex_lock (&priv->lock);
3280   if (crypto)
3281     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3282         gst_caps_ref (crypto));
3283   else
3284     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3285   g_mutex_unlock (&priv->lock);
3286
3287   return TRUE;
3288 }
3289
3290 /**
3291  * gst_rtsp_stream_get_rtp_socket:
3292  * @stream: a #GstRTSPStream
3293  * @family: the socket family
3294  *
3295  * Get the RTP socket from @stream for a @family.
3296  *
3297  * @stream must be joined to a bin.
3298  *
3299  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3300  * socket could be allocated for @family. Unref after usage
3301  */
3302 GSocket *
3303 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3304 {
3305   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3306   GSocket *socket;
3307   const gchar *name;
3308
3309   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3310   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3311       family == G_SOCKET_FAMILY_IPV6, NULL);
3312   g_return_val_if_fail (priv->udpsink[0], NULL);
3313
3314   if (family == G_SOCKET_FAMILY_IPV6)
3315     name = "socket-v6";
3316   else
3317     name = "socket";
3318
3319   g_object_get (priv->udpsink[0], name, &socket, NULL);
3320
3321   return socket;
3322 }
3323
3324 /**
3325  * gst_rtsp_stream_get_rtcp_socket:
3326  * @stream: a #GstRTSPStream
3327  * @family: the socket family
3328  *
3329  * Get the RTCP socket from @stream for a @family.
3330  *
3331  * @stream must be joined to a bin.
3332  *
3333  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3334  * socket could be allocated for @family. Unref after usage
3335  */
3336 GSocket *
3337 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3338 {
3339   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3340   GSocket *socket;
3341   const gchar *name;
3342
3343   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3344   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3345       family == G_SOCKET_FAMILY_IPV6, NULL);
3346   g_return_val_if_fail (priv->udpsink[1], NULL);
3347
3348   if (family == G_SOCKET_FAMILY_IPV6)
3349     name = "socket-v6";
3350   else
3351     name = "socket";
3352
3353   g_object_get (priv->udpsink[1], name, &socket, NULL);
3354
3355   return socket;
3356 }
3357
3358 /**
3359  * gst_rtsp_stream_set_seqnum:
3360  * @stream: a #GstRTSPStream
3361  * @seqnum: a new sequence number
3362  *
3363  * Configure the sequence number in the payloader of @stream to @seqnum.
3364  */
3365 void
3366 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3367 {
3368   GstRTSPStreamPrivate *priv;
3369
3370   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3371
3372   priv = stream->priv;
3373
3374   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3375 }
3376
3377 /**
3378  * gst_rtsp_stream_get_seqnum:
3379  * @stream: a #GstRTSPStream
3380  *
3381  * Get the configured sequence number in the payloader of @stream.
3382  *
3383  * Returns: the sequence number of the payloader.
3384  */
3385 guint16
3386 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3387 {
3388   GstRTSPStreamPrivate *priv;
3389   guint seqnum;
3390
3391   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3392
3393   priv = stream->priv;
3394
3395   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3396
3397   return seqnum;
3398 }
3399
3400 /**
3401  * gst_rtsp_stream_transport_filter:
3402  * @stream: a #GstRTSPStream
3403  * @func: (scope call) (allow-none): a callback
3404  * @user_data: (closure): user data passed to @func
3405  *
3406  * Call @func for each transport managed by @stream. The result value of @func
3407  * determines what happens to the transport. @func will be called with @stream
3408  * locked so no further actions on @stream can be performed from @func.
3409  *
3410  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3411  * @stream.
3412  *
3413  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3414  *
3415  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3416  * will also be added with an additional ref to the result #GList of this
3417  * function..
3418  *
3419  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3420  *
3421  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3422  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3423  * element in the #GList should be unreffed before the list is freed.
3424  */
3425 GList *
3426 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3427     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3428 {
3429   GstRTSPStreamPrivate *priv;
3430   GList *result, *walk, *next;
3431   GHashTable *visited = NULL;
3432   guint cookie;
3433
3434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3435
3436   priv = stream->priv;
3437
3438   result = NULL;
3439   if (func)
3440     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3441
3442   g_mutex_lock (&priv->lock);
3443 restart:
3444   cookie = priv->transports_cookie;
3445   for (walk = priv->transports; walk; walk = next) {
3446     GstRTSPStreamTransport *trans = walk->data;
3447     GstRTSPFilterResult res;
3448     gboolean changed;
3449
3450     next = g_list_next (walk);
3451
3452     if (func) {
3453       /* only visit each transport once */
3454       if (g_hash_table_contains (visited, trans))
3455         continue;
3456
3457       g_hash_table_add (visited, g_object_ref (trans));
3458       g_mutex_unlock (&priv->lock);
3459
3460       res = func (stream, trans, user_data);
3461
3462       g_mutex_lock (&priv->lock);
3463     } else
3464       res = GST_RTSP_FILTER_REF;
3465
3466     changed = (cookie != priv->transports_cookie);
3467
3468     switch (res) {
3469       case GST_RTSP_FILTER_REMOVE:
3470         update_transport (stream, trans, FALSE);
3471         break;
3472       case GST_RTSP_FILTER_REF:
3473         result = g_list_prepend (result, g_object_ref (trans));
3474         break;
3475       case GST_RTSP_FILTER_KEEP:
3476       default:
3477         break;
3478     }
3479     if (changed)
3480       goto restart;
3481   }
3482   g_mutex_unlock (&priv->lock);
3483
3484   if (func)
3485     g_hash_table_unref (visited);
3486
3487   return result;
3488 }
3489
3490 static GstPadProbeReturn
3491 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3492 {
3493   GstRTSPStreamPrivate *priv;
3494   GstRTSPStream *stream;
3495
3496   stream = user_data;
3497   priv = stream->priv;
3498
3499   GST_DEBUG_OBJECT (pad, "now blocking");
3500
3501   g_mutex_lock (&priv->lock);
3502   priv->blocking = TRUE;
3503   g_mutex_unlock (&priv->lock);
3504
3505   gst_element_post_message (priv->payloader,
3506       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3507           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3508
3509   return GST_PAD_PROBE_OK;
3510 }
3511
3512 /**
3513  * gst_rtsp_stream_set_blocked:
3514  * @stream: a #GstRTSPStream
3515  * @blocked: boolean indicating we should block or unblock
3516  *
3517  * Blocks or unblocks the dataflow on @stream.
3518  *
3519  * Returns: %TRUE on success
3520  */
3521 gboolean
3522 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3523 {
3524   GstRTSPStreamPrivate *priv;
3525
3526   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3527
3528   priv = stream->priv;
3529
3530   g_mutex_lock (&priv->lock);
3531   if (blocked) {
3532     priv->blocking = FALSE;
3533     if (priv->blocked_id == 0) {
3534       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3535           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3536           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3537           g_object_ref (stream), g_object_unref);
3538     }
3539   } else {
3540     if (priv->blocked_id != 0) {
3541       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3542       priv->blocked_id = 0;
3543       priv->blocking = FALSE;
3544     }
3545   }
3546   g_mutex_unlock (&priv->lock);
3547
3548   return TRUE;
3549 }
3550
3551 /**
3552  * gst_rtsp_stream_is_blocking:
3553  * @stream: a #GstRTSPStream
3554  *
3555  * Check if @stream is blocking on a #GstBuffer.
3556  *
3557  * Returns: %TRUE if @stream is blocking
3558  */
3559 gboolean
3560 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3561 {
3562   GstRTSPStreamPrivate *priv;
3563   gboolean result;
3564
3565   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3566
3567   priv = stream->priv;
3568
3569   g_mutex_lock (&priv->lock);
3570   result = priv->blocking;
3571   g_mutex_unlock (&priv->lock);
3572
3573   return result;
3574 }
3575
3576 /**
3577  * gst_rtsp_stream_query_position:
3578  * @stream: a #GstRTSPStream
3579  *
3580  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3581  * the RTP parts of the pipeline and not the RTCP parts.
3582  *
3583  * Returns: %TRUE if the position could be queried
3584  */
3585 gboolean
3586 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3587 {
3588   GstRTSPStreamPrivate *priv;
3589   GstElement *sink;
3590   gboolean ret;
3591
3592   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3593
3594   priv = stream->priv;
3595
3596   g_mutex_lock (&priv->lock);
3597   /* depending on the transport type, it should query corresponding sink */
3598   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3599       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3600     sink = priv->udpsink[0];
3601   else
3602     sink = priv->appsink[0];
3603
3604   if (sink)
3605     gst_object_ref (sink);
3606   g_mutex_unlock (&priv->lock);
3607
3608   if (!sink)
3609     return FALSE;
3610
3611   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3612   gst_object_unref (sink);
3613
3614   return ret;
3615 }
3616
3617 /**
3618  * gst_rtsp_stream_query_stop:
3619  * @stream: a #GstRTSPStream
3620  *
3621  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3622  * the RTP parts of the pipeline and not the RTCP parts.
3623  *
3624  * Returns: %TRUE if the stop could be queried
3625  */
3626 gboolean
3627 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3628 {
3629   GstRTSPStreamPrivate *priv;
3630   GstElement *sink;
3631   GstQuery *query;
3632   gboolean ret;
3633
3634   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3635
3636   priv = stream->priv;
3637
3638   g_mutex_lock (&priv->lock);
3639   /* depending on the transport type, it should query corresponding sink */
3640   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3641       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3642     sink = priv->udpsink[0];
3643   else
3644     sink = priv->appsink[0];
3645
3646   if (sink)
3647     gst_object_ref (sink);
3648   g_mutex_unlock (&priv->lock);
3649
3650   if (!sink)
3651     return FALSE;
3652
3653   query = gst_query_new_segment (GST_FORMAT_TIME);
3654   if ((ret = gst_element_query (sink, query))) {
3655     GstFormat format;
3656
3657     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3658     if (format != GST_FORMAT_TIME)
3659       *stop = -1;
3660   }
3661   gst_query_unref (query);
3662   gst_object_unref (sink);
3663
3664   return ret;
3665
3666 }