15fca1918fccaad3a40a5888aa3f0bddbf1b366e
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static void
1025 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1026     GSocket * rtcp_socket, GSocketFamily family)
1027 {
1028   GstRTSPStreamPrivate *priv = stream->priv;
1029   const gchar *multisink_socket;
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     multisink_socket = "socket-v6";
1033   else
1034     multisink_socket = "socket";
1035
1036   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1037       NULL);
1038   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1039       NULL);
1040 }
1041
1042 /* must be called with lock */
1043 static gboolean
1044 create_and_configure_udpsinks (GstRTSPStream * stream)
1045 {
1046   GstRTSPStreamPrivate *priv = stream->priv;
1047   GstElement *udpsink0, *udpsink1;
1048
1049   udpsink0 = NULL;
1050   udpsink1 = NULL;
1051
1052   if (priv->udpsink[0])
1053     udpsink0 = priv->udpsink[0];
1054   else
1055     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1056
1057   if (!udpsink0)
1058     goto no_udp_protocol;
1059
1060   if (priv->udpsink[1])
1061     udpsink1 = priv->udpsink[1];
1062   else
1063     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1064
1065   if (!udpsink1)
1066     goto no_udp_protocol;
1067
1068   /* configure sinks */
1069
1070   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1071   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1072
1073   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1074   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1075
1076   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1077
1078   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1079   /* Needs to be async for RECORD streams, otherwise we will never go to
1080    * PLAYING because the sinks will wait for data while the udpsrc can't
1081    * provide data with timestamps in PAUSED. */
1082   if (priv->sinkpad)
1083     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1085
1086   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1087   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1088
1089   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1091
1092   /* update the dscp qos field in the sinks */
1093   update_dscp_qos (stream);
1094
1095   priv->udpsink[0] = udpsink0;
1096   priv->udpsink[1] = udpsink1;
1097
1098   return TRUE;
1099
1100   /* ERRORS */
1101 no_udp_protocol:
1102   {
1103     return FALSE;
1104   }
1105 }
1106
1107 /* must be called with lock */
1108 static void
1109 play_udpsources_one_family (GstRTSPStream * stream, GSocketFamily family)
1110 {
1111   GstRTSPStreamPrivate *priv;
1112   GstPad *pad, *selpad;
1113   guint i;
1114   GstBin *bin;
1115
1116   priv = stream->priv;
1117   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1118
1119   for (i = 0; i < 2; i++) {
1120     if (priv->sinkpad || i == 1) {
1121       if (family == G_SOCKET_FAMILY_IPV4 && priv->udpsrc_v4[i]) {
1122         if (priv->srcpad) {
1123           /* we set and keep these to playing so that they don't cause NO_PREROLL return
1124            * values. This is only relevant for PLAY pipelines */
1125           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1126           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1127         }
1128         /* add udpsrc */
1129         gst_bin_add (bin, priv->udpsrc_v4[i]);
1130
1131         /* and link to the funnel v4 */
1132         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1133         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1134         gst_pad_link (pad, selpad);
1135         gst_object_unref (pad);
1136         gst_object_unref (selpad);
1137       }
1138
1139       if (family == G_SOCKET_FAMILY_IPV6 && priv->udpsrc_v6[i]) {
1140         if (priv->srcpad) {
1141           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1142           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1143         }
1144         gst_bin_add (bin, priv->udpsrc_v6[i]);
1145
1146         /* and link to the funnel v6 */
1147         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1148         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1149         gst_pad_link (pad, selpad);
1150         gst_object_unref (pad);
1151         gst_object_unref (selpad);
1152       }
1153     }
1154   }
1155
1156   gst_object_unref (bin);
1157 }
1158
1159 /* must be called with lock */
1160 static gboolean
1161 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1162     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family)
1163 {
1164   GstStateChangeReturn ret;
1165
1166   /* we keep these elements, we will further configure them when the
1167    * client told us to really use the UDP ports. */
1168   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1169   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1170
1171   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1172     goto error;
1173
1174   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1175   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1176
1177   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1178   if (ret == GST_STATE_CHANGE_FAILURE)
1179     goto error;
1180   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1181   if (ret == GST_STATE_CHANGE_FAILURE)
1182     goto error;
1183
1184   return TRUE;
1185   return TRUE;
1186
1187   /* ERRORS */
1188 error:
1189   {
1190     if (udpsrc_out[0])
1191       gst_object_unref (udpsrc_out[0]);
1192     if (udpsrc_out[1])
1193       gst_object_unref (udpsrc_out[1]);
1194     return FALSE;
1195   }
1196 }
1197
1198 static gboolean
1199 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1200     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1201     GstRTSPAddress ** server_addr_out)
1202 {
1203   GstRTSPStreamPrivate *priv = stream->priv;
1204   GSocket *rtp_socket = NULL;
1205   GSocket *rtcp_socket;
1206   gint tmp_rtp, tmp_rtcp;
1207   guint count;
1208   gint rtpport, rtcpport;
1209   GList *rejected_addresses = NULL;
1210   GstRTSPAddress *addr = NULL;
1211   GInetAddress *inetaddr = NULL;
1212   GSocketAddress *rtp_sockaddr = NULL;
1213   GSocketAddress *rtcp_sockaddr = NULL;
1214   GstRTSPAddressPool * pool;
1215
1216   pool = priv->pool;
1217   count = 0;
1218
1219   /* Start with random port */
1220   tmp_rtp = 0;
1221
1222   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1223       G_SOCKET_PROTOCOL_UDP, NULL);
1224   if (!rtcp_socket)
1225     goto no_udp_protocol;
1226
1227   if (*server_addr_out)
1228     gst_rtsp_address_free (*server_addr_out);
1229
1230   /* try to allocate 2 UDP ports, the RTP port should be an even
1231    * number and the RTCP port should be the next (uneven) port */
1232 again:
1233
1234   if (rtp_socket == NULL) {
1235     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1236         G_SOCKET_PROTOCOL_UDP, NULL);
1237     if (!rtp_socket)
1238       goto no_udp_protocol;
1239   }
1240
1241   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1242     GstRTSPAddressFlags flags;
1243
1244     if (addr)
1245       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1246
1247     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1248     if (family == G_SOCKET_FAMILY_IPV6)
1249       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1250     else
1251       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1252
1253     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1254
1255     if (addr == NULL)
1256       goto no_ports;
1257
1258     tmp_rtp = addr->port;
1259
1260     g_clear_object (&inetaddr);
1261     inetaddr = g_inet_address_new_from_string (addr->address);
1262   } else {
1263     if (tmp_rtp != 0) {
1264       tmp_rtp += 2;
1265       if (++count > 20)
1266         goto no_ports;
1267     }
1268
1269     if (inetaddr == NULL)
1270       inetaddr = g_inet_address_new_any (family);
1271   }
1272
1273   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1274   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1275     g_object_unref (rtp_sockaddr);
1276     goto again;
1277   }
1278   g_object_unref (rtp_sockaddr);
1279
1280   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1281   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1282     g_clear_object (&rtp_sockaddr);
1283     goto socket_error;
1284   }
1285
1286   tmp_rtp =
1287       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1288   g_object_unref (rtp_sockaddr);
1289
1290   /* check if port is even */
1291   if ((tmp_rtp & 1) != 0) {
1292     /* port not even, close and allocate another */
1293     tmp_rtp++;
1294     g_clear_object (&rtp_socket);
1295     goto again;
1296   }
1297
1298   /* set port */
1299   tmp_rtcp = tmp_rtp + 1;
1300
1301   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1302   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1303     g_object_unref (rtcp_sockaddr);
1304     g_clear_object (&rtp_socket);
1305     goto again;
1306   }
1307   g_object_unref (rtcp_sockaddr);
1308
1309   g_clear_object (&inetaddr);
1310
1311   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1312         rtcp_socket, family))
1313     goto no_udp_protocol;
1314
1315   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1316   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1317
1318   /* this should not happen... */
1319   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1320     goto port_error;
1321
1322   if (!create_and_configure_udpsinks (stream))
1323     goto no_udp_protocol;
1324
1325   /* set RTP and RTCP sockets */
1326   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1327
1328   server_port_out->min = rtpport;
1329   server_port_out->max = rtcpport;
1330
1331   *server_addr_out = addr;
1332   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1333
1334   g_object_unref (rtp_socket);
1335   g_object_unref (rtcp_socket);
1336
1337   return TRUE;
1338
1339   /* ERRORS */
1340 no_udp_protocol:
1341   {
1342     goto cleanup;
1343   }
1344 no_ports:
1345   {
1346     goto cleanup;
1347   }
1348 port_error:
1349   {
1350     goto cleanup;
1351   }
1352 socket_error:
1353   {
1354     goto cleanup;
1355   }
1356 cleanup:
1357   {
1358     if (inetaddr)
1359       g_object_unref (inetaddr);
1360     g_list_free_full (rejected_addresses,
1361         (GDestroyNotify) gst_rtsp_address_free);
1362     if (addr)
1363       gst_rtsp_address_free (addr);
1364     if (rtp_socket)
1365       g_object_unref (rtp_socket);
1366     if (rtcp_socket)
1367       g_object_unref (rtcp_socket);
1368     return FALSE;
1369   }
1370 }
1371
1372 /* must be called with lock */
1373 static gboolean
1374 alloc_ports (GstRTSPStream * stream)
1375 {
1376   GstRTSPStreamPrivate *priv = stream->priv;
1377
1378   priv->have_ipv4 =
1379       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1380           &priv->server_port_v4, &priv->server_addr_v4);
1381
1382   priv->have_ipv6 =
1383       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1384           &priv->server_port_v6, &priv->server_addr_v6);
1385
1386   return priv->have_ipv4 || priv->have_ipv6;
1387 }
1388
1389 /**
1390  * gst_rtsp_stream_set_client_side:
1391  * @stream: a #GstRTSPStream
1392  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1393  * an RTSP connection.
1394  *
1395  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1396  * streams to an RTSP server via RECORD. This has the practical effect
1397  * of changing which UDP port numbers are used when setting up the local
1398  * side of the stream sending to be either the 'server' or 'client' pair
1399  * of a configured UDP transport.
1400  */
1401 void
1402 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1403 {
1404   GstRTSPStreamPrivate *priv;
1405
1406   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1407   priv = stream->priv;
1408   g_mutex_lock (&priv->lock);
1409   priv->client_side = client_side;
1410   g_mutex_unlock (&priv->lock);
1411 }
1412
1413 /**
1414  * gst_rtsp_stream_set_client_side:
1415  * @stream: a #GstRTSPStream
1416  *
1417  * See gst_rtsp_stream_set_client_side()
1418  *
1419  * Returns: TRUE if this #GstRTSPStream is client-side.
1420  */
1421 gboolean
1422 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1423 {
1424   GstRTSPStreamPrivate *priv;
1425   gboolean ret;
1426
1427   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1428
1429   priv = stream->priv;
1430   g_mutex_lock (&priv->lock);
1431   ret = priv->client_side;
1432   g_mutex_unlock (&priv->lock);
1433
1434   return ret;
1435 }
1436
1437 /**
1438  * gst_rtsp_stream_get_server_port:
1439  * @stream: a #GstRTSPStream
1440  * @server_port: (out): result server port
1441  * @family: the port family to get
1442  *
1443  * Fill @server_port with the port pair used by the server. This function can
1444  * only be called when @stream has been joined.
1445  */
1446 void
1447 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1448     GstRTSPRange * server_port, GSocketFamily family)
1449 {
1450   GstRTSPStreamPrivate *priv;
1451
1452   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1453   priv = stream->priv;
1454   g_return_if_fail (priv->is_joined);
1455
1456   g_mutex_lock (&priv->lock);
1457   if (family == G_SOCKET_FAMILY_IPV4) {
1458     if (server_port)
1459       *server_port = priv->server_port_v4;
1460   } else {
1461     if (server_port)
1462       *server_port = priv->server_port_v6;
1463   }
1464   g_mutex_unlock (&priv->lock);
1465 }
1466
1467 /**
1468  * gst_rtsp_stream_get_rtpsession:
1469  * @stream: a #GstRTSPStream
1470  *
1471  * Get the RTP session of this stream.
1472  *
1473  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1474  */
1475 GObject *
1476 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1477 {
1478   GstRTSPStreamPrivate *priv;
1479   GObject *session;
1480
1481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1482
1483   priv = stream->priv;
1484
1485   g_mutex_lock (&priv->lock);
1486   if ((session = priv->session))
1487     g_object_ref (session);
1488   g_mutex_unlock (&priv->lock);
1489
1490   return session;
1491 }
1492
1493 /**
1494  * gst_rtsp_stream_get_ssrc:
1495  * @stream: a #GstRTSPStream
1496  * @ssrc: (out): result ssrc
1497  *
1498  * Get the SSRC used by the RTP session of this stream. This function can only
1499  * be called when @stream has been joined.
1500  */
1501 void
1502 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1503 {
1504   GstRTSPStreamPrivate *priv;
1505
1506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1507   priv = stream->priv;
1508   g_return_if_fail (priv->is_joined);
1509
1510   g_mutex_lock (&priv->lock);
1511   if (ssrc && priv->session)
1512     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1513   g_mutex_unlock (&priv->lock);
1514 }
1515
1516 /**
1517  * gst_rtsp_stream_set_retransmission_time:
1518  * @stream: a #GstRTSPStream
1519  * @time: a #GstClockTime
1520  *
1521  * Set the amount of time to store retransmission packets.
1522  */
1523 void
1524 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1525     GstClockTime time)
1526 {
1527   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1528
1529   g_mutex_lock (&stream->priv->lock);
1530   stream->priv->rtx_time = time;
1531   if (stream->priv->rtxsend)
1532     g_object_set (stream->priv->rtxsend, "max-size-time",
1533         GST_TIME_AS_MSECONDS (time), NULL);
1534   g_mutex_unlock (&stream->priv->lock);
1535 }
1536
1537 /**
1538  * gst_rtsp_stream_get_retransmission_time:
1539  * @stream: a #GstRTSPStream
1540  *
1541  * Get the amount of time to store retransmission data.
1542  *
1543  * Returns: the amount of time to store retransmission data.
1544  */
1545 GstClockTime
1546 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1547 {
1548   GstClockTime ret;
1549
1550   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1551
1552   g_mutex_lock (&stream->priv->lock);
1553   ret = stream->priv->rtx_time;
1554   g_mutex_unlock (&stream->priv->lock);
1555
1556   return ret;
1557 }
1558
1559 /**
1560  * gst_rtsp_stream_set_retransmission_pt:
1561  * @stream: a #GstRTSPStream
1562  * @rtx_pt: a #guint
1563  *
1564  * Set the payload type (pt) for retransmission of this stream.
1565  */
1566 void
1567 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1568 {
1569   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1570
1571   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1572
1573   g_mutex_lock (&stream->priv->lock);
1574   stream->priv->rtx_pt = rtx_pt;
1575   if (stream->priv->rtxsend) {
1576     guint pt = gst_rtsp_stream_get_pt (stream);
1577     gchar *pt_s = g_strdup_printf ("%d", pt);
1578     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1579         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1580     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1581     g_free (pt_s);
1582     gst_structure_free (rtx_pt_map);
1583   }
1584   g_mutex_unlock (&stream->priv->lock);
1585 }
1586
1587 /**
1588  * gst_rtsp_stream_get_retransmission_pt:
1589  * @stream: a #GstRTSPStream
1590  *
1591  * Get the payload-type used for retransmission of this stream
1592  *
1593  * Returns: The retransmission PT.
1594  */
1595 guint
1596 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1597 {
1598   guint rtx_pt;
1599
1600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1601
1602   g_mutex_lock (&stream->priv->lock);
1603   rtx_pt = stream->priv->rtx_pt;
1604   g_mutex_unlock (&stream->priv->lock);
1605
1606   return rtx_pt;
1607 }
1608
1609 /**
1610  * gst_rtsp_stream_set_buffer_size:
1611  * @stream: a #GstRTSPStream
1612  * @size: the buffer size
1613  *
1614  * Set the size of the UDP transmission buffer (in bytes)
1615  * Needs to be set before the stream is joined to a bin.
1616  *
1617  * Since: 1.6
1618  */
1619 void
1620 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1621 {
1622   g_mutex_lock (&stream->priv->lock);
1623   stream->priv->buffer_size = size;
1624   g_mutex_unlock (&stream->priv->lock);
1625 }
1626
1627 /**
1628  * gst_rtsp_stream_get_buffer_size:
1629  * @stream: a #GstRTSPStream
1630  *
1631  * Get the size of the UDP transmission buffer (in bytes)
1632  *
1633  * Returns: the size of the UDP TX buffer
1634  *
1635  * Since: 1.6
1636  */
1637 guint
1638 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1639 {
1640   guint buffer_size;
1641
1642   g_mutex_lock (&stream->priv->lock);
1643   buffer_size = stream->priv->buffer_size;
1644   g_mutex_unlock (&stream->priv->lock);
1645
1646   return buffer_size;
1647 }
1648
1649 /* executed from streaming thread */
1650 static void
1651 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1652 {
1653   GstRTSPStreamPrivate *priv = stream->priv;
1654   GstCaps *newcaps, *oldcaps;
1655
1656   newcaps = gst_pad_get_current_caps (pad);
1657
1658   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1659       newcaps);
1660
1661   g_mutex_lock (&priv->lock);
1662   oldcaps = priv->caps;
1663   priv->caps = newcaps;
1664   g_mutex_unlock (&priv->lock);
1665
1666   if (oldcaps)
1667     gst_caps_unref (oldcaps);
1668 }
1669
1670 static void
1671 dump_structure (const GstStructure * s)
1672 {
1673   gchar *sstr;
1674
1675   sstr = gst_structure_to_string (s);
1676   GST_INFO ("structure: %s", sstr);
1677   g_free (sstr);
1678 }
1679
1680 static GstRTSPStreamTransport *
1681 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1682 {
1683   GstRTSPStreamPrivate *priv = stream->priv;
1684   GList *walk;
1685   GstRTSPStreamTransport *result = NULL;
1686   const gchar *tmp;
1687   gchar *dest;
1688   guint port;
1689
1690   if (rtcp_from == NULL)
1691     return NULL;
1692
1693   tmp = g_strrstr (rtcp_from, ":");
1694   if (tmp == NULL)
1695     return NULL;
1696
1697   port = atoi (tmp + 1);
1698   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1699
1700   g_mutex_lock (&priv->lock);
1701   GST_INFO ("finding %s:%d in %d transports", dest, port,
1702       g_list_length (priv->transports));
1703
1704   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1705     GstRTSPStreamTransport *trans = walk->data;
1706     const GstRTSPTransport *tr;
1707     gint min, max;
1708
1709     tr = gst_rtsp_stream_transport_get_transport (trans);
1710
1711     if (priv->client_side) {
1712       /* In client side mode the 'destination' is the RTSP server, so send
1713        * to those ports */
1714       min = tr->server_port.min;
1715       max = tr->server_port.max;
1716     } else {
1717       min = tr->client_port.min;
1718       max = tr->client_port.max;
1719     }
1720
1721     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1722       result = trans;
1723       break;
1724     }
1725   }
1726   if (result)
1727     g_object_ref (result);
1728   g_mutex_unlock (&priv->lock);
1729
1730   g_free (dest);
1731
1732   return result;
1733 }
1734
1735 static GstRTSPStreamTransport *
1736 check_transport (GObject * source, GstRTSPStream * stream)
1737 {
1738   GstStructure *stats;
1739   GstRTSPStreamTransport *trans;
1740
1741   /* see if we have a stream to match with the origin of the RTCP packet */
1742   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1743   if (trans == NULL) {
1744     g_object_get (source, "stats", &stats, NULL);
1745     if (stats) {
1746       const gchar *rtcp_from;
1747
1748       dump_structure (stats);
1749
1750       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1751       if ((trans = find_transport (stream, rtcp_from))) {
1752         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1753             source);
1754         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1755             g_object_unref);
1756       }
1757       gst_structure_free (stats);
1758     }
1759   }
1760   return trans;
1761 }
1762
1763
1764 static void
1765 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1766 {
1767   GstRTSPStreamTransport *trans;
1768
1769   GST_INFO ("%p: new source %p", stream, source);
1770
1771   trans = check_transport (source, stream);
1772
1773   if (trans)
1774     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1775 }
1776
1777 static void
1778 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1779 {
1780   GST_INFO ("%p: new SDES %p", stream, source);
1781 }
1782
1783 static void
1784 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1785 {
1786   GstRTSPStreamTransport *trans;
1787
1788   trans = check_transport (source, stream);
1789
1790   if (trans) {
1791     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1792     gst_rtsp_stream_transport_keep_alive (trans);
1793   }
1794 #ifdef DUMP_STATS
1795   {
1796     GstStructure *stats;
1797     g_object_get (source, "stats", &stats, NULL);
1798     if (stats) {
1799       dump_structure (stats);
1800       gst_structure_free (stats);
1801     }
1802   }
1803 #endif
1804 }
1805
1806 static void
1807 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1808 {
1809   GST_INFO ("%p: source %p bye", stream, source);
1810 }
1811
1812 static void
1813 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1814 {
1815   GstRTSPStreamTransport *trans;
1816
1817   GST_INFO ("%p: source %p bye timeout", stream, source);
1818
1819   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1820     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1821     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1822   }
1823 }
1824
1825 static void
1826 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1827 {
1828   GstRTSPStreamTransport *trans;
1829
1830   GST_INFO ("%p: source %p timeout", stream, source);
1831
1832   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1833     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1834     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1835   }
1836 }
1837
1838 static void
1839 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1840 {
1841   GST_INFO ("%p: new sender source %p", stream, source);
1842 #ifndef DUMP_STATS
1843   {
1844     GstStructure *stats;
1845     g_object_get (source, "stats", &stats, NULL);
1846     if (stats) {
1847       dump_structure (stats);
1848       gst_structure_free (stats);
1849     }
1850   }
1851 #endif
1852 }
1853
1854 static void
1855 on_sender_ssrc_active (GObject * session, GObject * source,
1856     GstRTSPStream * stream)
1857 {
1858 #ifndef DUMP_STATS
1859   {
1860     GstStructure *stats;
1861     g_object_get (source, "stats", &stats, NULL);
1862     if (stats) {
1863       dump_structure (stats);
1864       gst_structure_free (stats);
1865     }
1866   }
1867 #endif
1868 }
1869
1870 static void
1871 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1872 {
1873   if (is_rtp) {
1874     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1875     g_list_free (priv->tr_cache_rtp);
1876     priv->tr_cache_rtp = NULL;
1877   } else {
1878     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1879     g_list_free (priv->tr_cache_rtcp);
1880     priv->tr_cache_rtcp = NULL;
1881   }
1882 }
1883
1884 static GstFlowReturn
1885 handle_new_sample (GstAppSink * sink, gpointer user_data)
1886 {
1887   GstRTSPStreamPrivate *priv;
1888   GList *walk;
1889   GstSample *sample;
1890   GstBuffer *buffer;
1891   GstRTSPStream *stream;
1892   gboolean is_rtp;
1893
1894   sample = gst_app_sink_pull_sample (sink);
1895   if (!sample)
1896     return GST_FLOW_OK;
1897
1898   stream = (GstRTSPStream *) user_data;
1899   priv = stream->priv;
1900   buffer = gst_sample_get_buffer (sample);
1901
1902   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1903
1904   g_mutex_lock (&priv->lock);
1905   if (is_rtp) {
1906     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1907       clear_tr_cache (priv, is_rtp);
1908       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1909         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1910         priv->tr_cache_rtp =
1911             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1912       }
1913       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1914     }
1915   } else {
1916     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1917       clear_tr_cache (priv, is_rtp);
1918       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1919         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1920         priv->tr_cache_rtcp =
1921             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1922       }
1923       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1924     }
1925   }
1926   g_mutex_unlock (&priv->lock);
1927
1928   if (is_rtp) {
1929     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1930       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1931       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1932     }
1933   } else {
1934     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1935       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1936       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1937     }
1938   }
1939   gst_sample_unref (sample);
1940
1941   return GST_FLOW_OK;
1942 }
1943
1944 static GstAppSinkCallbacks sink_cb = {
1945   NULL,                         /* not interested in EOS */
1946   NULL,                         /* not interested in preroll samples */
1947   handle_new_sample,
1948 };
1949
1950 static GstElement *
1951 get_rtp_encoder (GstRTSPStream * stream, guint session)
1952 {
1953   GstRTSPStreamPrivate *priv = stream->priv;
1954
1955   if (priv->srtpenc == NULL) {
1956     gchar *name;
1957
1958     name = g_strdup_printf ("srtpenc_%u", session);
1959     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1960     g_free (name);
1961
1962     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1963   }
1964   return gst_object_ref (priv->srtpenc);
1965 }
1966
1967 static GstElement *
1968 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1969 {
1970   GstRTSPStreamPrivate *priv = stream->priv;
1971   GstElement *oldenc, *enc;
1972   GstPad *pad;
1973   gchar *name;
1974
1975   if (priv->idx != session)
1976     return NULL;
1977
1978   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1979
1980   oldenc = priv->srtpenc;
1981   enc = get_rtp_encoder (stream, session);
1982   name = g_strdup_printf ("rtp_sink_%d", session);
1983   pad = gst_element_get_request_pad (enc, name);
1984   g_free (name);
1985   gst_object_unref (pad);
1986
1987   if (oldenc == NULL)
1988     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1989         enc);
1990
1991   return enc;
1992 }
1993
1994 static GstElement *
1995 request_rtcp_encoder (GstElement * rtpbin, guint session,
1996     GstRTSPStream * stream)
1997 {
1998   GstRTSPStreamPrivate *priv = stream->priv;
1999   GstElement *oldenc, *enc;
2000   GstPad *pad;
2001   gchar *name;
2002
2003   if (priv->idx != session)
2004     return NULL;
2005
2006   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2007
2008   oldenc = priv->srtpenc;
2009   enc = get_rtp_encoder (stream, session);
2010   name = g_strdup_printf ("rtcp_sink_%d", session);
2011   pad = gst_element_get_request_pad (enc, name);
2012   g_free (name);
2013   gst_object_unref (pad);
2014
2015   if (oldenc == NULL)
2016     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2017         enc);
2018
2019   return enc;
2020 }
2021
2022 static GstCaps *
2023 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2024 {
2025   GstRTSPStreamPrivate *priv = stream->priv;
2026   GstCaps *caps;
2027
2028   GST_DEBUG ("request key %08x", ssrc);
2029
2030   g_mutex_lock (&priv->lock);
2031   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2032     gst_caps_ref (caps);
2033   g_mutex_unlock (&priv->lock);
2034
2035   return caps;
2036 }
2037
2038 static GstElement *
2039 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2040     GstRTSPStream * stream)
2041 {
2042   GstRTSPStreamPrivate *priv = stream->priv;
2043
2044   if (priv->idx != session)
2045     return NULL;
2046
2047   if (priv->srtpdec == NULL) {
2048     gchar *name;
2049
2050     name = g_strdup_printf ("srtpdec_%u", session);
2051     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2052     g_free (name);
2053
2054     g_signal_connect (priv->srtpdec, "request-key",
2055         (GCallback) request_key, stream);
2056   }
2057   return gst_object_ref (priv->srtpdec);
2058 }
2059
2060 /**
2061  * gst_rtsp_stream_request_aux_sender:
2062  * @stream: a #GstRTSPStream
2063  * @sessid: the session id
2064  *
2065  * Creating a rtxsend bin
2066  *
2067  * Returns: (transfer full): a #GstElement.
2068  *
2069  * Since: 1.6
2070  */
2071 GstElement *
2072 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2073 {
2074   GstElement *bin;
2075   GstPad *pad;
2076   GstStructure *pt_map;
2077   gchar *name;
2078   guint pt, rtx_pt;
2079   gchar *pt_s;
2080
2081   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2082
2083   pt = gst_rtsp_stream_get_pt (stream);
2084   pt_s = g_strdup_printf ("%u", pt);
2085   rtx_pt = stream->priv->rtx_pt;
2086
2087   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2088
2089   bin = gst_bin_new (NULL);
2090   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2091   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2092       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2093   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2094       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2095   g_free (pt_s);
2096   gst_structure_free (pt_map);
2097   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2098
2099   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2100   name = g_strdup_printf ("src_%u", sessid);
2101   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2102   g_free (name);
2103   gst_object_unref (pad);
2104
2105   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2106   name = g_strdup_printf ("sink_%u", sessid);
2107   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2108   g_free (name);
2109   gst_object_unref (pad);
2110
2111   return bin;
2112 }
2113
2114 /**
2115  * gst_rtsp_stream_set_pt_map:
2116  * @stream: a #GstRTSPStream
2117  * @pt: the pt
2118  * @caps: a #GstCaps
2119  *
2120  * Configure a pt map between @pt and @caps.
2121  */
2122 void
2123 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2124 {
2125   GstRTSPStreamPrivate *priv = stream->priv;
2126
2127   g_mutex_lock (&priv->lock);
2128   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2129   g_mutex_unlock (&priv->lock);
2130 }
2131
2132 static GstCaps *
2133 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2134     GstRTSPStream * stream)
2135 {
2136   GstRTSPStreamPrivate *priv = stream->priv;
2137   GstCaps *caps = NULL;
2138
2139   g_mutex_lock (&priv->lock);
2140
2141   if (priv->idx == session) {
2142     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2143     if (caps) {
2144       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2145       gst_caps_ref (caps);
2146     } else {
2147       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2148     }
2149   }
2150
2151   g_mutex_unlock (&priv->lock);
2152
2153   return caps;
2154 }
2155
2156 static void
2157 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2158 {
2159   GstRTSPStreamPrivate *priv = stream->priv;
2160   gchar *name;
2161   GstPadLinkReturn ret;
2162   guint sessid;
2163
2164   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2165       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2166
2167   name = gst_pad_get_name (pad);
2168   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2169     g_free (name);
2170     return;
2171   }
2172   g_free (name);
2173
2174   if (priv->idx != sessid)
2175     return;
2176
2177   if (gst_pad_is_linked (priv->sinkpad)) {
2178     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2179         GST_DEBUG_PAD_NAME (priv->sinkpad));
2180     return;
2181   }
2182
2183   /* link the RTP pad to the session manager, it should not really fail unless
2184    * this is not really an RTP pad */
2185   ret = gst_pad_link (pad, priv->sinkpad);
2186   if (ret != GST_PAD_LINK_OK)
2187     goto link_failed;
2188   priv->recv_rtp_src = gst_object_ref (pad);
2189
2190   return;
2191
2192 /* ERRORS */
2193 link_failed:
2194   {
2195     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2196         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2197   }
2198 }
2199
2200 static void
2201 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2202     GstRTSPStream * stream)
2203 {
2204   /* TODO: What to do here other than this? */
2205   GST_DEBUG ("Stream %p: Got EOS", stream);
2206   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2207 }
2208
2209 /* must be called with lock */
2210 static void
2211 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2212     GstState state)
2213 {
2214   GstRTSPStreamPrivate *priv;
2215   GstPad *pad, *sinkpad = NULL;
2216   gboolean is_tcp = FALSE, is_udp = FALSE;
2217   gint i;
2218
2219   priv = stream->priv;
2220
2221   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2222   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2223       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2224
2225   for (i = 0; i < 2; i++) {
2226     GstPad *teepad, *queuepad;
2227     /* For the sender we create this bit of pipeline for both
2228      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2229      * we need to add a queue before appsink and udpsink to make
2230      * the pipeline not block. For the TCP case, we want to pump
2231      * client as fast as possible anyway. This pipeline is used
2232      * when both TCP and UDP are present.
2233      *
2234      * .--------.      .-----.    .---------.    .---------.
2235      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2236      * |       send->sink   src->sink      src->sink       |
2237      * '--------'      |     |    '---------'    '---------'
2238      *                 |     |    .---------.    .---------.
2239      *                 |     |    |  queue  |    | appsink |
2240      *                 |    src->sink      src->sink       |
2241      *                 '-----'    '---------'    '---------'
2242      *
2243      * When only UDP or only TCP is allowed, we skip the tee and queue
2244      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2245      * the session.
2246      */
2247     /* Only link the RTP send src if we're going to send RTP, link
2248      * the RTCP send src always */
2249     if (priv->srcpad || i == 1) {
2250       if (is_udp) {
2251         /* add udpsink */
2252         gst_bin_add (bin, priv->udpsink[i]);
2253         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2254       }
2255
2256       if (is_tcp) {
2257         /* make appsink */
2258         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2259         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2260         gst_bin_add (bin, priv->appsink[i]);
2261         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2262             &sink_cb, stream, NULL);
2263       }
2264
2265       if (is_udp && is_tcp) {
2266         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2267
2268         /* make tee for RTP/RTCP */
2269         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2270         gst_bin_add (bin, priv->tee[i]);
2271
2272         /* and link to rtpbin send pad */
2273         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2274         gst_pad_link (priv->send_src[i], pad);
2275         gst_object_unref (pad);
2276
2277         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2278         g_object_set (priv->udpqueue[i], "max-size-buffers",
2279             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2280             NULL);
2281         gst_bin_add (bin, priv->udpqueue[i]);
2282         /* link tee to udpqueue */
2283         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2284         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2285         gst_pad_link (teepad, pad);
2286         gst_object_unref (pad);
2287         gst_object_unref (teepad);
2288
2289         /* link udpqueue to udpsink */
2290         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2291         gst_pad_link (queuepad, sinkpad);
2292         gst_object_unref (queuepad);
2293         gst_object_unref (sinkpad);
2294
2295         /* make appqueue */
2296         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2297         g_object_set (priv->appqueue[i], "max-size-buffers",
2298             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2299             NULL);
2300         gst_bin_add (bin, priv->appqueue[i]);
2301         /* and link tee to appqueue */
2302         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2303         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2304         gst_pad_link (teepad, pad);
2305         gst_object_unref (pad);
2306         gst_object_unref (teepad);
2307
2308         /* and link appqueue to appsink */
2309         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2310         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2311         gst_pad_link (queuepad, pad);
2312         gst_object_unref (pad);
2313         gst_object_unref (queuepad);
2314       } else if (is_tcp) {
2315         /* only appsink needed, link it to the session */
2316         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2317         gst_pad_link (priv->send_src[i], pad);
2318         gst_object_unref (pad);
2319
2320         /* when its only TCP, we need to set sync and preroll to FALSE
2321          * for the sink to avoid deadlock. And this is only needed for
2322          * sink used for RTCP data, not the RTP data. */
2323         if (i == 1)
2324           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2325       } else {
2326         /* else only udpsink needed, link it to the session */
2327         gst_pad_link (priv->send_src[i], sinkpad);
2328         gst_object_unref (sinkpad);
2329       }
2330     }
2331
2332     /* check if we need to set to a special state */
2333     if (state != GST_STATE_NULL) {
2334       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2335         gst_element_set_state (priv->udpsink[i], state);
2336       if (priv->appsink[i] && (priv->srcpad || i == 1))
2337         gst_element_set_state (priv->appsink[i], state);
2338       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2339         gst_element_set_state (priv->appqueue[i], state);
2340       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2341         gst_element_set_state (priv->udpqueue[i], state);
2342       if (priv->tee[i] && (priv->srcpad || i == 1))
2343         gst_element_set_state (priv->tee[i], state);
2344     }
2345   }
2346 }
2347
2348 /* must be called with lock */
2349 static void
2350 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2351     GstState state)
2352 {
2353   GstRTSPStreamPrivate *priv;
2354   GstPad *pad, *selpad;
2355   gboolean is_tcp;
2356   gint i;
2357
2358   priv = stream->priv;
2359
2360   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2361
2362   for (i = 0; i < 2; i++) {
2363     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2364      * RTCP sink always */
2365     if (priv->sinkpad || i == 1) {
2366       /* For the receiver we create this bit of pipeline for both
2367        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2368        * and it is all funneled into the rtpbin receive pad.
2369        *
2370        * .--------.     .--------.    .--------.
2371        * | udpsrc |     | funnel |    | rtpbin |
2372        * |       src->sink      src->sink      |
2373        * '--------'     |        |    '--------'
2374        * .--------.     |        |
2375        * | appsrc |     |        |
2376        * |       src->sink       |
2377        * '--------'     '--------'
2378        */
2379       /* make funnel for the RTP/RTCP receivers */
2380       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2381       gst_bin_add (bin, priv->funnel[i]);
2382
2383       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2384       gst_pad_link (pad, priv->recv_sink[i]);
2385       gst_object_unref (pad);
2386
2387       if (priv->udpsrc_v4[i]) {
2388         if (priv->srcpad) {
2389           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2390            * values. This is only relevant for PLAY pipelines */
2391           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2392           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2393         }
2394         /* add udpsrc */
2395         gst_bin_add (bin, priv->udpsrc_v4[i]);
2396
2397         /* and link to the funnel v4 */
2398         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2399         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2400         gst_pad_link (pad, selpad);
2401         gst_object_unref (pad);
2402         gst_object_unref (selpad);
2403       }
2404
2405       if (priv->udpsrc_v6[i]) {
2406         if (priv->srcpad) {
2407           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2408           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2409         }
2410         gst_bin_add (bin, priv->udpsrc_v6[i]);
2411
2412         /* and link to the funnel v6 */
2413         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2414         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2415         gst_pad_link (pad, selpad);
2416         gst_object_unref (pad);
2417         gst_object_unref (selpad);
2418       }
2419
2420       if (is_tcp) {
2421         /* make and add appsrc */
2422         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2423         priv->appsrc_base_time[i] = -1;
2424         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2425         gst_bin_add (bin, priv->appsrc[i]);
2426         /* and link to the funnel */
2427         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2428         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2429         gst_pad_link (pad, selpad);
2430         gst_object_unref (pad);
2431         gst_object_unref (selpad);
2432       }
2433     }
2434
2435     /* check if we need to set to a special state */
2436     if (state != GST_STATE_NULL) {
2437       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2438         gst_element_set_state (priv->funnel[i], state);
2439       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2440         gst_element_set_state (priv->appsrc[i], state);
2441     }
2442   }
2443 }
2444
2445 /**
2446  * gst_rtsp_stream_join_bin:
2447  * @stream: a #GstRTSPStream
2448  * @bin: (transfer none): a #GstBin to join
2449  * @rtpbin: (transfer none): a rtpbin element in @bin
2450  * @state: the target state of the new elements
2451  *
2452  * Join the #GstBin @bin that contains the element @rtpbin.
2453  *
2454  * @stream will link to @rtpbin, which must be inside @bin. The elements
2455  * added to @bin will be set to the state given in @state.
2456  *
2457  * Returns: %TRUE on success.
2458  */
2459 gboolean
2460 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2461     GstElement * rtpbin, GstState state)
2462 {
2463   GstRTSPStreamPrivate *priv;
2464   guint idx;
2465   gchar *name;
2466   GstPadLinkReturn ret;
2467   gboolean is_udp;
2468
2469   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2470   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2471   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2472
2473   priv = stream->priv;
2474
2475   g_mutex_lock (&priv->lock);
2476   if (priv->is_joined)
2477     goto was_joined;
2478
2479   /* create a session with the same index as the stream */
2480   idx = priv->idx;
2481
2482   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2483
2484   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2485       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2486
2487   if (is_udp && !alloc_ports (stream))
2488     goto no_ports;
2489
2490   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2491       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2492     /* For SRTP */
2493     g_signal_connect (rtpbin, "request-rtp-encoder",
2494         (GCallback) request_rtp_encoder, stream);
2495     g_signal_connect (rtpbin, "request-rtcp-encoder",
2496         (GCallback) request_rtcp_encoder, stream);
2497     g_signal_connect (rtpbin, "request-rtp-decoder",
2498         (GCallback) request_rtp_rtcp_decoder, stream);
2499     g_signal_connect (rtpbin, "request-rtcp-decoder",
2500         (GCallback) request_rtp_rtcp_decoder, stream);
2501   }
2502
2503   if (priv->sinkpad) {
2504     g_signal_connect (rtpbin, "request-pt-map",
2505         (GCallback) request_pt_map, stream);
2506   }
2507
2508   /* get pads from the RTP session element for sending and receiving
2509    * RTP/RTCP*/
2510   if (priv->srcpad) {
2511     /* get a pad for sending RTP */
2512     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2513     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2514     g_free (name);
2515
2516     /* link the RTP pad to the session manager, it should not really fail unless
2517      * this is not really an RTP pad */
2518     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2519     if (ret != GST_PAD_LINK_OK)
2520       goto link_failed;
2521
2522     name = g_strdup_printf ("send_rtp_src_%u", idx);
2523     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2524     g_free (name);
2525   } else {
2526     /* Need to connect our sinkpad from here */
2527     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2528     /* EOS */
2529     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2530
2531     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2532     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2533     g_free (name);
2534   }
2535
2536   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2537   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2538   g_free (name);
2539   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2540   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2541   g_free (name);
2542
2543   /* get the session */
2544   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2545
2546   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2547       stream);
2548   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2549       stream);
2550   g_signal_connect (priv->session, "on-ssrc-active",
2551       (GCallback) on_ssrc_active, stream);
2552   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2553       stream);
2554   g_signal_connect (priv->session, "on-bye-timeout",
2555       (GCallback) on_bye_timeout, stream);
2556   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2557       stream);
2558
2559   /* signal for sender ssrc */
2560   g_signal_connect (priv->session, "on-new-sender-ssrc",
2561       (GCallback) on_new_sender_ssrc, stream);
2562   g_signal_connect (priv->session, "on-sender-ssrc-active",
2563       (GCallback) on_sender_ssrc_active, stream);
2564
2565   create_sender_part (stream, bin, state);
2566
2567   create_receiver_part (stream, bin, state);
2568   play_udpsources_one_family (stream, G_SOCKET_FAMILY_IPV4);
2569   play_udpsources_one_family (stream, G_SOCKET_FAMILY_IPV6);
2570
2571   if (priv->srcpad) {
2572     /* be notified of caps changes */
2573     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2574         (GCallback) caps_notify, stream);
2575   }
2576
2577   priv->is_joined = TRUE;
2578   g_mutex_unlock (&priv->lock);
2579
2580   return TRUE;
2581
2582   /* ERRORS */
2583 was_joined:
2584   {
2585     g_mutex_unlock (&priv->lock);
2586     return TRUE;
2587   }
2588 no_ports:
2589   {
2590     g_mutex_unlock (&priv->lock);
2591     GST_WARNING ("failed to allocate ports %u", idx);
2592     return FALSE;
2593   }
2594 link_failed:
2595   {
2596     GST_WARNING ("failed to link stream %u", idx);
2597     gst_object_unref (priv->send_rtp_sink);
2598     priv->send_rtp_sink = NULL;
2599     g_mutex_unlock (&priv->lock);
2600     return FALSE;
2601   }
2602 }
2603
2604 /**
2605  * gst_rtsp_stream_leave_bin:
2606  * @stream: a #GstRTSPStream
2607  * @bin: (transfer none): a #GstBin
2608  * @rtpbin: (transfer none): a rtpbin #GstElement
2609  *
2610  * Remove the elements of @stream from @bin.
2611  *
2612  * Return: %TRUE on success.
2613  */
2614 gboolean
2615 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2616     GstElement * rtpbin)
2617 {
2618   GstRTSPStreamPrivate *priv;
2619   gint i;
2620   GList *l;
2621   gboolean is_tcp, is_udp;
2622
2623   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2624   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2625   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2626
2627   priv = stream->priv;
2628
2629   g_mutex_lock (&priv->lock);
2630   if (!priv->is_joined)
2631     goto was_not_joined;
2632
2633   /* all transports must be removed by now */
2634   if (priv->transports != NULL)
2635     goto transports_not_removed;
2636
2637   clear_tr_cache (priv, TRUE);
2638   clear_tr_cache (priv, FALSE);
2639
2640   GST_INFO ("stream %p leaving bin", stream);
2641
2642   if (priv->srcpad) {
2643     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2644
2645     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2646     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2647     gst_object_unref (priv->send_rtp_sink);
2648     priv->send_rtp_sink = NULL;
2649   } else if (priv->recv_rtp_src) {
2650     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2651     gst_object_unref (priv->recv_rtp_src);
2652     priv->recv_rtp_src = NULL;
2653   }
2654
2655   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2656
2657   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2658       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2659
2660
2661   for (i = 0; i < 2; i++) {
2662     if (priv->udpsink[i])
2663       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2664     if (priv->appsink[i])
2665       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2666     if (priv->appqueue[i])
2667       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2668     if (priv->udpqueue[i])
2669       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2670     if (priv->tee[i])
2671       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2672     if (priv->funnel[i])
2673       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2674     if (priv->appsrc[i])
2675       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2676
2677     if (priv->udpsrc_v4[i]) {
2678       if (priv->sinkpad || i == 1) {
2679         /* and set udpsrc to NULL now before removing */
2680         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2681         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2682         /* removing them should also nicely release the request
2683          * pads when they finalize */
2684         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2685       } else {
2686         /* we need to set the state to NULL before unref */
2687         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2688         gst_object_unref (priv->udpsrc_v4[i]);
2689       }
2690     }
2691
2692     if (priv->udpsrc_v6[i]) {
2693       if (priv->sinkpad || i == 1) {
2694         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2695         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2696         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2697       } else {
2698         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2699         gst_object_unref (priv->udpsrc_v6[i]);
2700       }
2701     }
2702
2703     for (l = priv->transport_sources; l; l = l->next) {
2704       GstRTSPMulticastTransportSource *s = l->data;
2705
2706       if (!s->udpsrc[i])
2707         continue;
2708
2709       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2710       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2711       gst_bin_remove (bin, s->udpsrc[i]);
2712     }
2713
2714     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2715       gst_bin_remove (bin, priv->udpsink[i]);
2716     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2717       gst_bin_remove (bin, priv->appsrc[i]);
2718     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2719       gst_bin_remove (bin, priv->appsink[i]);
2720     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2721       gst_bin_remove (bin, priv->appqueue[i]);
2722     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2723       gst_bin_remove (bin, priv->udpqueue[i]);
2724     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2725       gst_bin_remove (bin, priv->tee[i]);
2726     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2727       gst_bin_remove (bin, priv->funnel[i]);
2728
2729     if (priv->sinkpad || i == 1) {
2730       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2731       gst_object_unref (priv->recv_sink[i]);
2732       priv->recv_sink[i] = NULL;
2733     }
2734
2735     priv->udpsrc_v4[i] = NULL;
2736     priv->udpsrc_v6[i] = NULL;
2737     priv->udpsink[i] = NULL;
2738     priv->appsrc[i] = NULL;
2739     priv->appsink[i] = NULL;
2740     priv->appqueue[i] = NULL;
2741     priv->udpqueue[i] = NULL;
2742     priv->tee[i] = NULL;
2743     priv->funnel[i] = NULL;
2744   }
2745
2746   for (l = priv->transport_sources; l; l = l->next) {
2747     GstRTSPMulticastTransportSource *s = l->data;
2748     g_slice_free (GstRTSPMulticastTransportSource, s);
2749   }
2750   g_list_free (priv->transport_sources);
2751   priv->transport_sources = NULL;
2752
2753   if (priv->srcpad) {
2754     gst_object_unref (priv->send_src[0]);
2755     priv->send_src[0] = NULL;
2756   }
2757
2758   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2759   gst_object_unref (priv->send_src[1]);
2760   priv->send_src[1] = NULL;
2761
2762   g_object_unref (priv->session);
2763   priv->session = NULL;
2764   if (priv->caps)
2765     gst_caps_unref (priv->caps);
2766   priv->caps = NULL;
2767
2768   if (priv->srtpenc)
2769     gst_object_unref (priv->srtpenc);
2770   if (priv->srtpdec)
2771     gst_object_unref (priv->srtpdec);
2772
2773   priv->is_joined = FALSE;
2774   g_mutex_unlock (&priv->lock);
2775
2776   return TRUE;
2777
2778 was_not_joined:
2779   {
2780     g_mutex_unlock (&priv->lock);
2781     return TRUE;
2782   }
2783 transports_not_removed:
2784   {
2785     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2786     g_mutex_unlock (&priv->lock);
2787     return FALSE;
2788   }
2789 }
2790
2791 /**
2792  * gst_rtsp_stream_get_rtpinfo:
2793  * @stream: a #GstRTSPStream
2794  * @rtptime: (allow-none): result RTP timestamp
2795  * @seq: (allow-none): result RTP seqnum
2796  * @clock_rate: (allow-none): the clock rate
2797  * @running_time: (allow-none): result running-time
2798  *
2799  * Retrieve the current rtptime, seq and running-time. This is used to
2800  * construct a RTPInfo reply header.
2801  *
2802  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2803  */
2804 gboolean
2805 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2806     guint * rtptime, guint * seq, guint * clock_rate,
2807     GstClockTime * running_time)
2808 {
2809   GstRTSPStreamPrivate *priv;
2810   GstStructure *stats;
2811   GObjectClass *payobjclass;
2812
2813   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2814
2815   priv = stream->priv;
2816
2817   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2818
2819   g_mutex_lock (&priv->lock);
2820
2821   /* First try to extract the information from the last buffer on the sinks.
2822    * This will have a more accurate sequence number and timestamp, as between
2823    * the payloader and the sink there can be some queues
2824    */
2825   if (priv->udpsink[0] || priv->appsink[0]) {
2826     GstSample *last_sample;
2827
2828     if (priv->udpsink[0])
2829       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2830     else
2831       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2832
2833     if (last_sample) {
2834       GstCaps *caps;
2835       GstBuffer *buffer;
2836       GstSegment *segment;
2837       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2838
2839       caps = gst_sample_get_caps (last_sample);
2840       buffer = gst_sample_get_buffer (last_sample);
2841       segment = gst_sample_get_segment (last_sample);
2842
2843       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2844         if (seq) {
2845           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2846         }
2847
2848         if (rtptime) {
2849           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2850         }
2851
2852         gst_rtp_buffer_unmap (&rtp_buffer);
2853
2854         if (running_time) {
2855           *running_time =
2856               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2857               GST_BUFFER_TIMESTAMP (buffer));
2858         }
2859
2860         if (clock_rate) {
2861           GstStructure *s = gst_caps_get_structure (caps, 0);
2862
2863           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2864
2865           if (*clock_rate == 0 && running_time)
2866             *running_time = GST_CLOCK_TIME_NONE;
2867         }
2868         gst_sample_unref (last_sample);
2869
2870         goto done;
2871       } else {
2872         gst_sample_unref (last_sample);
2873       }
2874     }
2875   }
2876
2877   if (g_object_class_find_property (payobjclass, "stats")) {
2878     g_object_get (priv->payloader, "stats", &stats, NULL);
2879     if (stats == NULL)
2880       goto no_stats;
2881
2882     if (seq)
2883       gst_structure_get_uint (stats, "seqnum", seq);
2884
2885     if (rtptime)
2886       gst_structure_get_uint (stats, "timestamp", rtptime);
2887
2888     if (running_time)
2889       gst_structure_get_clock_time (stats, "running-time", running_time);
2890
2891     if (clock_rate) {
2892       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2893       if (*clock_rate == 0 && running_time)
2894         *running_time = GST_CLOCK_TIME_NONE;
2895     }
2896     gst_structure_free (stats);
2897   } else {
2898     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2899         !g_object_class_find_property (payobjclass, "timestamp"))
2900       goto no_stats;
2901
2902     if (seq)
2903       g_object_get (priv->payloader, "seqnum", seq, NULL);
2904
2905     if (rtptime)
2906       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2907
2908     if (running_time)
2909       *running_time = GST_CLOCK_TIME_NONE;
2910   }
2911
2912 done:
2913   g_mutex_unlock (&priv->lock);
2914
2915   return TRUE;
2916
2917   /* ERRORS */
2918 no_stats:
2919   {
2920     GST_WARNING ("Could not get payloader stats");
2921     g_mutex_unlock (&priv->lock);
2922     return FALSE;
2923   }
2924 }
2925
2926 /**
2927  * gst_rtsp_stream_get_caps:
2928  * @stream: a #GstRTSPStream
2929  *
2930  * Retrieve the current caps of @stream.
2931  *
2932  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2933  * after usage.
2934  */
2935 GstCaps *
2936 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2937 {
2938   GstRTSPStreamPrivate *priv;
2939   GstCaps *result;
2940
2941   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2942
2943   priv = stream->priv;
2944
2945   g_mutex_lock (&priv->lock);
2946   if ((result = priv->caps))
2947     gst_caps_ref (result);
2948   g_mutex_unlock (&priv->lock);
2949
2950   return result;
2951 }
2952
2953 /**
2954  * gst_rtsp_stream_recv_rtp:
2955  * @stream: a #GstRTSPStream
2956  * @buffer: (transfer full): a #GstBuffer
2957  *
2958  * Handle an RTP buffer for the stream. This method is usually called when a
2959  * message has been received from a client using the TCP transport.
2960  *
2961  * This function takes ownership of @buffer.
2962  *
2963  * Returns: a GstFlowReturn.
2964  */
2965 GstFlowReturn
2966 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2967 {
2968   GstRTSPStreamPrivate *priv;
2969   GstFlowReturn ret;
2970   GstElement *element;
2971
2972   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2973   priv = stream->priv;
2974   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2975   g_return_val_if_fail (priv->is_joined, FALSE);
2976
2977   g_mutex_lock (&priv->lock);
2978   if (priv->appsrc[0])
2979     element = gst_object_ref (priv->appsrc[0]);
2980   else
2981     element = NULL;
2982   g_mutex_unlock (&priv->lock);
2983
2984   if (element) {
2985     if (priv->appsrc_base_time[0] == -1) {
2986       /* Take current running_time. This timestamp will be put on
2987        * the first buffer of each stream because we are a live source and so we
2988        * timestamp with the running_time. When we are dealing with TCP, we also
2989        * only timestamp the first buffer (using the DISCONT flag) because a server
2990        * typically bursts data, for which we don't want to compensate by speeding
2991        * up the media. The other timestamps will be interpollated from this one
2992        * using the RTP timestamps. */
2993       GST_OBJECT_LOCK (element);
2994       if (GST_ELEMENT_CLOCK (element)) {
2995         GstClockTime now;
2996         GstClockTime base_time;
2997
2998         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2999         base_time = GST_ELEMENT_CAST (element)->base_time;
3000
3001         priv->appsrc_base_time[0] = now - base_time;
3002         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3003         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3004             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3005             GST_TIME_ARGS (base_time));
3006       }
3007       GST_OBJECT_UNLOCK (element);
3008     }
3009
3010     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3011     gst_object_unref (element);
3012   } else {
3013     ret = GST_FLOW_OK;
3014   }
3015   return ret;
3016 }
3017
3018 /**
3019  * gst_rtsp_stream_recv_rtcp:
3020  * @stream: a #GstRTSPStream
3021  * @buffer: (transfer full): a #GstBuffer
3022  *
3023  * Handle an RTCP buffer for the stream. This method is usually called when a
3024  * message has been received from a client using the TCP transport.
3025  *
3026  * This function takes ownership of @buffer.
3027  *
3028  * Returns: a GstFlowReturn.
3029  */
3030 GstFlowReturn
3031 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3032 {
3033   GstRTSPStreamPrivate *priv;
3034   GstFlowReturn ret;
3035   GstElement *element;
3036
3037   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3038   priv = stream->priv;
3039   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3040
3041   if (!priv->is_joined) {
3042     gst_buffer_unref (buffer);
3043     return GST_FLOW_NOT_LINKED;
3044   }
3045   g_mutex_lock (&priv->lock);
3046   if (priv->appsrc[1])
3047     element = gst_object_ref (priv->appsrc[1]);
3048   else
3049     element = NULL;
3050   g_mutex_unlock (&priv->lock);
3051
3052   if (element) {
3053     if (priv->appsrc_base_time[1] == -1) {
3054       /* Take current running_time. This timestamp will be put on
3055        * the first buffer of each stream because we are a live source and so we
3056        * timestamp with the running_time. When we are dealing with TCP, we also
3057        * only timestamp the first buffer (using the DISCONT flag) because a server
3058        * typically bursts data, for which we don't want to compensate by speeding
3059        * up the media. The other timestamps will be interpollated from this one
3060        * using the RTP timestamps. */
3061       GST_OBJECT_LOCK (element);
3062       if (GST_ELEMENT_CLOCK (element)) {
3063         GstClockTime now;
3064         GstClockTime base_time;
3065
3066         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3067         base_time = GST_ELEMENT_CAST (element)->base_time;
3068
3069         priv->appsrc_base_time[1] = now - base_time;
3070         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3071         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3072             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3073             GST_TIME_ARGS (base_time));
3074       }
3075       GST_OBJECT_UNLOCK (element);
3076     }
3077
3078     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3079     gst_object_unref (element);
3080   } else {
3081     ret = GST_FLOW_OK;
3082     gst_buffer_unref (buffer);
3083   }
3084   return ret;
3085 }
3086
3087 /* must be called with lock */
3088 static gboolean
3089 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3090     gboolean add)
3091 {
3092   GstRTSPStreamPrivate *priv = stream->priv;
3093   const GstRTSPTransport *tr;
3094
3095   tr = gst_rtsp_stream_transport_get_transport (trans);
3096
3097   switch (tr->lower_transport) {
3098     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3099     {
3100       GstRTSPMulticastTransportSource *source;
3101       GstBin *bin;
3102
3103       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
3104
3105       if (add) {
3106         gchar *host;
3107         gint i;
3108         GstPad *selpad, *pad;
3109
3110         source = g_slice_new0 (GstRTSPMulticastTransportSource);
3111         source->transport = trans;
3112
3113         for (i = 0; i < 2; i++) {
3114           host =
3115               g_strdup_printf ("udp://%s:%d", tr->destination,
3116               (i == 0) ? tr->port.min : tr->port.max);
3117           source->udpsrc[i] =
3118               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
3119           g_free (host);
3120           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
3121
3122           if (priv->srcpad) {
3123             /* we set and keep these to playing so that they don't cause NO_PREROLL return
3124              * values. This is only relevant for PLAY pipelines */
3125             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
3126             gst_element_set_locked_state (source->udpsrc[i], TRUE);
3127           }
3128           /* add udpsrc */
3129           gst_bin_add (bin, source->udpsrc[i]);
3130
3131           /* and link to the funnel v4 */
3132           if (priv->sinkpad || i == 1) {
3133             source->selpad[i] = selpad =
3134                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3135             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3136             gst_pad_link (pad, selpad);
3137             gst_object_unref (pad);
3138             gst_object_unref (selpad);
3139           }
3140         }
3141
3142         priv->transport_sources =
3143             g_list_prepend (priv->transport_sources, source);
3144       } else {
3145         GList *l;
3146
3147         for (l = priv->transport_sources; l; l = l->next) {
3148           source = l->data;
3149
3150           if (source->transport == trans) {
3151             priv->transport_sources =
3152                 g_list_delete_link (priv->transport_sources, l);
3153             break;
3154           }
3155         }
3156
3157         if (l != NULL) {
3158           gint i;
3159
3160           for (i = 0; i < 2; i++) {
3161             /* Will automatically unlink everything */
3162             gst_bin_remove (bin,
3163                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3164
3165             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3166             gst_object_unref (source->udpsrc[i]);
3167
3168             if (priv->sinkpad || i == 1) {
3169               gst_element_release_request_pad (priv->funnel[i],
3170                   source->selpad[i]);
3171             }
3172           }
3173
3174           g_slice_free (GstRTSPMulticastTransportSource, source);
3175         }
3176       }
3177
3178       gst_object_unref (bin);
3179
3180       /* fall through for the generic case */
3181     }
3182     case GST_RTSP_LOWER_TRANS_UDP:
3183     {
3184       gchar *dest;
3185       gint min, max;
3186       guint ttl = 0;
3187
3188       dest = tr->destination;
3189       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3190         min = tr->port.min;
3191         max = tr->port.max;
3192         ttl = tr->ttl;
3193       } else if (priv->client_side) {
3194         /* In client side mode the 'destination' is the RTSP server, so send
3195          * to those ports */
3196         min = tr->server_port.min;
3197         max = tr->server_port.max;
3198       } else {
3199         min = tr->client_port.min;
3200         max = tr->client_port.max;
3201       }
3202
3203       if (add) {
3204         if (ttl > 0) {
3205           GST_INFO ("setting ttl-mc %d", ttl);
3206           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3207           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3208         }
3209         GST_INFO ("adding %s:%d-%d", dest, min, max);
3210         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3211         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3212         priv->transports = g_list_prepend (priv->transports, trans);
3213       } else {
3214         GST_INFO ("removing %s:%d-%d", dest, min, max);
3215         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3216         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3217         priv->transports = g_list_remove (priv->transports, trans);
3218       }
3219       priv->transports_cookie++;
3220       break;
3221     }
3222     case GST_RTSP_LOWER_TRANS_TCP:
3223       if (add) {
3224         GST_INFO ("adding TCP %s", tr->destination);
3225         priv->transports = g_list_prepend (priv->transports, trans);
3226       } else {
3227         GST_INFO ("removing TCP %s", tr->destination);
3228         priv->transports = g_list_remove (priv->transports, trans);
3229       }
3230       priv->transports_cookie++;
3231       break;
3232     default:
3233       goto unknown_transport;
3234   }
3235   return TRUE;
3236
3237   /* ERRORS */
3238 unknown_transport:
3239   {
3240     GST_INFO ("Unknown transport %d", tr->lower_transport);
3241     return FALSE;
3242   }
3243 }
3244
3245
3246 /**
3247  * gst_rtsp_stream_add_transport:
3248  * @stream: a #GstRTSPStream
3249  * @trans: (transfer none): a #GstRTSPStreamTransport
3250  *
3251  * Add the transport in @trans to @stream. The media of @stream will
3252  * then also be send to the values configured in @trans.
3253  *
3254  * @stream must be joined to a bin.
3255  *
3256  * @trans must contain a valid #GstRTSPTransport.
3257  *
3258  * Returns: %TRUE if @trans was added
3259  */
3260 gboolean
3261 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3262     GstRTSPStreamTransport * trans)
3263 {
3264   GstRTSPStreamPrivate *priv;
3265   gboolean res;
3266
3267   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3268   priv = stream->priv;
3269   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3270   g_return_val_if_fail (priv->is_joined, FALSE);
3271
3272   g_mutex_lock (&priv->lock);
3273   res = update_transport (stream, trans, TRUE);
3274   g_mutex_unlock (&priv->lock);
3275
3276   return res;
3277 }
3278
3279 /**
3280  * gst_rtsp_stream_remove_transport:
3281  * @stream: a #GstRTSPStream
3282  * @trans: (transfer none): a #GstRTSPStreamTransport
3283  *
3284  * Remove the transport in @trans from @stream. The media of @stream will
3285  * not be sent to the values configured in @trans.
3286  *
3287  * @stream must be joined to a bin.
3288  *
3289  * @trans must contain a valid #GstRTSPTransport.
3290  *
3291  * Returns: %TRUE if @trans was removed
3292  */
3293 gboolean
3294 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3295     GstRTSPStreamTransport * trans)
3296 {
3297   GstRTSPStreamPrivate *priv;
3298   gboolean res;
3299
3300   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3301   priv = stream->priv;
3302   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3303   g_return_val_if_fail (priv->is_joined, FALSE);
3304
3305   g_mutex_lock (&priv->lock);
3306   res = update_transport (stream, trans, FALSE);
3307   g_mutex_unlock (&priv->lock);
3308
3309   return res;
3310 }
3311
3312 /**
3313  * gst_rtsp_stream_update_crypto:
3314  * @stream: a #GstRTSPStream
3315  * @ssrc: the SSRC
3316  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3317  *
3318  * Update the new crypto information for @ssrc in @stream. If information
3319  * for @ssrc did not exist, it will be added. If information
3320  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3321  * be removed from @stream.
3322  *
3323  * Returns: %TRUE if @crypto could be updated
3324  */
3325 gboolean
3326 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3327     guint ssrc, GstCaps * crypto)
3328 {
3329   GstRTSPStreamPrivate *priv;
3330
3331   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3332   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3333
3334   priv = stream->priv;
3335
3336   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3337
3338   g_mutex_lock (&priv->lock);
3339   if (crypto)
3340     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3341         gst_caps_ref (crypto));
3342   else
3343     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3344   g_mutex_unlock (&priv->lock);
3345
3346   return TRUE;
3347 }
3348
3349 /**
3350  * gst_rtsp_stream_get_rtp_socket:
3351  * @stream: a #GstRTSPStream
3352  * @family: the socket family
3353  *
3354  * Get the RTP socket from @stream for a @family.
3355  *
3356  * @stream must be joined to a bin.
3357  *
3358  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3359  * socket could be allocated for @family. Unref after usage
3360  */
3361 GSocket *
3362 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3363 {
3364   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3365   GSocket *socket;
3366   const gchar *name;
3367
3368   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3369   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3370       family == G_SOCKET_FAMILY_IPV6, NULL);
3371   g_return_val_if_fail (priv->udpsink[0], NULL);
3372
3373   if (family == G_SOCKET_FAMILY_IPV6)
3374     name = "socket-v6";
3375   else
3376     name = "socket";
3377
3378   g_object_get (priv->udpsink[0], name, &socket, NULL);
3379
3380   return socket;
3381 }
3382
3383 /**
3384  * gst_rtsp_stream_get_rtcp_socket:
3385  * @stream: a #GstRTSPStream
3386  * @family: the socket family
3387  *
3388  * Get the RTCP socket from @stream for a @family.
3389  *
3390  * @stream must be joined to a bin.
3391  *
3392  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3393  * socket could be allocated for @family. Unref after usage
3394  */
3395 GSocket *
3396 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3397 {
3398   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3399   GSocket *socket;
3400   const gchar *name;
3401
3402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3403   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3404       family == G_SOCKET_FAMILY_IPV6, NULL);
3405   g_return_val_if_fail (priv->udpsink[1], NULL);
3406
3407   if (family == G_SOCKET_FAMILY_IPV6)
3408     name = "socket-v6";
3409   else
3410     name = "socket";
3411
3412   g_object_get (priv->udpsink[1], name, &socket, NULL);
3413
3414   return socket;
3415 }
3416
3417 /**
3418  * gst_rtsp_stream_set_seqnum:
3419  * @stream: a #GstRTSPStream
3420  * @seqnum: a new sequence number
3421  *
3422  * Configure the sequence number in the payloader of @stream to @seqnum.
3423  */
3424 void
3425 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3426 {
3427   GstRTSPStreamPrivate *priv;
3428
3429   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3430
3431   priv = stream->priv;
3432
3433   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3434 }
3435
3436 /**
3437  * gst_rtsp_stream_get_seqnum:
3438  * @stream: a #GstRTSPStream
3439  *
3440  * Get the configured sequence number in the payloader of @stream.
3441  *
3442  * Returns: the sequence number of the payloader.
3443  */
3444 guint16
3445 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3446 {
3447   GstRTSPStreamPrivate *priv;
3448   guint seqnum;
3449
3450   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3451
3452   priv = stream->priv;
3453
3454   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3455
3456   return seqnum;
3457 }
3458
3459 /**
3460  * gst_rtsp_stream_transport_filter:
3461  * @stream: a #GstRTSPStream
3462  * @func: (scope call) (allow-none): a callback
3463  * @user_data: (closure): user data passed to @func
3464  *
3465  * Call @func for each transport managed by @stream. The result value of @func
3466  * determines what happens to the transport. @func will be called with @stream
3467  * locked so no further actions on @stream can be performed from @func.
3468  *
3469  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3470  * @stream.
3471  *
3472  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3473  *
3474  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3475  * will also be added with an additional ref to the result #GList of this
3476  * function..
3477  *
3478  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3479  *
3480  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3481  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3482  * element in the #GList should be unreffed before the list is freed.
3483  */
3484 GList *
3485 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3486     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3487 {
3488   GstRTSPStreamPrivate *priv;
3489   GList *result, *walk, *next;
3490   GHashTable *visited = NULL;
3491   guint cookie;
3492
3493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3494
3495   priv = stream->priv;
3496
3497   result = NULL;
3498   if (func)
3499     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3500
3501   g_mutex_lock (&priv->lock);
3502 restart:
3503   cookie = priv->transports_cookie;
3504   for (walk = priv->transports; walk; walk = next) {
3505     GstRTSPStreamTransport *trans = walk->data;
3506     GstRTSPFilterResult res;
3507     gboolean changed;
3508
3509     next = g_list_next (walk);
3510
3511     if (func) {
3512       /* only visit each transport once */
3513       if (g_hash_table_contains (visited, trans))
3514         continue;
3515
3516       g_hash_table_add (visited, g_object_ref (trans));
3517       g_mutex_unlock (&priv->lock);
3518
3519       res = func (stream, trans, user_data);
3520
3521       g_mutex_lock (&priv->lock);
3522     } else
3523       res = GST_RTSP_FILTER_REF;
3524
3525     changed = (cookie != priv->transports_cookie);
3526
3527     switch (res) {
3528       case GST_RTSP_FILTER_REMOVE:
3529         update_transport (stream, trans, FALSE);
3530         break;
3531       case GST_RTSP_FILTER_REF:
3532         result = g_list_prepend (result, g_object_ref (trans));
3533         break;
3534       case GST_RTSP_FILTER_KEEP:
3535       default:
3536         break;
3537     }
3538     if (changed)
3539       goto restart;
3540   }
3541   g_mutex_unlock (&priv->lock);
3542
3543   if (func)
3544     g_hash_table_unref (visited);
3545
3546   return result;
3547 }
3548
3549 static GstPadProbeReturn
3550 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3551 {
3552   GstRTSPStreamPrivate *priv;
3553   GstRTSPStream *stream;
3554
3555   stream = user_data;
3556   priv = stream->priv;
3557
3558   GST_DEBUG_OBJECT (pad, "now blocking");
3559
3560   g_mutex_lock (&priv->lock);
3561   priv->blocking = TRUE;
3562   g_mutex_unlock (&priv->lock);
3563
3564   gst_element_post_message (priv->payloader,
3565       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3566           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3567
3568   return GST_PAD_PROBE_OK;
3569 }
3570
3571 /**
3572  * gst_rtsp_stream_set_blocked:
3573  * @stream: a #GstRTSPStream
3574  * @blocked: boolean indicating we should block or unblock
3575  *
3576  * Blocks or unblocks the dataflow on @stream.
3577  *
3578  * Returns: %TRUE on success
3579  */
3580 gboolean
3581 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3582 {
3583   GstRTSPStreamPrivate *priv;
3584
3585   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3586
3587   priv = stream->priv;
3588
3589   g_mutex_lock (&priv->lock);
3590   if (blocked) {
3591     priv->blocking = FALSE;
3592     if (priv->blocked_id == 0) {
3593       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3594           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3595           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3596           g_object_ref (stream), g_object_unref);
3597     }
3598   } else {
3599     if (priv->blocked_id != 0) {
3600       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3601       priv->blocked_id = 0;
3602       priv->blocking = FALSE;
3603     }
3604   }
3605   g_mutex_unlock (&priv->lock);
3606
3607   return TRUE;
3608 }
3609
3610 /**
3611  * gst_rtsp_stream_is_blocking:
3612  * @stream: a #GstRTSPStream
3613  *
3614  * Check if @stream is blocking on a #GstBuffer.
3615  *
3616  * Returns: %TRUE if @stream is blocking
3617  */
3618 gboolean
3619 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3620 {
3621   GstRTSPStreamPrivate *priv;
3622   gboolean result;
3623
3624   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3625
3626   priv = stream->priv;
3627
3628   g_mutex_lock (&priv->lock);
3629   result = priv->blocking;
3630   g_mutex_unlock (&priv->lock);
3631
3632   return result;
3633 }
3634
3635 /**
3636  * gst_rtsp_stream_query_position:
3637  * @stream: a #GstRTSPStream
3638  *
3639  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3640  * the RTP parts of the pipeline and not the RTCP parts.
3641  *
3642  * Returns: %TRUE if the position could be queried
3643  */
3644 gboolean
3645 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3646 {
3647   GstRTSPStreamPrivate *priv;
3648   GstElement *sink;
3649   gboolean ret;
3650
3651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3652
3653   priv = stream->priv;
3654
3655   g_mutex_lock (&priv->lock);
3656   /* depending on the transport type, it should query corresponding sink */
3657   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3658       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3659     sink = priv->udpsink[0];
3660   else
3661     sink = priv->appsink[0];
3662
3663   if (sink)
3664     gst_object_ref (sink);
3665   g_mutex_unlock (&priv->lock);
3666
3667   if (!sink)
3668     return FALSE;
3669
3670   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3671   gst_object_unref (sink);
3672
3673   return ret;
3674 }
3675
3676 /**
3677  * gst_rtsp_stream_query_stop:
3678  * @stream: a #GstRTSPStream
3679  *
3680  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3681  * the RTP parts of the pipeline and not the RTCP parts.
3682  *
3683  * Returns: %TRUE if the stop could be queried
3684  */
3685 gboolean
3686 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3687 {
3688   GstRTSPStreamPrivate *priv;
3689   GstElement *sink;
3690   GstQuery *query;
3691   gboolean ret;
3692
3693   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3694
3695   priv = stream->priv;
3696
3697   g_mutex_lock (&priv->lock);
3698   /* depending on the transport type, it should query corresponding sink */
3699   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3700       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3701     sink = priv->udpsink[0];
3702   else
3703     sink = priv->appsink[0];
3704
3705   if (sink)
3706     gst_object_ref (sink);
3707   g_mutex_unlock (&priv->lock);
3708
3709   if (!sink)
3710     return FALSE;
3711
3712   query = gst_query_new_segment (GST_FORMAT_TIME);
3713   if ((ret = gst_element_query (sink, query))) {
3714     GstFormat format;
3715
3716     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3717     if (format != GST_FORMAT_TIME)
3718       *stop = -1;
3719   }
3720   gst_query_unref (query);
3721   gst_object_unref (sink);
3722
3723   return ret;
3724
3725 }