rtsp-server: Fix indentation
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *addr_v4;
139   GstRTSPAddress *addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   /* the caps of the stream */
144   gulong caps_sig;
145   GstCaps *caps;
146
147   /* transports we stream to */
148   guint n_active;
149   GList *transports;
150   guint transports_cookie;
151   GList *tr_cache_rtp;
152   GList *tr_cache_rtcp;
153   guint tr_cache_cookie_rtp;
154   guint tr_cache_cookie_rtcp;
155
156
157   gint dscp_qos;
158
159   /* stream blocking */
160   gulong blocked_id;
161   gboolean blocking;
162
163   /* pt->caps map for RECORD streams */
164   GHashTable *ptmap;
165 };
166
167 #define DEFAULT_CONTROL         NULL
168 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
169 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
170                                         GST_RTSP_LOWER_TRANS_TCP
171
172 enum
173 {
174   PROP_0,
175   PROP_CONTROL,
176   PROP_PROFILES,
177   PROP_PROTOCOLS,
178   PROP_LAST
179 };
180
181 enum
182 {
183   SIGNAL_NEW_RTP_ENCODER,
184   SIGNAL_NEW_RTCP_ENCODER,
185   SIGNAL_LAST
186 };
187
188 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
189 #define GST_CAT_DEFAULT rtsp_stream_debug
190
191 static GQuark ssrc_stream_map_key;
192
193 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec);
195 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
196     const GValue * value, GParamSpec * pspec);
197
198 static void gst_rtsp_stream_finalize (GObject * obj);
199
200 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
201
202 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
203
204 static void
205 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
206 {
207   GObjectClass *gobject_class;
208
209   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
210
211   gobject_class = G_OBJECT_CLASS (klass);
212
213   gobject_class->get_property = gst_rtsp_stream_get_property;
214   gobject_class->set_property = gst_rtsp_stream_set_property;
215   gobject_class->finalize = gst_rtsp_stream_finalize;
216
217   g_object_class_install_property (gobject_class, PROP_CONTROL,
218       g_param_spec_string ("control", "Control",
219           "The control string for this stream", DEFAULT_CONTROL,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   g_object_class_install_property (gobject_class, PROP_PROFILES,
223       g_param_spec_flags ("profiles", "Profiles",
224           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
225           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
228       g_param_spec_flags ("protocols", "Protocols",
229           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
230           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
233       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
235       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
238       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
243
244   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
245 }
246
247 static void
248 gst_rtsp_stream_init (GstRTSPStream * stream)
249 {
250   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
251
252   GST_DEBUG ("new stream %p", stream);
253
254   stream->priv = priv;
255
256   priv->dscp_qos = -1;
257   priv->control = g_strdup (DEFAULT_CONTROL);
258   priv->profiles = DEFAULT_PROFILES;
259   priv->protocols = DEFAULT_PROTOCOLS;
260
261   g_mutex_init (&priv->lock);
262
263   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
264       NULL, (GDestroyNotify) gst_caps_unref);
265   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
266       (GDestroyNotify) gst_caps_unref);
267 }
268
269 static void
270 gst_rtsp_stream_finalize (GObject * obj)
271 {
272   GstRTSPStream *stream;
273   GstRTSPStreamPrivate *priv;
274
275   stream = GST_RTSP_STREAM (obj);
276   priv = stream->priv;
277
278   GST_DEBUG ("finalize stream %p", stream);
279
280   /* we really need to be unjoined now */
281   g_return_if_fail (!priv->is_joined);
282
283   if (priv->addr_v4)
284     gst_rtsp_address_free (priv->addr_v4);
285   if (priv->addr_v6)
286     gst_rtsp_address_free (priv->addr_v6);
287   if (priv->server_addr_v4)
288     gst_rtsp_address_free (priv->server_addr_v4);
289   if (priv->server_addr_v6)
290     gst_rtsp_address_free (priv->server_addr_v6);
291   if (priv->pool)
292     g_object_unref (priv->pool);
293   if (priv->rtxsend)
294     g_object_unref (priv->rtxsend);
295
296   gst_object_unref (priv->payloader);
297   if (priv->srcpad)
298     gst_object_unref (priv->srcpad);
299   if (priv->sinkpad)
300     gst_object_unref (priv->sinkpad);
301   g_free (priv->control);
302   g_mutex_clear (&priv->lock);
303
304   g_hash_table_unref (priv->keys);
305   g_hash_table_destroy (priv->ptmap);
306
307   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
308 }
309
310 static void
311 gst_rtsp_stream_get_property (GObject * object, guint propid,
312     GValue * value, GParamSpec * pspec)
313 {
314   GstRTSPStream *stream = GST_RTSP_STREAM (object);
315
316   switch (propid) {
317     case PROP_CONTROL:
318       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
319       break;
320     case PROP_PROFILES:
321       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
322       break;
323     case PROP_PROTOCOLS:
324       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
328   }
329 }
330
331 static void
332 gst_rtsp_stream_set_property (GObject * object, guint propid,
333     const GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
340       break;
341     case PROP_PROFILES:
342       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
343       break;
344     case PROP_PROTOCOLS:
345       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 /**
353  * gst_rtsp_stream_new:
354  * @idx: an index
355  * @pad: a #GstPad
356  * @payloader: a #GstElement
357  *
358  * Create a new media stream with index @idx that handles RTP data on
359  * @pad and has a payloader element @payloader if @pad is a source pad
360  * or a depayloader element @payloader if @pad is a sink pad.
361  *
362  * Returns: (transfer full): a new #GstRTSPStream
363  */
364 GstRTSPStream *
365 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
366 {
367   GstRTSPStreamPrivate *priv;
368   GstRTSPStream *stream;
369
370   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
371   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
372
373   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
374   priv = stream->priv;
375   priv->idx = idx;
376   priv->payloader = gst_object_ref (payloader);
377   if (GST_PAD_IS_SRC (pad))
378     priv->srcpad = gst_object_ref (pad);
379   else
380     priv->sinkpad = gst_object_ref (pad);
381
382   return stream;
383 }
384
385 /**
386  * gst_rtsp_stream_get_index:
387  * @stream: a #GstRTSPStream
388  *
389  * Get the stream index.
390  *
391  * Return: the stream index.
392  */
393 guint
394 gst_rtsp_stream_get_index (GstRTSPStream * stream)
395 {
396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
397
398   return stream->priv->idx;
399 }
400
401 /**
402  * gst_rtsp_stream_get_pt:
403  * @stream: a #GstRTSPStream
404  *
405  * Get the stream payload type.
406  *
407  * Return: the stream payload type.
408  */
409 guint
410 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   guint pt;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
416
417   priv = stream->priv;
418
419   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
420
421   return pt;
422 }
423
424 /**
425  * gst_rtsp_stream_get_srcpad:
426  * @stream: a #GstRTSPStream
427  *
428  * Get the srcpad associated with @stream.
429  *
430  * Returns: (transfer full): the srcpad. Unref after usage.
431  */
432 GstPad *
433 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
434 {
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
436
437   if (!stream->priv->srcpad)
438     return NULL;
439
440   return gst_object_ref (stream->priv->srcpad);
441 }
442
443 /**
444  * gst_rtsp_stream_get_sinkpad:
445  * @stream: a #GstRTSPStream
446  *
447  * Get the sinkpad associated with @stream.
448  *
449  * Returns: (transfer full): the sinkpad. Unref after usage.
450  */
451 GstPad *
452 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
453 {
454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
455
456   if (!stream->priv->sinkpad)
457     return NULL;
458
459   return gst_object_ref (stream->priv->sinkpad);
460 }
461
462 /**
463  * gst_rtsp_stream_get_control:
464  * @stream: a #GstRTSPStream
465  *
466  * Get the control string to identify this stream.
467  *
468  * Returns: (transfer full): the control string. g_free() after usage.
469  */
470 gchar *
471 gst_rtsp_stream_get_control (GstRTSPStream * stream)
472 {
473   GstRTSPStreamPrivate *priv;
474   gchar *result;
475
476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
477
478   priv = stream->priv;
479
480   g_mutex_lock (&priv->lock);
481   if ((result = g_strdup (priv->control)) == NULL)
482     result = g_strdup_printf ("stream=%u", priv->idx);
483   g_mutex_unlock (&priv->lock);
484
485   return result;
486 }
487
488 /**
489  * gst_rtsp_stream_set_control:
490  * @stream: a #GstRTSPStream
491  * @control: a control string
492  *
493  * Set the control string in @stream.
494  */
495 void
496 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   g_mutex_lock (&priv->lock);
505   g_free (priv->control);
506   priv->control = g_strdup (control);
507   g_mutex_unlock (&priv->lock);
508 }
509
510 /**
511  * gst_rtsp_stream_has_control:
512  * @stream: a #GstRTSPStream
513  * @control: a control string
514  *
515  * Check if @stream has the control string @control.
516  *
517  * Returns: %TRUE is @stream has @control as the control string
518  */
519 gboolean
520 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
521 {
522   GstRTSPStreamPrivate *priv;
523   gboolean res;
524
525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
526
527   priv = stream->priv;
528
529   g_mutex_lock (&priv->lock);
530   if (priv->control)
531     res = (g_strcmp0 (priv->control, control) == 0);
532   else {
533     guint streamid;
534
535     if (sscanf (control, "stream=%u", &streamid) > 0)
536       res = (streamid == priv->idx);
537     else
538       res = FALSE;
539   }
540   g_mutex_unlock (&priv->lock);
541
542   return res;
543 }
544
545 /**
546  * gst_rtsp_stream_set_mtu:
547  * @stream: a #GstRTSPStream
548  * @mtu: a new MTU
549  *
550  * Configure the mtu in the payloader of @stream to @mtu.
551  */
552 void
553 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
554 {
555   GstRTSPStreamPrivate *priv;
556
557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
558
559   priv = stream->priv;
560
561   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
562
563   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
564 }
565
566 /**
567  * gst_rtsp_stream_get_mtu:
568  * @stream: a #GstRTSPStream
569  *
570  * Get the configured MTU in the payloader of @stream.
571  *
572  * Returns: the MTU of the payloader.
573  */
574 guint
575 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
576 {
577   GstRTSPStreamPrivate *priv;
578   guint mtu;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
581
582   priv = stream->priv;
583
584   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
585
586   return mtu;
587 }
588
589 /* Update the dscp qos property on the udp sinks */
590 static void
591 update_dscp_qos (GstRTSPStream * stream)
592 {
593   GstRTSPStreamPrivate *priv;
594
595   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
596
597   priv = stream->priv;
598
599   if (priv->udpsink[0]) {
600     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
601         NULL);
602   }
603
604   if (priv->udpsink[1]) {
605     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
606         NULL);
607   }
608 }
609
610 /**
611  * gst_rtsp_stream_set_dscp_qos:
612  * @stream: a #GstRTSPStream
613  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
614  *
615  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
616  */
617 void
618 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
623
624   priv = stream->priv;
625
626   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
627
628   if (dscp_qos < -1 || dscp_qos > 63) {
629     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
630     return;
631   }
632
633   priv->dscp_qos = dscp_qos;
634
635   update_dscp_qos (stream);
636 }
637
638 /**
639  * gst_rtsp_stream_get_dscp_qos:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the configured DSCP QoS in of the outgoing sockets.
643  *
644  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
645  */
646 gint
647 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
652
653   priv = stream->priv;
654
655   return priv->dscp_qos;
656 }
657
658 /**
659  * gst_rtsp_stream_is_transport_supported:
660  * @stream: a #GstRTSPStream
661  * @transport: (transfer none): a #GstRTSPTransport
662  *
663  * Check if @transport can be handled by stream
664  *
665  * Returns: %TRUE if @transport can be handled by @stream.
666  */
667 gboolean
668 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
669     GstRTSPTransport * transport)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   if (transport->trans != GST_RTSP_TRANS_RTP)
679     goto unsupported_transmode;
680
681   if (!(transport->profile & priv->profiles))
682     goto unsupported_profile;
683
684   if (!(transport->lower_transport & priv->protocols))
685     goto unsupported_ltrans;
686
687   g_mutex_unlock (&priv->lock);
688
689   return TRUE;
690
691   /* ERRORS */
692 unsupported_transmode:
693   {
694     GST_DEBUG ("unsupported transport mode %d", transport->trans);
695     g_mutex_unlock (&priv->lock);
696     return FALSE;
697   }
698 unsupported_profile:
699   {
700     GST_DEBUG ("unsupported profile %d", transport->profile);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_ltrans:
705   {
706     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 }
711
712 /**
713  * gst_rtsp_stream_set_profiles:
714  * @stream: a #GstRTSPStream
715  * @profiles: the new profiles
716  *
717  * Configure the allowed profiles for @stream.
718  */
719 void
720 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
721 {
722   GstRTSPStreamPrivate *priv;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   g_mutex_lock (&priv->lock);
729   priv->profiles = profiles;
730   g_mutex_unlock (&priv->lock);
731 }
732
733 /**
734  * gst_rtsp_stream_get_profiles:
735  * @stream: a #GstRTSPStream
736  *
737  * Get the allowed profiles of @stream.
738  *
739  * Returns: a #GstRTSPProfile
740  */
741 GstRTSPProfile
742 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
743 {
744   GstRTSPStreamPrivate *priv;
745   GstRTSPProfile res;
746
747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
748
749   priv = stream->priv;
750
751   g_mutex_lock (&priv->lock);
752   res = priv->profiles;
753   g_mutex_unlock (&priv->lock);
754
755   return res;
756 }
757
758 /**
759  * gst_rtsp_stream_set_protocols:
760  * @stream: a #GstRTSPStream
761  * @protocols: the new flags
762  *
763  * Configure the allowed lower transport for @stream.
764  */
765 void
766 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
767     GstRTSPLowerTrans protocols)
768 {
769   GstRTSPStreamPrivate *priv;
770
771   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
772
773   priv = stream->priv;
774
775   g_mutex_lock (&priv->lock);
776   priv->protocols = protocols;
777   g_mutex_unlock (&priv->lock);
778 }
779
780 /**
781  * gst_rtsp_stream_get_protocols:
782  * @stream: a #GstRTSPStream
783  *
784  * Get the allowed protocols of @stream.
785  *
786  * Returns: a #GstRTSPLowerTrans
787  */
788 GstRTSPLowerTrans
789 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
790 {
791   GstRTSPStreamPrivate *priv;
792   GstRTSPLowerTrans res;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
795       GST_RTSP_LOWER_TRANS_UNKNOWN);
796
797   priv = stream->priv;
798
799   g_mutex_lock (&priv->lock);
800   res = priv->protocols;
801   g_mutex_unlock (&priv->lock);
802
803   return res;
804 }
805
806 /**
807  * gst_rtsp_stream_set_address_pool:
808  * @stream: a #GstRTSPStream
809  * @pool: (transfer none): a #GstRTSPAddressPool
810  *
811  * configure @pool to be used as the address pool of @stream.
812  */
813 void
814 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
815     GstRTSPAddressPool * pool)
816 {
817   GstRTSPStreamPrivate *priv;
818   GstRTSPAddressPool *old;
819
820   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
821
822   priv = stream->priv;
823
824   GST_LOG_OBJECT (stream, "set address pool %p", pool);
825
826   g_mutex_lock (&priv->lock);
827   if ((old = priv->pool) != pool)
828     priv->pool = pool ? g_object_ref (pool) : NULL;
829   else
830     old = NULL;
831   g_mutex_unlock (&priv->lock);
832
833   if (old)
834     g_object_unref (old);
835 }
836
837 /**
838  * gst_rtsp_stream_get_address_pool:
839  * @stream: a #GstRTSPStream
840  *
841  * Get the #GstRTSPAddressPool used as the address pool of @stream.
842  *
843  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
844  * usage.
845  */
846 GstRTSPAddressPool *
847 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
848 {
849   GstRTSPStreamPrivate *priv;
850   GstRTSPAddressPool *result;
851
852   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
853
854   priv = stream->priv;
855
856   g_mutex_lock (&priv->lock);
857   if ((result = priv->pool))
858     g_object_ref (result);
859   g_mutex_unlock (&priv->lock);
860
861   return result;
862 }
863
864 /**
865  * gst_rtsp_stream_get_multicast_address:
866  * @stream: a #GstRTSPStream
867  * @family: the #GSocketFamily
868  *
869  * Get the multicast address of @stream for @family.
870  *
871  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
872  * or %NULL when no address could be allocated. gst_rtsp_address_free()
873  * after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
877     GSocketFamily family)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GstRTSPAddress **addrp;
882   GstRTSPAddressFlags flags;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
885
886   priv = stream->priv;
887
888   if (family == G_SOCKET_FAMILY_IPV6) {
889     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
890     addrp = &priv->addr_v6;
891   } else {
892     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
893     addrp = &priv->addr_v4;
894   }
895
896   g_mutex_lock (&priv->lock);
897   if (*addrp == NULL) {
898     if (priv->pool == NULL)
899       goto no_pool;
900
901     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
902
903     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
904     if (*addrp == NULL)
905       goto no_address;
906   }
907   result = gst_rtsp_address_copy (*addrp);
908   g_mutex_unlock (&priv->lock);
909
910   return result;
911
912   /* ERRORS */
913 no_pool:
914   {
915     GST_ERROR_OBJECT (stream, "no address pool specified");
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 no_address:
920   {
921     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 /**
928  * gst_rtsp_stream_reserve_address:
929  * @stream: a #GstRTSPStream
930  * @address: an address
931  * @port: a port
932  * @n_ports: n_ports
933  * @ttl: a TTL
934  *
935  * Reserve @address and @port as the address and port of @stream.
936  *
937  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
938  * the address could be reserved. gst_rtsp_address_free() after usage.
939  */
940 GstRTSPAddress *
941 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
942     const gchar * address, guint port, guint n_ports, guint ttl)
943 {
944   GstRTSPStreamPrivate *priv;
945   GstRTSPAddress *result;
946   GInetAddress *addr;
947   GSocketFamily family;
948   GstRTSPAddress **addrp;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951   g_return_val_if_fail (address != NULL, NULL);
952   g_return_val_if_fail (port > 0, NULL);
953   g_return_val_if_fail (n_ports > 0, NULL);
954   g_return_val_if_fail (ttl > 0, NULL);
955
956   priv = stream->priv;
957
958   addr = g_inet_address_new_from_string (address);
959   if (!addr) {
960     GST_ERROR ("failed to get inet addr from %s", address);
961     family = G_SOCKET_FAMILY_IPV4;
962   } else {
963     family = g_inet_address_get_family (addr);
964     g_object_unref (addr);
965   }
966
967   if (family == G_SOCKET_FAMILY_IPV6)
968     addrp = &priv->addr_v6;
969   else
970     addrp = &priv->addr_v4;
971
972   g_mutex_lock (&priv->lock);
973   if (*addrp == NULL) {
974     GstRTSPAddressPoolResult res;
975
976     if (priv->pool == NULL)
977       goto no_pool;
978
979     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
980         port, n_ports, ttl, addrp);
981     if (res != GST_RTSP_ADDRESS_POOL_OK)
982       goto no_address;
983   } else {
984     if (strcmp ((*addrp)->address, address) ||
985         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
986         (*addrp)->ttl != ttl)
987       goto different_address;
988   }
989   result = gst_rtsp_address_copy (*addrp);
990   g_mutex_unlock (&priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1004         address);
1005     g_mutex_unlock (&priv->lock);
1006     return NULL;
1007   }
1008 different_address:
1009   {
1010     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1011         " reserved", address);
1012     g_mutex_unlock (&priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /* must be called with lock */
1018 static void
1019 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1020     GSocket * rtcp_socket, GSocketFamily family)
1021 {
1022   GstRTSPStreamPrivate *priv = stream->priv;
1023   const gchar *multisink_socket;
1024
1025   if (family == G_SOCKET_FAMILY_IPV6)
1026     multisink_socket = "socket-v6";
1027   else
1028     multisink_socket = "socket";
1029
1030   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1031       NULL);
1032   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1033       NULL);
1034 }
1035
1036 /* must be called with lock */
1037 static gboolean
1038 create_and_configure_udpsinks (GstRTSPStream * stream)
1039 {
1040   GstRTSPStreamPrivate *priv = stream->priv;
1041   GstElement *udpsink0, *udpsink1;
1042
1043   udpsink0 = NULL;
1044   udpsink1 = NULL;
1045
1046   if (priv->udpsink[0])
1047     udpsink0 = priv->udpsink[0];
1048   else
1049     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1050
1051   if (!udpsink0)
1052     goto no_udp_protocol;
1053
1054   if (priv->udpsink[1])
1055     udpsink1 = priv->udpsink[1];
1056   else
1057     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1058
1059   if (!udpsink1)
1060     goto no_udp_protocol;
1061
1062   /* configure sinks */
1063
1064   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1065   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1066
1067   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1068   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1069
1070   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1071
1072   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1073   /* Needs to be async for RECORD streams, otherwise we will never go to
1074    * PLAYING because the sinks will wait for data while the udpsrc can't
1075    * provide data with timestamps in PAUSED. */
1076   if (priv->sinkpad)
1077     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1079
1080   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1081   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1082
1083   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1085
1086   /* update the dscp qos field in the sinks */
1087   update_dscp_qos (stream);
1088
1089   priv->udpsink[0] = udpsink0;
1090   priv->udpsink[1] = udpsink1;
1091
1092   return TRUE;
1093
1094   /* ERRORS */
1095 no_udp_protocol:
1096   {
1097     return FALSE;
1098   }
1099 }
1100
1101 /* must be called with lock */
1102 static void
1103 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1104     GSocketFamily family)
1105 {
1106   GstRTSPStreamPrivate *priv;
1107   GstPad *pad, *selpad;
1108   guint i;
1109   GstBin *bin;
1110
1111   priv = stream->priv;
1112   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1113
1114   for (i = 0; i < 2; i++) {
1115     if (priv->sinkpad || i == 1) {
1116       if (priv->srcpad) {
1117         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1118          * values. This is only relevant for PLAY pipelines */
1119         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1120         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1121       }
1122       /* add udpsrc */
1123       gst_bin_add (bin, udpsrc_out[i]);
1124
1125       /* and link to the funnel */
1126       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1127       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1128       gst_pad_link (pad, selpad);
1129       gst_object_unref (pad);
1130       gst_object_unref (selpad);
1131     }
1132   }
1133
1134   gst_object_unref (bin);
1135 }
1136
1137 /* must be called with lock */
1138 static gboolean
1139 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1140     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1141     const gchar * address, gint rtpport, gint rtcpport,
1142     GstRTSPLowerTrans transport)
1143 {
1144   GstStateChangeReturn ret;
1145
1146   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1147   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1148
1149   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1150     goto error;
1151
1152   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1153     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1154     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1155     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1156     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1157     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1158     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1159   }
1160
1161   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1162   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1163
1164   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1165   if (ret == GST_STATE_CHANGE_FAILURE)
1166     goto error;
1167   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1168   if (ret == GST_STATE_CHANGE_FAILURE)
1169     goto error;
1170
1171   return TRUE;
1172   return TRUE;
1173
1174   /* ERRORS */
1175 error:
1176   {
1177     if (udpsrc_out[0])
1178       gst_object_unref (udpsrc_out[0]);
1179     if (udpsrc_out[1])
1180       gst_object_unref (udpsrc_out[1]);
1181     return FALSE;
1182   }
1183 }
1184
1185 static gboolean
1186 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1187     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1188     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1189     gboolean use_client_settings)
1190 {
1191   GstRTSPStreamPrivate *priv = stream->priv;
1192   GSocket *rtp_socket = NULL;
1193   GSocket *rtcp_socket;
1194   gint tmp_rtp, tmp_rtcp;
1195   guint count;
1196   gint rtpport, rtcpport;
1197   GList *rejected_addresses = NULL;
1198   GstRTSPAddress *addr = NULL;
1199   GInetAddress *inetaddr = NULL;
1200   gchar *addr_str;
1201   GSocketAddress *rtp_sockaddr = NULL;
1202   GSocketAddress *rtcp_sockaddr = NULL;
1203   GstRTSPAddressPool *pool;
1204   GstRTSPLowerTrans transport;
1205
1206   pool = priv->pool;
1207   count = 0;
1208   transport = ct->lower_transport;
1209
1210   /* Start with random port */
1211   tmp_rtp = 0;
1212
1213   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1214       G_SOCKET_PROTOCOL_UDP, NULL);
1215   if (!rtcp_socket)
1216     goto no_udp_protocol;
1217
1218   if (*server_addr_out)
1219     gst_rtsp_address_free (*server_addr_out);
1220
1221   /* try to allocate 2 UDP ports, the RTP port should be an even
1222    * number and the RTCP port should be the next (uneven) port */
1223 again:
1224
1225   if (rtp_socket == NULL) {
1226     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1227         G_SOCKET_PROTOCOL_UDP, NULL);
1228     if (!rtp_socket)
1229       goto no_udp_protocol;
1230   }
1231
1232   if (pool) {
1233     GstRTSPAddressFlags flags;
1234
1235     if (transport == GST_RTSP_LOWER_TRANS_UDP &&
1236         gst_rtsp_address_pool_has_unicast_addresses (pool))
1237       flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1238     else if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)
1239       flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1240     else
1241       goto no_ports;
1242
1243     if (addr)
1244       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1245
1246     if (family == G_SOCKET_FAMILY_IPV6)
1247       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1248     else
1249       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1250
1251     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1252         && use_client_settings)
1253       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1254           ct->port.min, 2, ct->ttl, &addr);
1255     else
1256       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1257
1258     if (addr == NULL)
1259       goto no_ports;
1260
1261     tmp_rtp = addr->port;
1262
1263     g_clear_object (&inetaddr);
1264     inetaddr = g_inet_address_new_from_string (addr->address);
1265
1266     /* Don't bind to multicast addresses, this does not work on
1267      * Windows. You're supposed to bind to ANY and then join the
1268      * multicast group, which udpsrc/sink does for us already.
1269      */
1270     if (g_inet_address_get_is_multicast (inetaddr)) {
1271       g_object_unref (inetaddr);
1272       inetaddr = g_inet_address_new_any (family);
1273     }
1274   } else {
1275     if (tmp_rtp != 0) {
1276       tmp_rtp += 2;
1277       if (++count > 20)
1278         goto no_ports;
1279     }
1280
1281     if (inetaddr == NULL)
1282       inetaddr = g_inet_address_new_any (family);
1283   }
1284
1285   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1286   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1287     g_object_unref (rtp_sockaddr);
1288     goto again;
1289   }
1290   g_object_unref (rtp_sockaddr);
1291
1292   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1293   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1294     g_clear_object (&rtp_sockaddr);
1295     goto socket_error;
1296   }
1297
1298   tmp_rtp =
1299       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1300   g_object_unref (rtp_sockaddr);
1301
1302   /* check if port is even */
1303   if ((tmp_rtp & 1) != 0) {
1304     /* port not even, close and allocate another */
1305     tmp_rtp++;
1306     g_clear_object (&rtp_socket);
1307     goto again;
1308   }
1309
1310   /* set port */
1311   tmp_rtcp = tmp_rtp + 1;
1312
1313   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1314   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1315     g_object_unref (rtcp_sockaddr);
1316     g_clear_object (&rtp_socket);
1317     goto again;
1318   }
1319   g_object_unref (rtcp_sockaddr);
1320
1321   if (addr == NULL)
1322     addr_str = g_inet_address_to_string (inetaddr);
1323   else
1324     addr_str = addr->address;
1325   g_clear_object (&inetaddr);
1326
1327   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1328           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, transport)) {
1329     if (addr == NULL)
1330       g_free (addr_str);
1331     goto no_udp_protocol;
1332   }
1333
1334   if (addr == NULL)
1335     g_free (addr_str);
1336
1337   play_udpsources_one_family (stream, udpsrc_out, family);
1338
1339   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1340   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1341
1342   /* this should not happen... */
1343   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1344     goto port_error;
1345
1346   /* set RTP and RTCP sockets */
1347   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1348
1349   server_port_out->min = rtpport;
1350   server_port_out->max = rtcpport;
1351
1352   *server_addr_out = addr;
1353   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1354
1355   g_object_unref (rtp_socket);
1356   g_object_unref (rtcp_socket);
1357
1358   return TRUE;
1359
1360   /* ERRORS */
1361 no_udp_protocol:
1362   {
1363     goto cleanup;
1364   }
1365 no_ports:
1366   {
1367     goto cleanup;
1368   }
1369 port_error:
1370   {
1371     goto cleanup;
1372   }
1373 socket_error:
1374   {
1375     goto cleanup;
1376   }
1377 cleanup:
1378   {
1379     if (inetaddr)
1380       g_object_unref (inetaddr);
1381     g_list_free_full (rejected_addresses,
1382         (GDestroyNotify) gst_rtsp_address_free);
1383     if (addr)
1384       gst_rtsp_address_free (addr);
1385     if (rtp_socket)
1386       g_object_unref (rtp_socket);
1387     if (rtcp_socket)
1388       g_object_unref (rtcp_socket);
1389     return FALSE;
1390   }
1391 }
1392
1393 /**
1394  * gst_rtsp_stream_allocate_udp_sockets:
1395  * @stream: a #GstRTSPStream
1396  * @family: protocol family
1397  * @transport_method: transport method
1398  *
1399  * Allocates RTP and RTCP ports.
1400  *
1401  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1402  */
1403 gboolean
1404 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1405     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1406 {
1407   GstRTSPStreamPrivate *priv;
1408   gboolean result = FALSE;
1409   GstRTSPLowerTrans transport = ct->lower_transport;
1410
1411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1412   priv = stream->priv;
1413   g_return_val_if_fail (priv->is_joined, FALSE);
1414
1415   g_mutex_lock (&priv->lock);
1416
1417   if (family == G_SOCKET_FAMILY_IPV4) {
1418     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1419       if (priv->have_ipv4_mcast)
1420         goto done;
1421       priv->have_ipv4_mcast =
1422           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1423           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1424           use_client_settings);
1425     } else {
1426       priv->have_ipv4 =
1427           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1428           &priv->server_port_v4, ct, &priv->server_addr_v4,
1429           use_client_settings);
1430     }
1431   } else {
1432     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1433       if (priv->have_ipv6_mcast)
1434         goto done;
1435       priv->have_ipv6_mcast =
1436           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1437           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1438           use_client_settings);
1439     } else {
1440       if (priv->have_ipv6)
1441         goto done;
1442       priv->have_ipv6 =
1443           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1444           &priv->server_port_v6, ct, &priv->server_addr_v6,
1445           use_client_settings);
1446     }
1447   }
1448
1449 done:
1450   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1451       priv->have_ipv6_mcast;
1452
1453   g_mutex_unlock (&priv->lock);
1454
1455   return result;
1456 }
1457
1458 /**
1459  * gst_rtsp_stream_set_client_side:
1460  * @stream: a #GstRTSPStream
1461  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1462  * an RTSP connection.
1463  *
1464  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1465  * streams to an RTSP server via RECORD. This has the practical effect
1466  * of changing which UDP port numbers are used when setting up the local
1467  * side of the stream sending to be either the 'server' or 'client' pair
1468  * of a configured UDP transport.
1469  */
1470 void
1471 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1472 {
1473   GstRTSPStreamPrivate *priv;
1474
1475   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1476   priv = stream->priv;
1477   g_mutex_lock (&priv->lock);
1478   priv->client_side = client_side;
1479   g_mutex_unlock (&priv->lock);
1480 }
1481
1482 /**
1483  * gst_rtsp_stream_set_client_side:
1484  * @stream: a #GstRTSPStream
1485  *
1486  * See gst_rtsp_stream_set_client_side()
1487  *
1488  * Returns: TRUE if this #GstRTSPStream is client-side.
1489  */
1490 gboolean
1491 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1492 {
1493   GstRTSPStreamPrivate *priv;
1494   gboolean ret;
1495
1496   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1497
1498   priv = stream->priv;
1499   g_mutex_lock (&priv->lock);
1500   ret = priv->client_side;
1501   g_mutex_unlock (&priv->lock);
1502
1503   return ret;
1504 }
1505
1506 /**
1507  * gst_rtsp_stream_get_server_port:
1508  * @stream: a #GstRTSPStream
1509  * @server_port: (out): result server port
1510  * @family: the port family to get
1511  *
1512  * Fill @server_port with the port pair used by the server. This function can
1513  * only be called when @stream has been joined.
1514  */
1515 void
1516 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1517     GstRTSPRange * server_port, GSocketFamily family)
1518 {
1519   GstRTSPStreamPrivate *priv;
1520
1521   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1522   priv = stream->priv;
1523   g_return_if_fail (priv->is_joined);
1524
1525   g_mutex_lock (&priv->lock);
1526   if (family == G_SOCKET_FAMILY_IPV4) {
1527     if (server_port)
1528       *server_port = priv->server_port_v4;
1529   } else {
1530     if (server_port)
1531       *server_port = priv->server_port_v6;
1532   }
1533   g_mutex_unlock (&priv->lock);
1534 }
1535
1536 /**
1537  * gst_rtsp_stream_get_rtpsession:
1538  * @stream: a #GstRTSPStream
1539  *
1540  * Get the RTP session of this stream.
1541  *
1542  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1543  */
1544 GObject *
1545 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1546 {
1547   GstRTSPStreamPrivate *priv;
1548   GObject *session;
1549
1550   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1551
1552   priv = stream->priv;
1553
1554   g_mutex_lock (&priv->lock);
1555   if ((session = priv->session))
1556     g_object_ref (session);
1557   g_mutex_unlock (&priv->lock);
1558
1559   return session;
1560 }
1561
1562 /**
1563  * gst_rtsp_stream_get_ssrc:
1564  * @stream: a #GstRTSPStream
1565  * @ssrc: (out): result ssrc
1566  *
1567  * Get the SSRC used by the RTP session of this stream. This function can only
1568  * be called when @stream has been joined.
1569  */
1570 void
1571 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1572 {
1573   GstRTSPStreamPrivate *priv;
1574
1575   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1576   priv = stream->priv;
1577   g_return_if_fail (priv->is_joined);
1578
1579   g_mutex_lock (&priv->lock);
1580   if (ssrc && priv->session)
1581     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1582   g_mutex_unlock (&priv->lock);
1583 }
1584
1585 /**
1586  * gst_rtsp_stream_set_retransmission_time:
1587  * @stream: a #GstRTSPStream
1588  * @time: a #GstClockTime
1589  *
1590  * Set the amount of time to store retransmission packets.
1591  */
1592 void
1593 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1594     GstClockTime time)
1595 {
1596   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1597
1598   g_mutex_lock (&stream->priv->lock);
1599   stream->priv->rtx_time = time;
1600   if (stream->priv->rtxsend)
1601     g_object_set (stream->priv->rtxsend, "max-size-time",
1602         GST_TIME_AS_MSECONDS (time), NULL);
1603   g_mutex_unlock (&stream->priv->lock);
1604 }
1605
1606 /**
1607  * gst_rtsp_stream_get_retransmission_time:
1608  * @stream: a #GstRTSPStream
1609  *
1610  * Get the amount of time to store retransmission data.
1611  *
1612  * Returns: the amount of time to store retransmission data.
1613  */
1614 GstClockTime
1615 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1616 {
1617   GstClockTime ret;
1618
1619   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1620
1621   g_mutex_lock (&stream->priv->lock);
1622   ret = stream->priv->rtx_time;
1623   g_mutex_unlock (&stream->priv->lock);
1624
1625   return ret;
1626 }
1627
1628 /**
1629  * gst_rtsp_stream_set_retransmission_pt:
1630  * @stream: a #GstRTSPStream
1631  * @rtx_pt: a #guint
1632  *
1633  * Set the payload type (pt) for retransmission of this stream.
1634  */
1635 void
1636 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1637 {
1638   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1639
1640   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1641
1642   g_mutex_lock (&stream->priv->lock);
1643   stream->priv->rtx_pt = rtx_pt;
1644   if (stream->priv->rtxsend) {
1645     guint pt = gst_rtsp_stream_get_pt (stream);
1646     gchar *pt_s = g_strdup_printf ("%d", pt);
1647     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1648         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1649     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1650     g_free (pt_s);
1651     gst_structure_free (rtx_pt_map);
1652   }
1653   g_mutex_unlock (&stream->priv->lock);
1654 }
1655
1656 /**
1657  * gst_rtsp_stream_get_retransmission_pt:
1658  * @stream: a #GstRTSPStream
1659  *
1660  * Get the payload-type used for retransmission of this stream
1661  *
1662  * Returns: The retransmission PT.
1663  */
1664 guint
1665 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1666 {
1667   guint rtx_pt;
1668
1669   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1670
1671   g_mutex_lock (&stream->priv->lock);
1672   rtx_pt = stream->priv->rtx_pt;
1673   g_mutex_unlock (&stream->priv->lock);
1674
1675   return rtx_pt;
1676 }
1677
1678 /**
1679  * gst_rtsp_stream_set_buffer_size:
1680  * @stream: a #GstRTSPStream
1681  * @size: the buffer size
1682  *
1683  * Set the size of the UDP transmission buffer (in bytes)
1684  * Needs to be set before the stream is joined to a bin.
1685  *
1686  * Since: 1.6
1687  */
1688 void
1689 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1690 {
1691   g_mutex_lock (&stream->priv->lock);
1692   stream->priv->buffer_size = size;
1693   g_mutex_unlock (&stream->priv->lock);
1694 }
1695
1696 /**
1697  * gst_rtsp_stream_get_buffer_size:
1698  * @stream: a #GstRTSPStream
1699  *
1700  * Get the size of the UDP transmission buffer (in bytes)
1701  *
1702  * Returns: the size of the UDP TX buffer
1703  *
1704  * Since: 1.6
1705  */
1706 guint
1707 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1708 {
1709   guint buffer_size;
1710
1711   g_mutex_lock (&stream->priv->lock);
1712   buffer_size = stream->priv->buffer_size;
1713   g_mutex_unlock (&stream->priv->lock);
1714
1715   return buffer_size;
1716 }
1717
1718 /* executed from streaming thread */
1719 static void
1720 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1721 {
1722   GstRTSPStreamPrivate *priv = stream->priv;
1723   GstCaps *newcaps, *oldcaps;
1724
1725   newcaps = gst_pad_get_current_caps (pad);
1726
1727   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1728       newcaps);
1729
1730   g_mutex_lock (&priv->lock);
1731   oldcaps = priv->caps;
1732   priv->caps = newcaps;
1733   g_mutex_unlock (&priv->lock);
1734
1735   if (oldcaps)
1736     gst_caps_unref (oldcaps);
1737 }
1738
1739 static void
1740 dump_structure (const GstStructure * s)
1741 {
1742   gchar *sstr;
1743
1744   sstr = gst_structure_to_string (s);
1745   GST_INFO ("structure: %s", sstr);
1746   g_free (sstr);
1747 }
1748
1749 static GstRTSPStreamTransport *
1750 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1751 {
1752   GstRTSPStreamPrivate *priv = stream->priv;
1753   GList *walk;
1754   GstRTSPStreamTransport *result = NULL;
1755   const gchar *tmp;
1756   gchar *dest;
1757   guint port;
1758
1759   if (rtcp_from == NULL)
1760     return NULL;
1761
1762   tmp = g_strrstr (rtcp_from, ":");
1763   if (tmp == NULL)
1764     return NULL;
1765
1766   port = atoi (tmp + 1);
1767   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1768
1769   g_mutex_lock (&priv->lock);
1770   GST_INFO ("finding %s:%d in %d transports", dest, port,
1771       g_list_length (priv->transports));
1772
1773   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1774     GstRTSPStreamTransport *trans = walk->data;
1775     const GstRTSPTransport *tr;
1776     gint min, max;
1777
1778     tr = gst_rtsp_stream_transport_get_transport (trans);
1779
1780     if (priv->client_side) {
1781       /* In client side mode the 'destination' is the RTSP server, so send
1782        * to those ports */
1783       min = tr->server_port.min;
1784       max = tr->server_port.max;
1785     } else {
1786       min = tr->client_port.min;
1787       max = tr->client_port.max;
1788     }
1789
1790     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1791       result = trans;
1792       break;
1793     }
1794   }
1795   if (result)
1796     g_object_ref (result);
1797   g_mutex_unlock (&priv->lock);
1798
1799   g_free (dest);
1800
1801   return result;
1802 }
1803
1804 static GstRTSPStreamTransport *
1805 check_transport (GObject * source, GstRTSPStream * stream)
1806 {
1807   GstStructure *stats;
1808   GstRTSPStreamTransport *trans;
1809
1810   /* see if we have a stream to match with the origin of the RTCP packet */
1811   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1812   if (trans == NULL) {
1813     g_object_get (source, "stats", &stats, NULL);
1814     if (stats) {
1815       const gchar *rtcp_from;
1816
1817       dump_structure (stats);
1818
1819       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1820       if ((trans = find_transport (stream, rtcp_from))) {
1821         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1822             source);
1823         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1824             g_object_unref);
1825       }
1826       gst_structure_free (stats);
1827     }
1828   }
1829   return trans;
1830 }
1831
1832
1833 static void
1834 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1835 {
1836   GstRTSPStreamTransport *trans;
1837
1838   GST_INFO ("%p: new source %p", stream, source);
1839
1840   trans = check_transport (source, stream);
1841
1842   if (trans)
1843     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1844 }
1845
1846 static void
1847 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1848 {
1849   GST_INFO ("%p: new SDES %p", stream, source);
1850 }
1851
1852 static void
1853 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1854 {
1855   GstRTSPStreamTransport *trans;
1856
1857   trans = check_transport (source, stream);
1858
1859   if (trans) {
1860     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1861     gst_rtsp_stream_transport_keep_alive (trans);
1862   }
1863 #ifdef DUMP_STATS
1864   {
1865     GstStructure *stats;
1866     g_object_get (source, "stats", &stats, NULL);
1867     if (stats) {
1868       dump_structure (stats);
1869       gst_structure_free (stats);
1870     }
1871   }
1872 #endif
1873 }
1874
1875 static void
1876 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1877 {
1878   GST_INFO ("%p: source %p bye", stream, source);
1879 }
1880
1881 static void
1882 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1883 {
1884   GstRTSPStreamTransport *trans;
1885
1886   GST_INFO ("%p: source %p bye timeout", stream, source);
1887
1888   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1889     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1890     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1891   }
1892 }
1893
1894 static void
1895 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1896 {
1897   GstRTSPStreamTransport *trans;
1898
1899   GST_INFO ("%p: source %p timeout", stream, source);
1900
1901   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1902     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1903     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1904   }
1905 }
1906
1907 static void
1908 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1909 {
1910   GST_INFO ("%p: new sender source %p", stream, source);
1911 #ifndef DUMP_STATS
1912   {
1913     GstStructure *stats;
1914     g_object_get (source, "stats", &stats, NULL);
1915     if (stats) {
1916       dump_structure (stats);
1917       gst_structure_free (stats);
1918     }
1919   }
1920 #endif
1921 }
1922
1923 static void
1924 on_sender_ssrc_active (GObject * session, GObject * source,
1925     GstRTSPStream * stream)
1926 {
1927 #ifndef DUMP_STATS
1928   {
1929     GstStructure *stats;
1930     g_object_get (source, "stats", &stats, NULL);
1931     if (stats) {
1932       dump_structure (stats);
1933       gst_structure_free (stats);
1934     }
1935   }
1936 #endif
1937 }
1938
1939 static void
1940 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1941 {
1942   if (is_rtp) {
1943     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1944     g_list_free (priv->tr_cache_rtp);
1945     priv->tr_cache_rtp = NULL;
1946   } else {
1947     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1948     g_list_free (priv->tr_cache_rtcp);
1949     priv->tr_cache_rtcp = NULL;
1950   }
1951 }
1952
1953 static GstFlowReturn
1954 handle_new_sample (GstAppSink * sink, gpointer user_data)
1955 {
1956   GstRTSPStreamPrivate *priv;
1957   GList *walk;
1958   GstSample *sample;
1959   GstBuffer *buffer;
1960   GstRTSPStream *stream;
1961   gboolean is_rtp;
1962
1963   sample = gst_app_sink_pull_sample (sink);
1964   if (!sample)
1965     return GST_FLOW_OK;
1966
1967   stream = (GstRTSPStream *) user_data;
1968   priv = stream->priv;
1969   buffer = gst_sample_get_buffer (sample);
1970
1971   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1972
1973   g_mutex_lock (&priv->lock);
1974   if (is_rtp) {
1975     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1976       clear_tr_cache (priv, is_rtp);
1977       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1978         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1979         priv->tr_cache_rtp =
1980             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1981       }
1982       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1983     }
1984   } else {
1985     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1986       clear_tr_cache (priv, is_rtp);
1987       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1988         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1989         priv->tr_cache_rtcp =
1990             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1991       }
1992       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1993     }
1994   }
1995   g_mutex_unlock (&priv->lock);
1996
1997   if (is_rtp) {
1998     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1999       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2000       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2001     }
2002   } else {
2003     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2004       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2005       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2006     }
2007   }
2008   gst_sample_unref (sample);
2009
2010   return GST_FLOW_OK;
2011 }
2012
2013 static GstAppSinkCallbacks sink_cb = {
2014   NULL,                         /* not interested in EOS */
2015   NULL,                         /* not interested in preroll samples */
2016   handle_new_sample,
2017 };
2018
2019 static GstElement *
2020 get_rtp_encoder (GstRTSPStream * stream, guint session)
2021 {
2022   GstRTSPStreamPrivate *priv = stream->priv;
2023
2024   if (priv->srtpenc == NULL) {
2025     gchar *name;
2026
2027     name = g_strdup_printf ("srtpenc_%u", session);
2028     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2029     g_free (name);
2030
2031     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2032   }
2033   return gst_object_ref (priv->srtpenc);
2034 }
2035
2036 static GstElement *
2037 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2038 {
2039   GstRTSPStreamPrivate *priv = stream->priv;
2040   GstElement *oldenc, *enc;
2041   GstPad *pad;
2042   gchar *name;
2043
2044   if (priv->idx != session)
2045     return NULL;
2046
2047   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2048
2049   oldenc = priv->srtpenc;
2050   enc = get_rtp_encoder (stream, session);
2051   name = g_strdup_printf ("rtp_sink_%d", session);
2052   pad = gst_element_get_request_pad (enc, name);
2053   g_free (name);
2054   gst_object_unref (pad);
2055
2056   if (oldenc == NULL)
2057     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2058         enc);
2059
2060   return enc;
2061 }
2062
2063 static GstElement *
2064 request_rtcp_encoder (GstElement * rtpbin, guint session,
2065     GstRTSPStream * stream)
2066 {
2067   GstRTSPStreamPrivate *priv = stream->priv;
2068   GstElement *oldenc, *enc;
2069   GstPad *pad;
2070   gchar *name;
2071
2072   if (priv->idx != session)
2073     return NULL;
2074
2075   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2076
2077   oldenc = priv->srtpenc;
2078   enc = get_rtp_encoder (stream, session);
2079   name = g_strdup_printf ("rtcp_sink_%d", session);
2080   pad = gst_element_get_request_pad (enc, name);
2081   g_free (name);
2082   gst_object_unref (pad);
2083
2084   if (oldenc == NULL)
2085     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2086         enc);
2087
2088   return enc;
2089 }
2090
2091 static GstCaps *
2092 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2093 {
2094   GstRTSPStreamPrivate *priv = stream->priv;
2095   GstCaps *caps;
2096
2097   GST_DEBUG ("request key %08x", ssrc);
2098
2099   g_mutex_lock (&priv->lock);
2100   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2101     gst_caps_ref (caps);
2102   g_mutex_unlock (&priv->lock);
2103
2104   return caps;
2105 }
2106
2107 static GstElement *
2108 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2109     GstRTSPStream * stream)
2110 {
2111   GstRTSPStreamPrivate *priv = stream->priv;
2112
2113   if (priv->idx != session)
2114     return NULL;
2115
2116   if (priv->srtpdec == NULL) {
2117     gchar *name;
2118
2119     name = g_strdup_printf ("srtpdec_%u", session);
2120     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2121     g_free (name);
2122
2123     g_signal_connect (priv->srtpdec, "request-key",
2124         (GCallback) request_key, stream);
2125   }
2126   return gst_object_ref (priv->srtpdec);
2127 }
2128
2129 /**
2130  * gst_rtsp_stream_request_aux_sender:
2131  * @stream: a #GstRTSPStream
2132  * @sessid: the session id
2133  *
2134  * Creating a rtxsend bin
2135  *
2136  * Returns: (transfer full): a #GstElement.
2137  *
2138  * Since: 1.6
2139  */
2140 GstElement *
2141 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2142 {
2143   GstElement *bin;
2144   GstPad *pad;
2145   GstStructure *pt_map;
2146   gchar *name;
2147   guint pt, rtx_pt;
2148   gchar *pt_s;
2149
2150   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2151
2152   pt = gst_rtsp_stream_get_pt (stream);
2153   pt_s = g_strdup_printf ("%u", pt);
2154   rtx_pt = stream->priv->rtx_pt;
2155
2156   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2157
2158   bin = gst_bin_new (NULL);
2159   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2160   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2161       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2162   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2163       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2164   g_free (pt_s);
2165   gst_structure_free (pt_map);
2166   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2167
2168   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2169   name = g_strdup_printf ("src_%u", sessid);
2170   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2171   g_free (name);
2172   gst_object_unref (pad);
2173
2174   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2175   name = g_strdup_printf ("sink_%u", sessid);
2176   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2177   g_free (name);
2178   gst_object_unref (pad);
2179
2180   return bin;
2181 }
2182
2183 /**
2184  * gst_rtsp_stream_set_pt_map:
2185  * @stream: a #GstRTSPStream
2186  * @pt: the pt
2187  * @caps: a #GstCaps
2188  *
2189  * Configure a pt map between @pt and @caps.
2190  */
2191 void
2192 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2193 {
2194   GstRTSPStreamPrivate *priv = stream->priv;
2195
2196   g_mutex_lock (&priv->lock);
2197   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2198   g_mutex_unlock (&priv->lock);
2199 }
2200
2201 static GstCaps *
2202 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2203     GstRTSPStream * stream)
2204 {
2205   GstRTSPStreamPrivate *priv = stream->priv;
2206   GstCaps *caps = NULL;
2207
2208   g_mutex_lock (&priv->lock);
2209
2210   if (priv->idx == session) {
2211     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2212     if (caps) {
2213       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2214       gst_caps_ref (caps);
2215     } else {
2216       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2217     }
2218   }
2219
2220   g_mutex_unlock (&priv->lock);
2221
2222   return caps;
2223 }
2224
2225 static void
2226 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2227 {
2228   GstRTSPStreamPrivate *priv = stream->priv;
2229   gchar *name;
2230   GstPadLinkReturn ret;
2231   guint sessid;
2232
2233   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2234       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2235
2236   name = gst_pad_get_name (pad);
2237   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2238     g_free (name);
2239     return;
2240   }
2241   g_free (name);
2242
2243   if (priv->idx != sessid)
2244     return;
2245
2246   if (gst_pad_is_linked (priv->sinkpad)) {
2247     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2248         GST_DEBUG_PAD_NAME (priv->sinkpad));
2249     return;
2250   }
2251
2252   /* link the RTP pad to the session manager, it should not really fail unless
2253    * this is not really an RTP pad */
2254   ret = gst_pad_link (pad, priv->sinkpad);
2255   if (ret != GST_PAD_LINK_OK)
2256     goto link_failed;
2257   priv->recv_rtp_src = gst_object_ref (pad);
2258
2259   return;
2260
2261 /* ERRORS */
2262 link_failed:
2263   {
2264     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2265         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2266   }
2267 }
2268
2269 static void
2270 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2271     GstRTSPStream * stream)
2272 {
2273   /* TODO: What to do here other than this? */
2274   GST_DEBUG ("Stream %p: Got EOS", stream);
2275   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2276 }
2277
2278 /* must be called with lock */
2279 static gboolean
2280 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2281 {
2282   GstRTSPStreamPrivate *priv;
2283   GstPad *pad, *sinkpad = NULL;
2284   gboolean is_tcp = FALSE, is_udp = FALSE;
2285   gint i;
2286
2287   priv = stream->priv;
2288
2289   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2290   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2291       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2292
2293   if (is_udp && !create_and_configure_udpsinks (stream))
2294     goto no_udp_protocol;
2295
2296   for (i = 0; i < 2; i++) {
2297     GstPad *teepad, *queuepad;
2298     /* For the sender we create this bit of pipeline for both
2299      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2300      * we need to add a queue before appsink and udpsink to make
2301      * the pipeline not block. For the TCP case, we want to pump
2302      * client as fast as possible anyway. This pipeline is used
2303      * when both TCP and UDP are present.
2304      *
2305      * .--------.      .-----.    .---------.    .---------.
2306      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2307      * |       send->sink   src->sink      src->sink       |
2308      * '--------'      |     |    '---------'    '---------'
2309      *                 |     |    .---------.    .---------.
2310      *                 |     |    |  queue  |    | appsink |
2311      *                 |    src->sink      src->sink       |
2312      *                 '-----'    '---------'    '---------'
2313      *
2314      * When only UDP or only TCP is allowed, we skip the tee and queue
2315      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2316      * the session.
2317      */
2318     /* Only link the RTP send src if we're going to send RTP, link
2319      * the RTCP send src always */
2320     if (priv->srcpad || i == 1) {
2321       if (is_udp) {
2322         /* add udpsink */
2323         gst_bin_add (bin, priv->udpsink[i]);
2324         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2325       }
2326
2327       if (is_tcp) {
2328         /* make appsink */
2329         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2330         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2331         gst_bin_add (bin, priv->appsink[i]);
2332         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2333             &sink_cb, stream, NULL);
2334       }
2335
2336       if (is_udp && is_tcp) {
2337         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2338
2339         /* make tee for RTP/RTCP */
2340         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2341         gst_bin_add (bin, priv->tee[i]);
2342
2343         /* and link to rtpbin send pad */
2344         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2345         gst_pad_link (priv->send_src[i], pad);
2346         gst_object_unref (pad);
2347
2348         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2349         g_object_set (priv->udpqueue[i], "max-size-buffers",
2350             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2351             NULL);
2352         gst_bin_add (bin, priv->udpqueue[i]);
2353         /* link tee to udpqueue */
2354         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2355         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2356         gst_pad_link (teepad, pad);
2357         gst_object_unref (pad);
2358         gst_object_unref (teepad);
2359
2360         /* link udpqueue to udpsink */
2361         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2362         gst_pad_link (queuepad, sinkpad);
2363         gst_object_unref (queuepad);
2364         gst_object_unref (sinkpad);
2365
2366         /* make appqueue */
2367         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2368         g_object_set (priv->appqueue[i], "max-size-buffers",
2369             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2370             NULL);
2371         gst_bin_add (bin, priv->appqueue[i]);
2372         /* and link tee to appqueue */
2373         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2374         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2375         gst_pad_link (teepad, pad);
2376         gst_object_unref (pad);
2377         gst_object_unref (teepad);
2378
2379         /* and link appqueue to appsink */
2380         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2381         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2382         gst_pad_link (queuepad, pad);
2383         gst_object_unref (pad);
2384         gst_object_unref (queuepad);
2385       } else if (is_tcp) {
2386         /* only appsink needed, link it to the session */
2387         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2388         gst_pad_link (priv->send_src[i], pad);
2389         gst_object_unref (pad);
2390
2391         /* when its only TCP, we need to set sync and preroll to FALSE
2392          * for the sink to avoid deadlock. And this is only needed for
2393          * sink used for RTCP data, not the RTP data. */
2394         if (i == 1)
2395           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2396       } else {
2397         /* else only udpsink needed, link it to the session */
2398         gst_pad_link (priv->send_src[i], sinkpad);
2399         gst_object_unref (sinkpad);
2400       }
2401     }
2402
2403     /* check if we need to set to a special state */
2404     if (state != GST_STATE_NULL) {
2405       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2406         gst_element_set_state (priv->udpsink[i], state);
2407       if (priv->appsink[i] && (priv->srcpad || i == 1))
2408         gst_element_set_state (priv->appsink[i], state);
2409       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2410         gst_element_set_state (priv->appqueue[i], state);
2411       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2412         gst_element_set_state (priv->udpqueue[i], state);
2413       if (priv->tee[i] && (priv->srcpad || i == 1))
2414         gst_element_set_state (priv->tee[i], state);
2415     }
2416   }
2417
2418   return TRUE;
2419
2420   /* ERRORS */
2421 no_udp_protocol:
2422   {
2423     return FALSE;
2424   }
2425 }
2426
2427 /* must be called with lock */
2428 static void
2429 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2430 {
2431   GstRTSPStreamPrivate *priv;
2432   GstPad *pad, *selpad;
2433   gboolean is_tcp;
2434   gint i;
2435
2436   priv = stream->priv;
2437
2438   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2439
2440   for (i = 0; i < 2; i++) {
2441     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2442      * RTCP sink always */
2443     if (priv->sinkpad || i == 1) {
2444       /* For the receiver we create this bit of pipeline for both
2445        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2446        * and it is all funneled into the rtpbin receive pad.
2447        *
2448        * .--------.     .--------.    .--------.
2449        * | udpsrc |     | funnel |    | rtpbin |
2450        * |       src->sink      src->sink      |
2451        * '--------'     |        |    '--------'
2452        * .--------.     |        |
2453        * | appsrc |     |        |
2454        * |       src->sink       |
2455        * '--------'     '--------'
2456        */
2457       /* make funnel for the RTP/RTCP receivers */
2458       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2459       gst_bin_add (bin, priv->funnel[i]);
2460
2461       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2462       gst_pad_link (pad, priv->recv_sink[i]);
2463       gst_object_unref (pad);
2464
2465       if (priv->udpsrc_v4[i]) {
2466         if (priv->srcpad) {
2467           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2468            * values. This is only relevant for PLAY pipelines */
2469           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2470           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2471         }
2472         /* add udpsrc */
2473         gst_bin_add (bin, priv->udpsrc_v4[i]);
2474
2475         /* and link to the funnel v4 */
2476         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2477         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2478         gst_pad_link (pad, selpad);
2479         gst_object_unref (pad);
2480         gst_object_unref (selpad);
2481       }
2482
2483       if (priv->udpsrc_v6[i]) {
2484         if (priv->srcpad) {
2485           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2486           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2487         }
2488         gst_bin_add (bin, priv->udpsrc_v6[i]);
2489
2490         /* and link to the funnel v6 */
2491         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2492         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2493         gst_pad_link (pad, selpad);
2494         gst_object_unref (pad);
2495         gst_object_unref (selpad);
2496       }
2497
2498       if (is_tcp) {
2499         /* make and add appsrc */
2500         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2501         priv->appsrc_base_time[i] = -1;
2502         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2503         gst_bin_add (bin, priv->appsrc[i]);
2504         /* and link to the funnel */
2505         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2506         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2507         gst_pad_link (pad, selpad);
2508         gst_object_unref (pad);
2509         gst_object_unref (selpad);
2510       }
2511     }
2512
2513     /* check if we need to set to a special state */
2514     if (state != GST_STATE_NULL) {
2515       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2516         gst_element_set_state (priv->funnel[i], state);
2517       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2518         gst_element_set_state (priv->appsrc[i], state);
2519     }
2520   }
2521 }
2522
2523 /**
2524  * gst_rtsp_stream_join_bin:
2525  * @stream: a #GstRTSPStream
2526  * @bin: (transfer none): a #GstBin to join
2527  * @rtpbin: (transfer none): a rtpbin element in @bin
2528  * @state: the target state of the new elements
2529  *
2530  * Join the #GstBin @bin that contains the element @rtpbin.
2531  *
2532  * @stream will link to @rtpbin, which must be inside @bin. The elements
2533  * added to @bin will be set to the state given in @state.
2534  *
2535  * Returns: %TRUE on success.
2536  */
2537 gboolean
2538 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2539     GstElement * rtpbin, GstState state)
2540 {
2541   GstRTSPStreamPrivate *priv;
2542   guint idx;
2543   gchar *name;
2544   GstPadLinkReturn ret;
2545
2546   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2547   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2548   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2549
2550   priv = stream->priv;
2551
2552   g_mutex_lock (&priv->lock);
2553   if (priv->is_joined)
2554     goto was_joined;
2555
2556   /* create a session with the same index as the stream */
2557   idx = priv->idx;
2558
2559   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2560
2561   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2562       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2563     /* For SRTP */
2564     g_signal_connect (rtpbin, "request-rtp-encoder",
2565         (GCallback) request_rtp_encoder, stream);
2566     g_signal_connect (rtpbin, "request-rtcp-encoder",
2567         (GCallback) request_rtcp_encoder, stream);
2568     g_signal_connect (rtpbin, "request-rtp-decoder",
2569         (GCallback) request_rtp_rtcp_decoder, stream);
2570     g_signal_connect (rtpbin, "request-rtcp-decoder",
2571         (GCallback) request_rtp_rtcp_decoder, stream);
2572   }
2573
2574   if (priv->sinkpad) {
2575     g_signal_connect (rtpbin, "request-pt-map",
2576         (GCallback) request_pt_map, stream);
2577   }
2578
2579   /* get pads from the RTP session element for sending and receiving
2580    * RTP/RTCP*/
2581   if (priv->srcpad) {
2582     /* get a pad for sending RTP */
2583     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2584     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2585     g_free (name);
2586
2587     /* link the RTP pad to the session manager, it should not really fail unless
2588      * this is not really an RTP pad */
2589     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2590     if (ret != GST_PAD_LINK_OK)
2591       goto link_failed;
2592
2593     name = g_strdup_printf ("send_rtp_src_%u", idx);
2594     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2595     g_free (name);
2596   } else {
2597     /* Need to connect our sinkpad from here */
2598     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2599     /* EOS */
2600     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2601
2602     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2603     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2604     g_free (name);
2605   }
2606
2607   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2608   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2609   g_free (name);
2610   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2611   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2612   g_free (name);
2613
2614   /* get the session */
2615   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2616
2617   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2618       stream);
2619   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2620       stream);
2621   g_signal_connect (priv->session, "on-ssrc-active",
2622       (GCallback) on_ssrc_active, stream);
2623   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2624       stream);
2625   g_signal_connect (priv->session, "on-bye-timeout",
2626       (GCallback) on_bye_timeout, stream);
2627   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2628       stream);
2629
2630   /* signal for sender ssrc */
2631   g_signal_connect (priv->session, "on-new-sender-ssrc",
2632       (GCallback) on_new_sender_ssrc, stream);
2633   g_signal_connect (priv->session, "on-sender-ssrc-active",
2634       (GCallback) on_sender_ssrc_active, stream);
2635
2636   if (!create_sender_part (stream, bin, state))
2637     goto no_udp_protocol;
2638
2639   create_receiver_part (stream, bin, state);
2640
2641   if (priv->srcpad) {
2642     /* be notified of caps changes */
2643     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2644         (GCallback) caps_notify, stream);
2645   }
2646
2647   priv->is_joined = TRUE;
2648   g_mutex_unlock (&priv->lock);
2649
2650   return TRUE;
2651
2652   /* ERRORS */
2653 was_joined:
2654   {
2655     g_mutex_unlock (&priv->lock);
2656     return TRUE;
2657   }
2658 link_failed:
2659   {
2660     GST_WARNING ("failed to link stream %u", idx);
2661     gst_object_unref (priv->send_rtp_sink);
2662     priv->send_rtp_sink = NULL;
2663     g_mutex_unlock (&priv->lock);
2664     return FALSE;
2665   }
2666 no_udp_protocol:
2667   {
2668     GST_WARNING ("failed to allocate ports %u", idx);
2669     gst_object_unref (priv->send_rtp_sink);
2670     priv->send_rtp_sink = NULL;
2671     gst_object_unref (priv->send_src[0]);
2672     priv->send_src[0] = NULL;
2673     gst_object_unref (priv->send_src[1]);
2674     priv->send_src[1] = NULL;
2675     gst_object_unref (priv->recv_sink[0]);
2676     priv->recv_sink[0] = NULL;
2677     gst_object_unref (priv->recv_sink[1]);
2678     priv->recv_sink[1] = NULL;
2679     if (priv->udpsink[0])
2680       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2681     if (priv->udpsink[1])
2682       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2683     if (priv->udpsrc_v4[0]) {
2684       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2685       gst_object_unref (priv->udpsrc_v4[0]);
2686       priv->udpsrc_v4[0] = NULL;
2687     }
2688     if (priv->udpsrc_v4[1]) {
2689       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2690       gst_object_unref (priv->udpsrc_v4[1]);
2691       priv->udpsrc_v4[1] = NULL;
2692     }
2693     if (priv->udpsrc_mcast_v4[0]) {
2694       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2695       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2696       priv->udpsrc_mcast_v4[0] = NULL;
2697     }
2698     if (priv->udpsrc_mcast_v4[1]) {
2699       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2700       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2701       priv->udpsrc_mcast_v4[1] = NULL;
2702     }
2703     if (priv->udpsrc_v6[0]) {
2704       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2705       gst_object_unref (priv->udpsrc_v6[0]);
2706       priv->udpsrc_v6[0] = NULL;
2707     }
2708     if (priv->udpsrc_v6[1]) {
2709       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2710       gst_object_unref (priv->udpsrc_v6[1]);
2711       priv->udpsrc_v6[1] = NULL;
2712     }
2713     if (priv->udpsrc_mcast_v6[0]) {
2714       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2715       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2716       priv->udpsrc_mcast_v6[0] = NULL;
2717     }
2718     if (priv->udpsrc_mcast_v6[1]) {
2719       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2720       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2721       priv->udpsrc_mcast_v6[1] = NULL;
2722     }
2723     g_mutex_unlock (&priv->lock);
2724     return FALSE;
2725   }
2726 }
2727
2728 /**
2729  * gst_rtsp_stream_leave_bin:
2730  * @stream: a #GstRTSPStream
2731  * @bin: (transfer none): a #GstBin
2732  * @rtpbin: (transfer none): a rtpbin #GstElement
2733  *
2734  * Remove the elements of @stream from @bin.
2735  *
2736  * Return: %TRUE on success.
2737  */
2738 gboolean
2739 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2740     GstElement * rtpbin)
2741 {
2742   GstRTSPStreamPrivate *priv;
2743   gint i;
2744   gboolean is_tcp, is_udp;
2745
2746   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2747   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2748   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2749
2750   priv = stream->priv;
2751
2752   g_mutex_lock (&priv->lock);
2753   if (!priv->is_joined)
2754     goto was_not_joined;
2755
2756   /* all transports must be removed by now */
2757   if (priv->transports != NULL)
2758     goto transports_not_removed;
2759
2760   clear_tr_cache (priv, TRUE);
2761   clear_tr_cache (priv, FALSE);
2762
2763   GST_INFO ("stream %p leaving bin", stream);
2764
2765   if (priv->srcpad) {
2766     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2767
2768     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2769     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2770     gst_object_unref (priv->send_rtp_sink);
2771     priv->send_rtp_sink = NULL;
2772   } else if (priv->recv_rtp_src) {
2773     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2774     gst_object_unref (priv->recv_rtp_src);
2775     priv->recv_rtp_src = NULL;
2776   }
2777
2778   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2779
2780   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2781       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2782
2783
2784   for (i = 0; i < 2; i++) {
2785     if (priv->udpsink[i])
2786       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2787     if (priv->appsink[i])
2788       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2789     if (priv->appqueue[i])
2790       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2791     if (priv->udpqueue[i])
2792       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2793     if (priv->tee[i])
2794       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2795     if (priv->funnel[i])
2796       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2797     if (priv->appsrc[i])
2798       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2799
2800     if (priv->udpsrc_v4[i]) {
2801       if (priv->sinkpad || i == 1) {
2802         /* and set udpsrc to NULL now before removing */
2803         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2804         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2805         /* removing them should also nicely release the request
2806          * pads when they finalize */
2807         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2808       } else {
2809         /* we need to set the state to NULL before unref */
2810         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2811         gst_object_unref (priv->udpsrc_v4[i]);
2812       }
2813     }
2814
2815     if (priv->udpsrc_mcast_v4[i]) {
2816       if (priv->sinkpad || i == 1) {
2817         /* and set udpsrc to NULL now before removing */
2818         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2819         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2820         /* removing them should also nicely release the request
2821          * pads when they finalize */
2822         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2823       } else {
2824         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2825         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2826       }
2827     }
2828
2829     if (priv->udpsrc_v6[i]) {
2830       if (priv->sinkpad || i == 1) {
2831         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2832         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2833         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2834       } else {
2835         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2836         gst_object_unref (priv->udpsrc_v6[i]);
2837       }
2838     }
2839     if (priv->udpsrc_mcast_v6[i]) {
2840       if (priv->sinkpad || i == 1) {
2841         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2842         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2843         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2844       } else {
2845         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2846         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2847       }
2848     }
2849
2850     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2851       gst_bin_remove (bin, priv->udpsink[i]);
2852     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2853       gst_bin_remove (bin, priv->appsrc[i]);
2854     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2855       gst_bin_remove (bin, priv->appsink[i]);
2856     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2857       gst_bin_remove (bin, priv->appqueue[i]);
2858     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2859       gst_bin_remove (bin, priv->udpqueue[i]);
2860     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2861       gst_bin_remove (bin, priv->tee[i]);
2862     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2863       gst_bin_remove (bin, priv->funnel[i]);
2864
2865     if (priv->sinkpad || i == 1) {
2866       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2867       gst_object_unref (priv->recv_sink[i]);
2868       priv->recv_sink[i] = NULL;
2869     }
2870
2871     priv->udpsrc_v4[i] = NULL;
2872     priv->udpsrc_v6[i] = NULL;
2873     priv->udpsrc_mcast_v4[i] = NULL;
2874     priv->udpsrc_mcast_v6[i] = NULL;
2875     priv->udpsink[i] = NULL;
2876     priv->appsrc[i] = NULL;
2877     priv->appsink[i] = NULL;
2878     priv->appqueue[i] = NULL;
2879     priv->udpqueue[i] = NULL;
2880     priv->tee[i] = NULL;
2881     priv->funnel[i] = NULL;
2882   }
2883
2884   if (priv->srcpad) {
2885     gst_object_unref (priv->send_src[0]);
2886     priv->send_src[0] = NULL;
2887   }
2888
2889   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2890   gst_object_unref (priv->send_src[1]);
2891   priv->send_src[1] = NULL;
2892
2893   g_object_unref (priv->session);
2894   priv->session = NULL;
2895   if (priv->caps)
2896     gst_caps_unref (priv->caps);
2897   priv->caps = NULL;
2898
2899   if (priv->srtpenc)
2900     gst_object_unref (priv->srtpenc);
2901   if (priv->srtpdec)
2902     gst_object_unref (priv->srtpdec);
2903
2904   priv->is_joined = FALSE;
2905   g_mutex_unlock (&priv->lock);
2906
2907   return TRUE;
2908
2909 was_not_joined:
2910   {
2911     g_mutex_unlock (&priv->lock);
2912     return TRUE;
2913   }
2914 transports_not_removed:
2915   {
2916     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2917     g_mutex_unlock (&priv->lock);
2918     return FALSE;
2919   }
2920 }
2921
2922 /**
2923  * gst_rtsp_stream_get_rtpinfo:
2924  * @stream: a #GstRTSPStream
2925  * @rtptime: (allow-none): result RTP timestamp
2926  * @seq: (allow-none): result RTP seqnum
2927  * @clock_rate: (allow-none): the clock rate
2928  * @running_time: (allow-none): result running-time
2929  *
2930  * Retrieve the current rtptime, seq and running-time. This is used to
2931  * construct a RTPInfo reply header.
2932  *
2933  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2934  */
2935 gboolean
2936 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2937     guint * rtptime, guint * seq, guint * clock_rate,
2938     GstClockTime * running_time)
2939 {
2940   GstRTSPStreamPrivate *priv;
2941   GstStructure *stats;
2942   GObjectClass *payobjclass;
2943
2944   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2945
2946   priv = stream->priv;
2947
2948   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2949
2950   g_mutex_lock (&priv->lock);
2951
2952   /* First try to extract the information from the last buffer on the sinks.
2953    * This will have a more accurate sequence number and timestamp, as between
2954    * the payloader and the sink there can be some queues
2955    */
2956   if (priv->udpsink[0] || priv->appsink[0]) {
2957     GstSample *last_sample;
2958
2959     if (priv->udpsink[0])
2960       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2961     else
2962       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2963
2964     if (last_sample) {
2965       GstCaps *caps;
2966       GstBuffer *buffer;
2967       GstSegment *segment;
2968       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2969
2970       caps = gst_sample_get_caps (last_sample);
2971       buffer = gst_sample_get_buffer (last_sample);
2972       segment = gst_sample_get_segment (last_sample);
2973
2974       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2975         if (seq) {
2976           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2977         }
2978
2979         if (rtptime) {
2980           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2981         }
2982
2983         gst_rtp_buffer_unmap (&rtp_buffer);
2984
2985         if (running_time) {
2986           *running_time =
2987               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2988               GST_BUFFER_TIMESTAMP (buffer));
2989         }
2990
2991         if (clock_rate) {
2992           GstStructure *s = gst_caps_get_structure (caps, 0);
2993
2994           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2995
2996           if (*clock_rate == 0 && running_time)
2997             *running_time = GST_CLOCK_TIME_NONE;
2998         }
2999         gst_sample_unref (last_sample);
3000
3001         goto done;
3002       } else {
3003         gst_sample_unref (last_sample);
3004       }
3005     }
3006   }
3007
3008   if (g_object_class_find_property (payobjclass, "stats")) {
3009     g_object_get (priv->payloader, "stats", &stats, NULL);
3010     if (stats == NULL)
3011       goto no_stats;
3012
3013     if (seq)
3014       gst_structure_get_uint (stats, "seqnum", seq);
3015
3016     if (rtptime)
3017       gst_structure_get_uint (stats, "timestamp", rtptime);
3018
3019     if (running_time)
3020       gst_structure_get_clock_time (stats, "running-time", running_time);
3021
3022     if (clock_rate) {
3023       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3024       if (*clock_rate == 0 && running_time)
3025         *running_time = GST_CLOCK_TIME_NONE;
3026     }
3027     gst_structure_free (stats);
3028   } else {
3029     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3030         !g_object_class_find_property (payobjclass, "timestamp"))
3031       goto no_stats;
3032
3033     if (seq)
3034       g_object_get (priv->payloader, "seqnum", seq, NULL);
3035
3036     if (rtptime)
3037       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3038
3039     if (running_time)
3040       *running_time = GST_CLOCK_TIME_NONE;
3041   }
3042
3043 done:
3044   g_mutex_unlock (&priv->lock);
3045
3046   return TRUE;
3047
3048   /* ERRORS */
3049 no_stats:
3050   {
3051     GST_WARNING ("Could not get payloader stats");
3052     g_mutex_unlock (&priv->lock);
3053     return FALSE;
3054   }
3055 }
3056
3057 /**
3058  * gst_rtsp_stream_get_caps:
3059  * @stream: a #GstRTSPStream
3060  *
3061  * Retrieve the current caps of @stream.
3062  *
3063  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3064  * after usage.
3065  */
3066 GstCaps *
3067 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3068 {
3069   GstRTSPStreamPrivate *priv;
3070   GstCaps *result;
3071
3072   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3073
3074   priv = stream->priv;
3075
3076   g_mutex_lock (&priv->lock);
3077   if ((result = priv->caps))
3078     gst_caps_ref (result);
3079   g_mutex_unlock (&priv->lock);
3080
3081   return result;
3082 }
3083
3084 /**
3085  * gst_rtsp_stream_recv_rtp:
3086  * @stream: a #GstRTSPStream
3087  * @buffer: (transfer full): a #GstBuffer
3088  *
3089  * Handle an RTP buffer for the stream. This method is usually called when a
3090  * message has been received from a client using the TCP transport.
3091  *
3092  * This function takes ownership of @buffer.
3093  *
3094  * Returns: a GstFlowReturn.
3095  */
3096 GstFlowReturn
3097 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3098 {
3099   GstRTSPStreamPrivate *priv;
3100   GstFlowReturn ret;
3101   GstElement *element;
3102
3103   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3104   priv = stream->priv;
3105   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3106   g_return_val_if_fail (priv->is_joined, FALSE);
3107
3108   g_mutex_lock (&priv->lock);
3109   if (priv->appsrc[0])
3110     element = gst_object_ref (priv->appsrc[0]);
3111   else
3112     element = NULL;
3113   g_mutex_unlock (&priv->lock);
3114
3115   if (element) {
3116     if (priv->appsrc_base_time[0] == -1) {
3117       /* Take current running_time. This timestamp will be put on
3118        * the first buffer of each stream because we are a live source and so we
3119        * timestamp with the running_time. When we are dealing with TCP, we also
3120        * only timestamp the first buffer (using the DISCONT flag) because a server
3121        * typically bursts data, for which we don't want to compensate by speeding
3122        * up the media. The other timestamps will be interpollated from this one
3123        * using the RTP timestamps. */
3124       GST_OBJECT_LOCK (element);
3125       if (GST_ELEMENT_CLOCK (element)) {
3126         GstClockTime now;
3127         GstClockTime base_time;
3128
3129         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3130         base_time = GST_ELEMENT_CAST (element)->base_time;
3131
3132         priv->appsrc_base_time[0] = now - base_time;
3133         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3134         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3135             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3136             GST_TIME_ARGS (base_time));
3137       }
3138       GST_OBJECT_UNLOCK (element);
3139     }
3140
3141     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3142     gst_object_unref (element);
3143   } else {
3144     ret = GST_FLOW_OK;
3145   }
3146   return ret;
3147 }
3148
3149 /**
3150  * gst_rtsp_stream_recv_rtcp:
3151  * @stream: a #GstRTSPStream
3152  * @buffer: (transfer full): a #GstBuffer
3153  *
3154  * Handle an RTCP buffer for the stream. This method is usually called when a
3155  * message has been received from a client using the TCP transport.
3156  *
3157  * This function takes ownership of @buffer.
3158  *
3159  * Returns: a GstFlowReturn.
3160  */
3161 GstFlowReturn
3162 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3163 {
3164   GstRTSPStreamPrivate *priv;
3165   GstFlowReturn ret;
3166   GstElement *element;
3167
3168   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3169   priv = stream->priv;
3170   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3171
3172   if (!priv->is_joined) {
3173     gst_buffer_unref (buffer);
3174     return GST_FLOW_NOT_LINKED;
3175   }
3176   g_mutex_lock (&priv->lock);
3177   if (priv->appsrc[1])
3178     element = gst_object_ref (priv->appsrc[1]);
3179   else
3180     element = NULL;
3181   g_mutex_unlock (&priv->lock);
3182
3183   if (element) {
3184     if (priv->appsrc_base_time[1] == -1) {
3185       /* Take current running_time. This timestamp will be put on
3186        * the first buffer of each stream because we are a live source and so we
3187        * timestamp with the running_time. When we are dealing with TCP, we also
3188        * only timestamp the first buffer (using the DISCONT flag) because a server
3189        * typically bursts data, for which we don't want to compensate by speeding
3190        * up the media. The other timestamps will be interpollated from this one
3191        * using the RTP timestamps. */
3192       GST_OBJECT_LOCK (element);
3193       if (GST_ELEMENT_CLOCK (element)) {
3194         GstClockTime now;
3195         GstClockTime base_time;
3196
3197         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3198         base_time = GST_ELEMENT_CAST (element)->base_time;
3199
3200         priv->appsrc_base_time[1] = now - base_time;
3201         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3202         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3203             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3204             GST_TIME_ARGS (base_time));
3205       }
3206       GST_OBJECT_UNLOCK (element);
3207     }
3208
3209     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3210     gst_object_unref (element);
3211   } else {
3212     ret = GST_FLOW_OK;
3213     gst_buffer_unref (buffer);
3214   }
3215   return ret;
3216 }
3217
3218 /* must be called with lock */
3219 static gboolean
3220 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3221     gboolean add)
3222 {
3223   GstRTSPStreamPrivate *priv = stream->priv;
3224   const GstRTSPTransport *tr;
3225
3226   tr = gst_rtsp_stream_transport_get_transport (trans);
3227
3228   switch (tr->lower_transport) {
3229     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3230     case GST_RTSP_LOWER_TRANS_UDP:
3231     {
3232       gchar *dest;
3233       gint min, max;
3234       guint ttl = 0;
3235
3236       dest = tr->destination;
3237       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3238         min = tr->port.min;
3239         max = tr->port.max;
3240         ttl = tr->ttl;
3241       } else if (priv->client_side) {
3242         /* In client side mode the 'destination' is the RTSP server, so send
3243          * to those ports */
3244         min = tr->server_port.min;
3245         max = tr->server_port.max;
3246       } else {
3247         min = tr->client_port.min;
3248         max = tr->client_port.max;
3249       }
3250
3251       if (add) {
3252         if (ttl > 0) {
3253           GST_INFO ("setting ttl-mc %d", ttl);
3254           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3255           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3256         }
3257         GST_INFO ("adding %s:%d-%d", dest, min, max);
3258         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3259         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3260         priv->transports = g_list_prepend (priv->transports, trans);
3261       } else {
3262         GST_INFO ("removing %s:%d-%d", dest, min, max);
3263         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3264         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3265         priv->transports = g_list_remove (priv->transports, trans);
3266       }
3267       priv->transports_cookie++;
3268       break;
3269     }
3270     case GST_RTSP_LOWER_TRANS_TCP:
3271       if (add) {
3272         GST_INFO ("adding TCP %s", tr->destination);
3273         priv->transports = g_list_prepend (priv->transports, trans);
3274       } else {
3275         GST_INFO ("removing TCP %s", tr->destination);
3276         priv->transports = g_list_remove (priv->transports, trans);
3277       }
3278       priv->transports_cookie++;
3279       break;
3280     default:
3281       goto unknown_transport;
3282   }
3283   return TRUE;
3284
3285   /* ERRORS */
3286 unknown_transport:
3287   {
3288     GST_INFO ("Unknown transport %d", tr->lower_transport);
3289     return FALSE;
3290   }
3291 }
3292
3293
3294 /**
3295  * gst_rtsp_stream_add_transport:
3296  * @stream: a #GstRTSPStream
3297  * @trans: (transfer none): a #GstRTSPStreamTransport
3298  *
3299  * Add the transport in @trans to @stream. The media of @stream will
3300  * then also be send to the values configured in @trans.
3301  *
3302  * @stream must be joined to a bin.
3303  *
3304  * @trans must contain a valid #GstRTSPTransport.
3305  *
3306  * Returns: %TRUE if @trans was added
3307  */
3308 gboolean
3309 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3310     GstRTSPStreamTransport * trans)
3311 {
3312   GstRTSPStreamPrivate *priv;
3313   gboolean res;
3314
3315   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3316   priv = stream->priv;
3317   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3318   g_return_val_if_fail (priv->is_joined, FALSE);
3319
3320   g_mutex_lock (&priv->lock);
3321   res = update_transport (stream, trans, TRUE);
3322   g_mutex_unlock (&priv->lock);
3323
3324   return res;
3325 }
3326
3327 /**
3328  * gst_rtsp_stream_remove_transport:
3329  * @stream: a #GstRTSPStream
3330  * @trans: (transfer none): a #GstRTSPStreamTransport
3331  *
3332  * Remove the transport in @trans from @stream. The media of @stream will
3333  * not be sent to the values configured in @trans.
3334  *
3335  * @stream must be joined to a bin.
3336  *
3337  * @trans must contain a valid #GstRTSPTransport.
3338  *
3339  * Returns: %TRUE if @trans was removed
3340  */
3341 gboolean
3342 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3343     GstRTSPStreamTransport * trans)
3344 {
3345   GstRTSPStreamPrivate *priv;
3346   gboolean res;
3347
3348   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3349   priv = stream->priv;
3350   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3351   g_return_val_if_fail (priv->is_joined, FALSE);
3352
3353   g_mutex_lock (&priv->lock);
3354   res = update_transport (stream, trans, FALSE);
3355   g_mutex_unlock (&priv->lock);
3356
3357   return res;
3358 }
3359
3360 /**
3361  * gst_rtsp_stream_update_crypto:
3362  * @stream: a #GstRTSPStream
3363  * @ssrc: the SSRC
3364  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3365  *
3366  * Update the new crypto information for @ssrc in @stream. If information
3367  * for @ssrc did not exist, it will be added. If information
3368  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3369  * be removed from @stream.
3370  *
3371  * Returns: %TRUE if @crypto could be updated
3372  */
3373 gboolean
3374 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3375     guint ssrc, GstCaps * crypto)
3376 {
3377   GstRTSPStreamPrivate *priv;
3378
3379   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3380   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3381
3382   priv = stream->priv;
3383
3384   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3385
3386   g_mutex_lock (&priv->lock);
3387   if (crypto)
3388     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3389         gst_caps_ref (crypto));
3390   else
3391     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3392   g_mutex_unlock (&priv->lock);
3393
3394   return TRUE;
3395 }
3396
3397 /**
3398  * gst_rtsp_stream_get_rtp_socket:
3399  * @stream: a #GstRTSPStream
3400  * @family: the socket family
3401  *
3402  * Get the RTP socket from @stream for a @family.
3403  *
3404  * @stream must be joined to a bin.
3405  *
3406  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3407  * socket could be allocated for @family. Unref after usage
3408  */
3409 GSocket *
3410 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3411 {
3412   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3413   GSocket *socket;
3414   const gchar *name;
3415
3416   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3417   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3418       family == G_SOCKET_FAMILY_IPV6, NULL);
3419   g_return_val_if_fail (priv->udpsink[0], NULL);
3420
3421   if (family == G_SOCKET_FAMILY_IPV6)
3422     name = "socket-v6";
3423   else
3424     name = "socket";
3425
3426   g_object_get (priv->udpsink[0], name, &socket, NULL);
3427
3428   return socket;
3429 }
3430
3431 /**
3432  * gst_rtsp_stream_get_rtcp_socket:
3433  * @stream: a #GstRTSPStream
3434  * @family: the socket family
3435  *
3436  * Get the RTCP socket from @stream for a @family.
3437  *
3438  * @stream must be joined to a bin.
3439  *
3440  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3441  * socket could be allocated for @family. Unref after usage
3442  */
3443 GSocket *
3444 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3445 {
3446   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3447   GSocket *socket;
3448   const gchar *name;
3449
3450   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3451   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3452       family == G_SOCKET_FAMILY_IPV6, NULL);
3453   g_return_val_if_fail (priv->udpsink[1], NULL);
3454
3455   if (family == G_SOCKET_FAMILY_IPV6)
3456     name = "socket-v6";
3457   else
3458     name = "socket";
3459
3460   g_object_get (priv->udpsink[1], name, &socket, NULL);
3461
3462   return socket;
3463 }
3464
3465 /**
3466  * gst_rtsp_stream_set_seqnum:
3467  * @stream: a #GstRTSPStream
3468  * @seqnum: a new sequence number
3469  *
3470  * Configure the sequence number in the payloader of @stream to @seqnum.
3471  */
3472 void
3473 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3474 {
3475   GstRTSPStreamPrivate *priv;
3476
3477   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3478
3479   priv = stream->priv;
3480
3481   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3482 }
3483
3484 /**
3485  * gst_rtsp_stream_get_seqnum:
3486  * @stream: a #GstRTSPStream
3487  *
3488  * Get the configured sequence number in the payloader of @stream.
3489  *
3490  * Returns: the sequence number of the payloader.
3491  */
3492 guint16
3493 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3494 {
3495   GstRTSPStreamPrivate *priv;
3496   guint seqnum;
3497
3498   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3499
3500   priv = stream->priv;
3501
3502   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3503
3504   return seqnum;
3505 }
3506
3507 /**
3508  * gst_rtsp_stream_transport_filter:
3509  * @stream: a #GstRTSPStream
3510  * @func: (scope call) (allow-none): a callback
3511  * @user_data: (closure): user data passed to @func
3512  *
3513  * Call @func for each transport managed by @stream. The result value of @func
3514  * determines what happens to the transport. @func will be called with @stream
3515  * locked so no further actions on @stream can be performed from @func.
3516  *
3517  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3518  * @stream.
3519  *
3520  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3521  *
3522  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3523  * will also be added with an additional ref to the result #GList of this
3524  * function..
3525  *
3526  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3527  *
3528  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3529  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3530  * element in the #GList should be unreffed before the list is freed.
3531  */
3532 GList *
3533 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3534     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3535 {
3536   GstRTSPStreamPrivate *priv;
3537   GList *result, *walk, *next;
3538   GHashTable *visited = NULL;
3539   guint cookie;
3540
3541   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3542
3543   priv = stream->priv;
3544
3545   result = NULL;
3546   if (func)
3547     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3548
3549   g_mutex_lock (&priv->lock);
3550 restart:
3551   cookie = priv->transports_cookie;
3552   for (walk = priv->transports; walk; walk = next) {
3553     GstRTSPStreamTransport *trans = walk->data;
3554     GstRTSPFilterResult res;
3555     gboolean changed;
3556
3557     next = g_list_next (walk);
3558
3559     if (func) {
3560       /* only visit each transport once */
3561       if (g_hash_table_contains (visited, trans))
3562         continue;
3563
3564       g_hash_table_add (visited, g_object_ref (trans));
3565       g_mutex_unlock (&priv->lock);
3566
3567       res = func (stream, trans, user_data);
3568
3569       g_mutex_lock (&priv->lock);
3570     } else
3571       res = GST_RTSP_FILTER_REF;
3572
3573     changed = (cookie != priv->transports_cookie);
3574
3575     switch (res) {
3576       case GST_RTSP_FILTER_REMOVE:
3577         update_transport (stream, trans, FALSE);
3578         break;
3579       case GST_RTSP_FILTER_REF:
3580         result = g_list_prepend (result, g_object_ref (trans));
3581         break;
3582       case GST_RTSP_FILTER_KEEP:
3583       default:
3584         break;
3585     }
3586     if (changed)
3587       goto restart;
3588   }
3589   g_mutex_unlock (&priv->lock);
3590
3591   if (func)
3592     g_hash_table_unref (visited);
3593
3594   return result;
3595 }
3596
3597 static GstPadProbeReturn
3598 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3599 {
3600   GstRTSPStreamPrivate *priv;
3601   GstRTSPStream *stream;
3602
3603   stream = user_data;
3604   priv = stream->priv;
3605
3606   GST_DEBUG_OBJECT (pad, "now blocking");
3607
3608   g_mutex_lock (&priv->lock);
3609   priv->blocking = TRUE;
3610   g_mutex_unlock (&priv->lock);
3611
3612   gst_element_post_message (priv->payloader,
3613       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3614           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3615
3616   return GST_PAD_PROBE_OK;
3617 }
3618
3619 /**
3620  * gst_rtsp_stream_set_blocked:
3621  * @stream: a #GstRTSPStream
3622  * @blocked: boolean indicating we should block or unblock
3623  *
3624  * Blocks or unblocks the dataflow on @stream.
3625  *
3626  * Returns: %TRUE on success
3627  */
3628 gboolean
3629 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3630 {
3631   GstRTSPStreamPrivate *priv;
3632
3633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3634
3635   priv = stream->priv;
3636
3637   g_mutex_lock (&priv->lock);
3638   if (blocked) {
3639     priv->blocking = FALSE;
3640     if (priv->blocked_id == 0) {
3641       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3642           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3643           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3644           g_object_ref (stream), g_object_unref);
3645     }
3646   } else {
3647     if (priv->blocked_id != 0) {
3648       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3649       priv->blocked_id = 0;
3650       priv->blocking = FALSE;
3651     }
3652   }
3653   g_mutex_unlock (&priv->lock);
3654
3655   return TRUE;
3656 }
3657
3658 /**
3659  * gst_rtsp_stream_is_blocking:
3660  * @stream: a #GstRTSPStream
3661  *
3662  * Check if @stream is blocking on a #GstBuffer.
3663  *
3664  * Returns: %TRUE if @stream is blocking
3665  */
3666 gboolean
3667 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3668 {
3669   GstRTSPStreamPrivate *priv;
3670   gboolean result;
3671
3672   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3673
3674   priv = stream->priv;
3675
3676   g_mutex_lock (&priv->lock);
3677   result = priv->blocking;
3678   g_mutex_unlock (&priv->lock);
3679
3680   return result;
3681 }
3682
3683 /**
3684  * gst_rtsp_stream_query_position:
3685  * @stream: a #GstRTSPStream
3686  *
3687  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3688  * the RTP parts of the pipeline and not the RTCP parts.
3689  *
3690  * Returns: %TRUE if the position could be queried
3691  */
3692 gboolean
3693 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3694 {
3695   GstRTSPStreamPrivate *priv;
3696   GstElement *sink;
3697   gboolean ret;
3698
3699   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3700
3701   priv = stream->priv;
3702
3703   g_mutex_lock (&priv->lock);
3704   /* depending on the transport type, it should query corresponding sink */
3705   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3706       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3707     sink = priv->udpsink[0];
3708   else
3709     sink = priv->appsink[0];
3710
3711   if (sink)
3712     gst_object_ref (sink);
3713   g_mutex_unlock (&priv->lock);
3714
3715   if (!sink)
3716     return FALSE;
3717
3718   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3719   gst_object_unref (sink);
3720
3721   return ret;
3722 }
3723
3724 /**
3725  * gst_rtsp_stream_query_stop:
3726  * @stream: a #GstRTSPStream
3727  *
3728  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3729  * the RTP parts of the pipeline and not the RTCP parts.
3730  *
3731  * Returns: %TRUE if the stop could be queried
3732  */
3733 gboolean
3734 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3735 {
3736   GstRTSPStreamPrivate *priv;
3737   GstElement *sink;
3738   GstQuery *query;
3739   gboolean ret;
3740
3741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3742
3743   priv = stream->priv;
3744
3745   g_mutex_lock (&priv->lock);
3746   /* depending on the transport type, it should query corresponding sink */
3747   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3748       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3749     sink = priv->udpsink[0];
3750   else
3751     sink = priv->appsink[0];
3752
3753   if (sink)
3754     gst_object_ref (sink);
3755   g_mutex_unlock (&priv->lock);
3756
3757   if (!sink)
3758     return FALSE;
3759
3760   query = gst_query_new_segment (GST_FORMAT_TIME);
3761   if ((ret = gst_element_query (sink, query))) {
3762     GstFormat format;
3763
3764     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3765     if (format != GST_FORMAT_TIME)
3766       *stop = -1;
3767   }
3768   gst_query_unref (query);
3769   gst_object_unref (sink);
3770
3771   return ret;
3772
3773 }