rtsp-stream: Don't bind the sockets to multicast addresses
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *addr_v4;
139   GstRTSPAddress *addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   /* the caps of the stream */
144   gulong caps_sig;
145   GstCaps *caps;
146
147   /* transports we stream to */
148   guint n_active;
149   GList *transports;
150   guint transports_cookie;
151   GList *tr_cache_rtp;
152   GList *tr_cache_rtcp;
153   guint tr_cache_cookie_rtp;
154   guint tr_cache_cookie_rtcp;
155
156
157   gint dscp_qos;
158
159   /* stream blocking */
160   gulong blocked_id;
161   gboolean blocking;
162
163   /* pt->caps map for RECORD streams */
164   GHashTable *ptmap;
165 };
166
167 #define DEFAULT_CONTROL         NULL
168 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
169 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
170                                         GST_RTSP_LOWER_TRANS_TCP
171
172 enum
173 {
174   PROP_0,
175   PROP_CONTROL,
176   PROP_PROFILES,
177   PROP_PROTOCOLS,
178   PROP_LAST
179 };
180
181 enum
182 {
183   SIGNAL_NEW_RTP_ENCODER,
184   SIGNAL_NEW_RTCP_ENCODER,
185   SIGNAL_LAST
186 };
187
188 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
189 #define GST_CAT_DEFAULT rtsp_stream_debug
190
191 static GQuark ssrc_stream_map_key;
192
193 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec);
195 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
196     const GValue * value, GParamSpec * pspec);
197
198 static void gst_rtsp_stream_finalize (GObject * obj);
199
200 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
201
202 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
203
204 static void
205 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
206 {
207   GObjectClass *gobject_class;
208
209   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
210
211   gobject_class = G_OBJECT_CLASS (klass);
212
213   gobject_class->get_property = gst_rtsp_stream_get_property;
214   gobject_class->set_property = gst_rtsp_stream_set_property;
215   gobject_class->finalize = gst_rtsp_stream_finalize;
216
217   g_object_class_install_property (gobject_class, PROP_CONTROL,
218       g_param_spec_string ("control", "Control",
219           "The control string for this stream", DEFAULT_CONTROL,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   g_object_class_install_property (gobject_class, PROP_PROFILES,
223       g_param_spec_flags ("profiles", "Profiles",
224           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
225           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
228       g_param_spec_flags ("protocols", "Protocols",
229           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
230           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
233       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
235       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
238       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
243
244   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
245 }
246
247 static void
248 gst_rtsp_stream_init (GstRTSPStream * stream)
249 {
250   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
251
252   GST_DEBUG ("new stream %p", stream);
253
254   stream->priv = priv;
255
256   priv->dscp_qos = -1;
257   priv->control = g_strdup (DEFAULT_CONTROL);
258   priv->profiles = DEFAULT_PROFILES;
259   priv->protocols = DEFAULT_PROTOCOLS;
260
261   g_mutex_init (&priv->lock);
262
263   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
264       NULL, (GDestroyNotify) gst_caps_unref);
265   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
266       (GDestroyNotify) gst_caps_unref);
267 }
268
269 static void
270 gst_rtsp_stream_finalize (GObject * obj)
271 {
272   GstRTSPStream *stream;
273   GstRTSPStreamPrivate *priv;
274
275   stream = GST_RTSP_STREAM (obj);
276   priv = stream->priv;
277
278   GST_DEBUG ("finalize stream %p", stream);
279
280   /* we really need to be unjoined now */
281   g_return_if_fail (!priv->is_joined);
282
283   if (priv->addr_v4)
284     gst_rtsp_address_free (priv->addr_v4);
285   if (priv->addr_v6)
286     gst_rtsp_address_free (priv->addr_v6);
287   if (priv->server_addr_v4)
288     gst_rtsp_address_free (priv->server_addr_v4);
289   if (priv->server_addr_v6)
290     gst_rtsp_address_free (priv->server_addr_v6);
291   if (priv->pool)
292     g_object_unref (priv->pool);
293   if (priv->rtxsend)
294     g_object_unref (priv->rtxsend);
295
296   gst_object_unref (priv->payloader);
297   if (priv->srcpad)
298     gst_object_unref (priv->srcpad);
299   if (priv->sinkpad)
300     gst_object_unref (priv->sinkpad);
301   g_free (priv->control);
302   g_mutex_clear (&priv->lock);
303
304   g_hash_table_unref (priv->keys);
305   g_hash_table_destroy (priv->ptmap);
306
307   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
308 }
309
310 static void
311 gst_rtsp_stream_get_property (GObject * object, guint propid,
312     GValue * value, GParamSpec * pspec)
313 {
314   GstRTSPStream *stream = GST_RTSP_STREAM (object);
315
316   switch (propid) {
317     case PROP_CONTROL:
318       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
319       break;
320     case PROP_PROFILES:
321       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
322       break;
323     case PROP_PROTOCOLS:
324       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
328   }
329 }
330
331 static void
332 gst_rtsp_stream_set_property (GObject * object, guint propid,
333     const GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
340       break;
341     case PROP_PROFILES:
342       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
343       break;
344     case PROP_PROTOCOLS:
345       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 /**
353  * gst_rtsp_stream_new:
354  * @idx: an index
355  * @pad: a #GstPad
356  * @payloader: a #GstElement
357  *
358  * Create a new media stream with index @idx that handles RTP data on
359  * @pad and has a payloader element @payloader if @pad is a source pad
360  * or a depayloader element @payloader if @pad is a sink pad.
361  *
362  * Returns: (transfer full): a new #GstRTSPStream
363  */
364 GstRTSPStream *
365 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
366 {
367   GstRTSPStreamPrivate *priv;
368   GstRTSPStream *stream;
369
370   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
371   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
372
373   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
374   priv = stream->priv;
375   priv->idx = idx;
376   priv->payloader = gst_object_ref (payloader);
377   if (GST_PAD_IS_SRC (pad))
378     priv->srcpad = gst_object_ref (pad);
379   else
380     priv->sinkpad = gst_object_ref (pad);
381
382   return stream;
383 }
384
385 /**
386  * gst_rtsp_stream_get_index:
387  * @stream: a #GstRTSPStream
388  *
389  * Get the stream index.
390  *
391  * Return: the stream index.
392  */
393 guint
394 gst_rtsp_stream_get_index (GstRTSPStream * stream)
395 {
396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
397
398   return stream->priv->idx;
399 }
400
401 /**
402  * gst_rtsp_stream_get_pt:
403  * @stream: a #GstRTSPStream
404  *
405  * Get the stream payload type.
406  *
407  * Return: the stream payload type.
408  */
409 guint
410 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   guint pt;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
416
417   priv = stream->priv;
418
419   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
420
421   return pt;
422 }
423
424 /**
425  * gst_rtsp_stream_get_srcpad:
426  * @stream: a #GstRTSPStream
427  *
428  * Get the srcpad associated with @stream.
429  *
430  * Returns: (transfer full): the srcpad. Unref after usage.
431  */
432 GstPad *
433 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
434 {
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
436
437   if (!stream->priv->srcpad)
438     return NULL;
439
440   return gst_object_ref (stream->priv->srcpad);
441 }
442
443 /**
444  * gst_rtsp_stream_get_sinkpad:
445  * @stream: a #GstRTSPStream
446  *
447  * Get the sinkpad associated with @stream.
448  *
449  * Returns: (transfer full): the sinkpad. Unref after usage.
450  */
451 GstPad *
452 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
453 {
454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
455
456   if (!stream->priv->sinkpad)
457     return NULL;
458
459   return gst_object_ref (stream->priv->sinkpad);
460 }
461
462 /**
463  * gst_rtsp_stream_get_control:
464  * @stream: a #GstRTSPStream
465  *
466  * Get the control string to identify this stream.
467  *
468  * Returns: (transfer full): the control string. g_free() after usage.
469  */
470 gchar *
471 gst_rtsp_stream_get_control (GstRTSPStream * stream)
472 {
473   GstRTSPStreamPrivate *priv;
474   gchar *result;
475
476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
477
478   priv = stream->priv;
479
480   g_mutex_lock (&priv->lock);
481   if ((result = g_strdup (priv->control)) == NULL)
482     result = g_strdup_printf ("stream=%u", priv->idx);
483   g_mutex_unlock (&priv->lock);
484
485   return result;
486 }
487
488 /**
489  * gst_rtsp_stream_set_control:
490  * @stream: a #GstRTSPStream
491  * @control: a control string
492  *
493  * Set the control string in @stream.
494  */
495 void
496 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   g_mutex_lock (&priv->lock);
505   g_free (priv->control);
506   priv->control = g_strdup (control);
507   g_mutex_unlock (&priv->lock);
508 }
509
510 /**
511  * gst_rtsp_stream_has_control:
512  * @stream: a #GstRTSPStream
513  * @control: a control string
514  *
515  * Check if @stream has the control string @control.
516  *
517  * Returns: %TRUE is @stream has @control as the control string
518  */
519 gboolean
520 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
521 {
522   GstRTSPStreamPrivate *priv;
523   gboolean res;
524
525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
526
527   priv = stream->priv;
528
529   g_mutex_lock (&priv->lock);
530   if (priv->control)
531     res = (g_strcmp0 (priv->control, control) == 0);
532   else {
533     guint streamid;
534
535     if (sscanf (control, "stream=%u", &streamid) > 0)
536       res = (streamid == priv->idx);
537     else
538       res = FALSE;
539   }
540   g_mutex_unlock (&priv->lock);
541
542   return res;
543 }
544
545 /**
546  * gst_rtsp_stream_set_mtu:
547  * @stream: a #GstRTSPStream
548  * @mtu: a new MTU
549  *
550  * Configure the mtu in the payloader of @stream to @mtu.
551  */
552 void
553 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
554 {
555   GstRTSPStreamPrivate *priv;
556
557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
558
559   priv = stream->priv;
560
561   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
562
563   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
564 }
565
566 /**
567  * gst_rtsp_stream_get_mtu:
568  * @stream: a #GstRTSPStream
569  *
570  * Get the configured MTU in the payloader of @stream.
571  *
572  * Returns: the MTU of the payloader.
573  */
574 guint
575 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
576 {
577   GstRTSPStreamPrivate *priv;
578   guint mtu;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
581
582   priv = stream->priv;
583
584   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
585
586   return mtu;
587 }
588
589 /* Update the dscp qos property on the udp sinks */
590 static void
591 update_dscp_qos (GstRTSPStream * stream)
592 {
593   GstRTSPStreamPrivate *priv;
594
595   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
596
597   priv = stream->priv;
598
599   if (priv->udpsink[0]) {
600     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
601         NULL);
602   }
603
604   if (priv->udpsink[1]) {
605     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
606         NULL);
607   }
608 }
609
610 /**
611  * gst_rtsp_stream_set_dscp_qos:
612  * @stream: a #GstRTSPStream
613  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
614  *
615  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
616  */
617 void
618 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
623
624   priv = stream->priv;
625
626   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
627
628   if (dscp_qos < -1 || dscp_qos > 63) {
629     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
630     return;
631   }
632
633   priv->dscp_qos = dscp_qos;
634
635   update_dscp_qos (stream);
636 }
637
638 /**
639  * gst_rtsp_stream_get_dscp_qos:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the configured DSCP QoS in of the outgoing sockets.
643  *
644  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
645  */
646 gint
647 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
652
653   priv = stream->priv;
654
655   return priv->dscp_qos;
656 }
657
658 /**
659  * gst_rtsp_stream_is_transport_supported:
660  * @stream: a #GstRTSPStream
661  * @transport: (transfer none): a #GstRTSPTransport
662  *
663  * Check if @transport can be handled by stream
664  *
665  * Returns: %TRUE if @transport can be handled by @stream.
666  */
667 gboolean
668 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
669     GstRTSPTransport * transport)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   if (transport->trans != GST_RTSP_TRANS_RTP)
679     goto unsupported_transmode;
680
681   if (!(transport->profile & priv->profiles))
682     goto unsupported_profile;
683
684   if (!(transport->lower_transport & priv->protocols))
685     goto unsupported_ltrans;
686
687   g_mutex_unlock (&priv->lock);
688
689   return TRUE;
690
691   /* ERRORS */
692 unsupported_transmode:
693   {
694     GST_DEBUG ("unsupported transport mode %d", transport->trans);
695     g_mutex_unlock (&priv->lock);
696     return FALSE;
697   }
698 unsupported_profile:
699   {
700     GST_DEBUG ("unsupported profile %d", transport->profile);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_ltrans:
705   {
706     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 }
711
712 /**
713  * gst_rtsp_stream_set_profiles:
714  * @stream: a #GstRTSPStream
715  * @profiles: the new profiles
716  *
717  * Configure the allowed profiles for @stream.
718  */
719 void
720 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
721 {
722   GstRTSPStreamPrivate *priv;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   g_mutex_lock (&priv->lock);
729   priv->profiles = profiles;
730   g_mutex_unlock (&priv->lock);
731 }
732
733 /**
734  * gst_rtsp_stream_get_profiles:
735  * @stream: a #GstRTSPStream
736  *
737  * Get the allowed profiles of @stream.
738  *
739  * Returns: a #GstRTSPProfile
740  */
741 GstRTSPProfile
742 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
743 {
744   GstRTSPStreamPrivate *priv;
745   GstRTSPProfile res;
746
747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
748
749   priv = stream->priv;
750
751   g_mutex_lock (&priv->lock);
752   res = priv->profiles;
753   g_mutex_unlock (&priv->lock);
754
755   return res;
756 }
757
758 /**
759  * gst_rtsp_stream_set_protocols:
760  * @stream: a #GstRTSPStream
761  * @protocols: the new flags
762  *
763  * Configure the allowed lower transport for @stream.
764  */
765 void
766 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
767     GstRTSPLowerTrans protocols)
768 {
769   GstRTSPStreamPrivate *priv;
770
771   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
772
773   priv = stream->priv;
774
775   g_mutex_lock (&priv->lock);
776   priv->protocols = protocols;
777   g_mutex_unlock (&priv->lock);
778 }
779
780 /**
781  * gst_rtsp_stream_get_protocols:
782  * @stream: a #GstRTSPStream
783  *
784  * Get the allowed protocols of @stream.
785  *
786  * Returns: a #GstRTSPLowerTrans
787  */
788 GstRTSPLowerTrans
789 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
790 {
791   GstRTSPStreamPrivate *priv;
792   GstRTSPLowerTrans res;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
795       GST_RTSP_LOWER_TRANS_UNKNOWN);
796
797   priv = stream->priv;
798
799   g_mutex_lock (&priv->lock);
800   res = priv->protocols;
801   g_mutex_unlock (&priv->lock);
802
803   return res;
804 }
805
806 /**
807  * gst_rtsp_stream_set_address_pool:
808  * @stream: a #GstRTSPStream
809  * @pool: (transfer none): a #GstRTSPAddressPool
810  *
811  * configure @pool to be used as the address pool of @stream.
812  */
813 void
814 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
815     GstRTSPAddressPool * pool)
816 {
817   GstRTSPStreamPrivate *priv;
818   GstRTSPAddressPool *old;
819
820   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
821
822   priv = stream->priv;
823
824   GST_LOG_OBJECT (stream, "set address pool %p", pool);
825
826   g_mutex_lock (&priv->lock);
827   if ((old = priv->pool) != pool)
828     priv->pool = pool ? g_object_ref (pool) : NULL;
829   else
830     old = NULL;
831   g_mutex_unlock (&priv->lock);
832
833   if (old)
834     g_object_unref (old);
835 }
836
837 /**
838  * gst_rtsp_stream_get_address_pool:
839  * @stream: a #GstRTSPStream
840  *
841  * Get the #GstRTSPAddressPool used as the address pool of @stream.
842  *
843  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
844  * usage.
845  */
846 GstRTSPAddressPool *
847 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
848 {
849   GstRTSPStreamPrivate *priv;
850   GstRTSPAddressPool *result;
851
852   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
853
854   priv = stream->priv;
855
856   g_mutex_lock (&priv->lock);
857   if ((result = priv->pool))
858     g_object_ref (result);
859   g_mutex_unlock (&priv->lock);
860
861   return result;
862 }
863
864 /**
865  * gst_rtsp_stream_get_multicast_address:
866  * @stream: a #GstRTSPStream
867  * @family: the #GSocketFamily
868  *
869  * Get the multicast address of @stream for @family.
870  *
871  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
872  * or %NULL when no address could be allocated. gst_rtsp_address_free()
873  * after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
877     GSocketFamily family)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GstRTSPAddress **addrp;
882   GstRTSPAddressFlags flags;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
885
886   priv = stream->priv;
887
888   if (family == G_SOCKET_FAMILY_IPV6) {
889     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
890     addrp = &priv->addr_v6;
891   } else {
892     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
893     addrp = &priv->addr_v4;
894   }
895
896   g_mutex_lock (&priv->lock);
897   if (*addrp == NULL) {
898     if (priv->pool == NULL)
899       goto no_pool;
900
901     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
902
903     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
904     if (*addrp == NULL)
905       goto no_address;
906   }
907   result = gst_rtsp_address_copy (*addrp);
908   g_mutex_unlock (&priv->lock);
909
910   return result;
911
912   /* ERRORS */
913 no_pool:
914   {
915     GST_ERROR_OBJECT (stream, "no address pool specified");
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 no_address:
920   {
921     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 /**
928  * gst_rtsp_stream_reserve_address:
929  * @stream: a #GstRTSPStream
930  * @address: an address
931  * @port: a port
932  * @n_ports: n_ports
933  * @ttl: a TTL
934  *
935  * Reserve @address and @port as the address and port of @stream.
936  *
937  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
938  * the address could be reserved. gst_rtsp_address_free() after usage.
939  */
940 GstRTSPAddress *
941 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
942     const gchar * address, guint port, guint n_ports, guint ttl)
943 {
944   GstRTSPStreamPrivate *priv;
945   GstRTSPAddress *result;
946   GInetAddress *addr;
947   GSocketFamily family;
948   GstRTSPAddress **addrp;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951   g_return_val_if_fail (address != NULL, NULL);
952   g_return_val_if_fail (port > 0, NULL);
953   g_return_val_if_fail (n_ports > 0, NULL);
954   g_return_val_if_fail (ttl > 0, NULL);
955
956   priv = stream->priv;
957
958   addr = g_inet_address_new_from_string (address);
959   if (!addr) {
960     GST_ERROR ("failed to get inet addr from %s", address);
961     family = G_SOCKET_FAMILY_IPV4;
962   } else {
963     family = g_inet_address_get_family (addr);
964     g_object_unref (addr);
965   }
966
967   if (family == G_SOCKET_FAMILY_IPV6)
968     addrp = &priv->addr_v6;
969   else
970     addrp = &priv->addr_v4;
971
972   g_mutex_lock (&priv->lock);
973   if (*addrp == NULL) {
974     GstRTSPAddressPoolResult res;
975
976     if (priv->pool == NULL)
977       goto no_pool;
978
979     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
980         port, n_ports, ttl, addrp);
981     if (res != GST_RTSP_ADDRESS_POOL_OK)
982       goto no_address;
983   } else {
984     if (strcmp ((*addrp)->address, address) ||
985         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
986         (*addrp)->ttl != ttl)
987       goto different_address;
988   }
989   result = gst_rtsp_address_copy (*addrp);
990   g_mutex_unlock (&priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1004         address);
1005     g_mutex_unlock (&priv->lock);
1006     return NULL;
1007   }
1008 different_address:
1009   {
1010     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1011         " reserved", address);
1012     g_mutex_unlock (&priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /* must be called with lock */
1018 static void
1019 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1020     GSocket * rtcp_socket, GSocketFamily family)
1021 {
1022   GstRTSPStreamPrivate *priv = stream->priv;
1023   const gchar *multisink_socket;
1024
1025   if (family == G_SOCKET_FAMILY_IPV6)
1026     multisink_socket = "socket-v6";
1027   else
1028     multisink_socket = "socket";
1029
1030   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1031       NULL);
1032   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1033       NULL);
1034 }
1035
1036 /* must be called with lock */
1037 static gboolean
1038 create_and_configure_udpsinks (GstRTSPStream * stream)
1039 {
1040   GstRTSPStreamPrivate *priv = stream->priv;
1041   GstElement *udpsink0, *udpsink1;
1042
1043   udpsink0 = NULL;
1044   udpsink1 = NULL;
1045
1046   if (priv->udpsink[0])
1047     udpsink0 = priv->udpsink[0];
1048   else
1049     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1050
1051   if (!udpsink0)
1052     goto no_udp_protocol;
1053
1054   if (priv->udpsink[1])
1055     udpsink1 = priv->udpsink[1];
1056   else
1057     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1058
1059   if (!udpsink1)
1060     goto no_udp_protocol;
1061
1062   /* configure sinks */
1063
1064   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1065   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1066
1067   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1068   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1069
1070   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1071
1072   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1073   /* Needs to be async for RECORD streams, otherwise we will never go to
1074    * PLAYING because the sinks will wait for data while the udpsrc can't
1075    * provide data with timestamps in PAUSED. */
1076   if (priv->sinkpad)
1077     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1079
1080   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1081   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1082
1083   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1085
1086   /* update the dscp qos field in the sinks */
1087   update_dscp_qos (stream);
1088
1089   priv->udpsink[0] = udpsink0;
1090   priv->udpsink[1] = udpsink1;
1091
1092   return TRUE;
1093
1094   /* ERRORS */
1095 no_udp_protocol:
1096   {
1097     return FALSE;
1098   }
1099 }
1100
1101 /* must be called with lock */
1102 static void
1103 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1104     GSocketFamily family)
1105 {
1106   GstRTSPStreamPrivate *priv;
1107   GstPad *pad, *selpad;
1108   guint i;
1109   GstBin *bin;
1110
1111   priv = stream->priv;
1112   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1113
1114   for (i = 0; i < 2; i++) {
1115     if (priv->sinkpad || i == 1) {
1116       if (priv->srcpad) {
1117         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1118          * values. This is only relevant for PLAY pipelines */
1119         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1120         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1121       }
1122       /* add udpsrc */
1123       gst_bin_add (bin, udpsrc_out[i]);
1124
1125       /* and link to the funnel */
1126       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1127       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1128       gst_pad_link (pad, selpad);
1129       gst_object_unref (pad);
1130       gst_object_unref (selpad);
1131     }
1132   }
1133
1134   gst_object_unref (bin);
1135 }
1136
1137 /* must be called with lock */
1138 static gboolean
1139 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1140     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1141     const gchar * address, gint rtpport, gint rtcpport,
1142     GstRTSPLowerTrans transport)
1143 {
1144   GstStateChangeReturn ret;
1145
1146   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1147   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1148
1149   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1150     goto error;
1151
1152   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1153     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1154     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1155     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1156     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1157     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1158     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1159   }
1160
1161   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1162   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1163
1164   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1165   if (ret == GST_STATE_CHANGE_FAILURE)
1166     goto error;
1167   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1168   if (ret == GST_STATE_CHANGE_FAILURE)
1169     goto error;
1170
1171   return TRUE;
1172   return TRUE;
1173
1174   /* ERRORS */
1175 error:
1176   {
1177     if (udpsrc_out[0])
1178       gst_object_unref (udpsrc_out[0]);
1179     if (udpsrc_out[1])
1180       gst_object_unref (udpsrc_out[1]);
1181     return FALSE;
1182   }
1183 }
1184
1185 static gboolean
1186 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1187     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1188     GstRTSPTransport *ct, GstRTSPAddress ** server_addr_out,
1189     gboolean use_client_settings)
1190 {
1191   GstRTSPStreamPrivate *priv = stream->priv;
1192   GSocket *rtp_socket = NULL;
1193   GSocket *rtcp_socket;
1194   gint tmp_rtp, tmp_rtcp;
1195   guint count;
1196   gint rtpport, rtcpport;
1197   GList *rejected_addresses = NULL;
1198   GstRTSPAddress *addr = NULL;
1199   GInetAddress *inetaddr = NULL;
1200   gchar *addr_str;
1201   GSocketAddress *rtp_sockaddr = NULL;
1202   GSocketAddress *rtcp_sockaddr = NULL;
1203   GstRTSPAddressPool * pool;
1204   GstRTSPLowerTrans transport;
1205
1206   pool = priv->pool;
1207   count = 0;
1208   transport = ct->lower_transport;
1209
1210   /* Start with random port */
1211   tmp_rtp = 0;
1212
1213   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1214       G_SOCKET_PROTOCOL_UDP, NULL);
1215   if (!rtcp_socket)
1216     goto no_udp_protocol;
1217
1218   if (*server_addr_out)
1219     gst_rtsp_address_free (*server_addr_out);
1220
1221   /* try to allocate 2 UDP ports, the RTP port should be an even
1222    * number and the RTCP port should be the next (uneven) port */
1223 again:
1224
1225   if (rtp_socket == NULL) {
1226     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1227         G_SOCKET_PROTOCOL_UDP, NULL);
1228     if (!rtp_socket)
1229       goto no_udp_protocol;
1230   }
1231
1232   if (pool) {
1233     GstRTSPAddressFlags flags;
1234
1235     if (transport == GST_RTSP_LOWER_TRANS_UDP &&
1236         gst_rtsp_address_pool_has_unicast_addresses (pool))
1237       flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1238     else if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)
1239       flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
1240     else
1241       goto no_ports;
1242
1243     if (addr)
1244       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1245
1246     if (family == G_SOCKET_FAMILY_IPV6)
1247       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1248     else
1249       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1250
1251     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST && use_client_settings)
1252       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1253           ct->port.min, 2, ct->ttl, &addr);
1254     else
1255       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1256
1257     if (addr == NULL)
1258       goto no_ports;
1259
1260     tmp_rtp = addr->port;
1261
1262     g_clear_object (&inetaddr);
1263     inetaddr = g_inet_address_new_from_string (addr->address);
1264
1265     /* Don't bind to multicast addresses, this does not work on
1266      * Windows. You're supposed to bind to ANY and then join the
1267      * multicast group, which udpsrc/sink does for us already.
1268      */
1269     if (g_inet_address_get_is_multicast (inetaddr)) {
1270       g_object_unref (inetaddr);
1271       inetaddr = g_inet_address_new_any (family);
1272     }
1273   } else {
1274     if (tmp_rtp != 0) {
1275       tmp_rtp += 2;
1276       if (++count > 20)
1277         goto no_ports;
1278     }
1279
1280     if (inetaddr == NULL)
1281       inetaddr = g_inet_address_new_any (family);
1282   }
1283
1284   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1285   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1286     g_object_unref (rtp_sockaddr);
1287     goto again;
1288   }
1289   g_object_unref (rtp_sockaddr);
1290
1291   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1292   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1293     g_clear_object (&rtp_sockaddr);
1294     goto socket_error;
1295   }
1296
1297   tmp_rtp =
1298       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1299   g_object_unref (rtp_sockaddr);
1300
1301   /* check if port is even */
1302   if ((tmp_rtp & 1) != 0) {
1303     /* port not even, close and allocate another */
1304     tmp_rtp++;
1305     g_clear_object (&rtp_socket);
1306     goto again;
1307   }
1308
1309   /* set port */
1310   tmp_rtcp = tmp_rtp + 1;
1311
1312   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1313   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1314     g_object_unref (rtcp_sockaddr);
1315     g_clear_object (&rtp_socket);
1316     goto again;
1317   }
1318   g_object_unref (rtcp_sockaddr);
1319
1320   if (addr == NULL)
1321     addr_str = g_inet_address_to_string (inetaddr);
1322   else
1323     addr_str = addr->address;
1324   g_clear_object (&inetaddr);
1325
1326   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1327         rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, transport)) {
1328     if (addr == NULL)
1329       g_free (addr_str);
1330     goto no_udp_protocol;
1331   }
1332
1333   if (addr == NULL)
1334     g_free (addr_str);
1335
1336   play_udpsources_one_family (stream, udpsrc_out, family);
1337
1338   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1339   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1340
1341   /* this should not happen... */
1342   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1343     goto port_error;
1344
1345   /* set RTP and RTCP sockets */
1346   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1347
1348   server_port_out->min = rtpport;
1349   server_port_out->max = rtcpport;
1350
1351   *server_addr_out = addr;
1352   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1353
1354   g_object_unref (rtp_socket);
1355   g_object_unref (rtcp_socket);
1356
1357   return TRUE;
1358
1359   /* ERRORS */
1360 no_udp_protocol:
1361   {
1362     goto cleanup;
1363   }
1364 no_ports:
1365   {
1366     goto cleanup;
1367   }
1368 port_error:
1369   {
1370     goto cleanup;
1371   }
1372 socket_error:
1373   {
1374     goto cleanup;
1375   }
1376 cleanup:
1377   {
1378     if (inetaddr)
1379       g_object_unref (inetaddr);
1380     g_list_free_full (rejected_addresses,
1381         (GDestroyNotify) gst_rtsp_address_free);
1382     if (addr)
1383       gst_rtsp_address_free (addr);
1384     if (rtp_socket)
1385       g_object_unref (rtp_socket);
1386     if (rtcp_socket)
1387       g_object_unref (rtcp_socket);
1388     return FALSE;
1389   }
1390 }
1391
1392 /**
1393  * gst_rtsp_stream_allocate_udp_sockets:
1394  * @stream: a #GstRTSPStream
1395  * @family: protocol family
1396  * @transport_method: transport method
1397  *
1398  * Allocates RTP and RTCP ports.
1399  *
1400  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1401  */
1402 gboolean
1403 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1404     GSocketFamily family, GstRTSPTransport *ct, gboolean use_client_settings)
1405 {
1406   GstRTSPStreamPrivate *priv;
1407   gboolean result = FALSE;
1408   GstRTSPLowerTrans transport = ct->lower_transport;
1409
1410   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1411   priv = stream->priv;
1412   g_return_val_if_fail (priv->is_joined, FALSE);
1413
1414   g_mutex_lock (&priv->lock);
1415
1416   if (family == G_SOCKET_FAMILY_IPV4) {
1417     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1418       if (priv->have_ipv4_mcast)
1419         goto done;
1420       priv->have_ipv4_mcast =
1421         alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_mcast_v4,
1422             &priv->server_port_v4, ct, &priv->addr_v4, use_client_settings);
1423     } else {
1424       priv->have_ipv4 =
1425         alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1426             &priv->server_port_v4, ct, &priv->server_addr_v4, use_client_settings);
1427     }
1428   } else {
1429     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1430       if (priv->have_ipv6_mcast)
1431         goto done;
1432       priv->have_ipv6_mcast =
1433         alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_mcast_v6,
1434             &priv->server_port_v6, ct, &priv->addr_v6, use_client_settings);
1435     } else {
1436       if (priv->have_ipv6)
1437         goto done;
1438       priv->have_ipv6 =
1439         alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1440             &priv->server_port_v6, ct, &priv->server_addr_v6, use_client_settings);
1441     }
1442   }
1443
1444 done:
1445   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1446     priv->have_ipv6_mcast;
1447
1448   g_mutex_unlock (&priv->lock);
1449
1450   return result;
1451 }
1452
1453 /**
1454  * gst_rtsp_stream_set_client_side:
1455  * @stream: a #GstRTSPStream
1456  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1457  * an RTSP connection.
1458  *
1459  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1460  * streams to an RTSP server via RECORD. This has the practical effect
1461  * of changing which UDP port numbers are used when setting up the local
1462  * side of the stream sending to be either the 'server' or 'client' pair
1463  * of a configured UDP transport.
1464  */
1465 void
1466 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1467 {
1468   GstRTSPStreamPrivate *priv;
1469
1470   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1471   priv = stream->priv;
1472   g_mutex_lock (&priv->lock);
1473   priv->client_side = client_side;
1474   g_mutex_unlock (&priv->lock);
1475 }
1476
1477 /**
1478  * gst_rtsp_stream_set_client_side:
1479  * @stream: a #GstRTSPStream
1480  *
1481  * See gst_rtsp_stream_set_client_side()
1482  *
1483  * Returns: TRUE if this #GstRTSPStream is client-side.
1484  */
1485 gboolean
1486 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1487 {
1488   GstRTSPStreamPrivate *priv;
1489   gboolean ret;
1490
1491   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1492
1493   priv = stream->priv;
1494   g_mutex_lock (&priv->lock);
1495   ret = priv->client_side;
1496   g_mutex_unlock (&priv->lock);
1497
1498   return ret;
1499 }
1500
1501 /**
1502  * gst_rtsp_stream_get_server_port:
1503  * @stream: a #GstRTSPStream
1504  * @server_port: (out): result server port
1505  * @family: the port family to get
1506  *
1507  * Fill @server_port with the port pair used by the server. This function can
1508  * only be called when @stream has been joined.
1509  */
1510 void
1511 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1512     GstRTSPRange * server_port, GSocketFamily family)
1513 {
1514   GstRTSPStreamPrivate *priv;
1515
1516   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1517   priv = stream->priv;
1518   g_return_if_fail (priv->is_joined);
1519
1520   g_mutex_lock (&priv->lock);
1521   if (family == G_SOCKET_FAMILY_IPV4) {
1522     if (server_port)
1523       *server_port = priv->server_port_v4;
1524   } else {
1525     if (server_port)
1526       *server_port = priv->server_port_v6;
1527   }
1528   g_mutex_unlock (&priv->lock);
1529 }
1530
1531 /**
1532  * gst_rtsp_stream_get_rtpsession:
1533  * @stream: a #GstRTSPStream
1534  *
1535  * Get the RTP session of this stream.
1536  *
1537  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1538  */
1539 GObject *
1540 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1541 {
1542   GstRTSPStreamPrivate *priv;
1543   GObject *session;
1544
1545   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1546
1547   priv = stream->priv;
1548
1549   g_mutex_lock (&priv->lock);
1550   if ((session = priv->session))
1551     g_object_ref (session);
1552   g_mutex_unlock (&priv->lock);
1553
1554   return session;
1555 }
1556
1557 /**
1558  * gst_rtsp_stream_get_ssrc:
1559  * @stream: a #GstRTSPStream
1560  * @ssrc: (out): result ssrc
1561  *
1562  * Get the SSRC used by the RTP session of this stream. This function can only
1563  * be called when @stream has been joined.
1564  */
1565 void
1566 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1567 {
1568   GstRTSPStreamPrivate *priv;
1569
1570   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1571   priv = stream->priv;
1572   g_return_if_fail (priv->is_joined);
1573
1574   g_mutex_lock (&priv->lock);
1575   if (ssrc && priv->session)
1576     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1577   g_mutex_unlock (&priv->lock);
1578 }
1579
1580 /**
1581  * gst_rtsp_stream_set_retransmission_time:
1582  * @stream: a #GstRTSPStream
1583  * @time: a #GstClockTime
1584  *
1585  * Set the amount of time to store retransmission packets.
1586  */
1587 void
1588 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1589     GstClockTime time)
1590 {
1591   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1592
1593   g_mutex_lock (&stream->priv->lock);
1594   stream->priv->rtx_time = time;
1595   if (stream->priv->rtxsend)
1596     g_object_set (stream->priv->rtxsend, "max-size-time",
1597         GST_TIME_AS_MSECONDS (time), NULL);
1598   g_mutex_unlock (&stream->priv->lock);
1599 }
1600
1601 /**
1602  * gst_rtsp_stream_get_retransmission_time:
1603  * @stream: a #GstRTSPStream
1604  *
1605  * Get the amount of time to store retransmission data.
1606  *
1607  * Returns: the amount of time to store retransmission data.
1608  */
1609 GstClockTime
1610 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1611 {
1612   GstClockTime ret;
1613
1614   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1615
1616   g_mutex_lock (&stream->priv->lock);
1617   ret = stream->priv->rtx_time;
1618   g_mutex_unlock (&stream->priv->lock);
1619
1620   return ret;
1621 }
1622
1623 /**
1624  * gst_rtsp_stream_set_retransmission_pt:
1625  * @stream: a #GstRTSPStream
1626  * @rtx_pt: a #guint
1627  *
1628  * Set the payload type (pt) for retransmission of this stream.
1629  */
1630 void
1631 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1632 {
1633   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1634
1635   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1636
1637   g_mutex_lock (&stream->priv->lock);
1638   stream->priv->rtx_pt = rtx_pt;
1639   if (stream->priv->rtxsend) {
1640     guint pt = gst_rtsp_stream_get_pt (stream);
1641     gchar *pt_s = g_strdup_printf ("%d", pt);
1642     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1643         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1644     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1645     g_free (pt_s);
1646     gst_structure_free (rtx_pt_map);
1647   }
1648   g_mutex_unlock (&stream->priv->lock);
1649 }
1650
1651 /**
1652  * gst_rtsp_stream_get_retransmission_pt:
1653  * @stream: a #GstRTSPStream
1654  *
1655  * Get the payload-type used for retransmission of this stream
1656  *
1657  * Returns: The retransmission PT.
1658  */
1659 guint
1660 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1661 {
1662   guint rtx_pt;
1663
1664   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1665
1666   g_mutex_lock (&stream->priv->lock);
1667   rtx_pt = stream->priv->rtx_pt;
1668   g_mutex_unlock (&stream->priv->lock);
1669
1670   return rtx_pt;
1671 }
1672
1673 /**
1674  * gst_rtsp_stream_set_buffer_size:
1675  * @stream: a #GstRTSPStream
1676  * @size: the buffer size
1677  *
1678  * Set the size of the UDP transmission buffer (in bytes)
1679  * Needs to be set before the stream is joined to a bin.
1680  *
1681  * Since: 1.6
1682  */
1683 void
1684 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1685 {
1686   g_mutex_lock (&stream->priv->lock);
1687   stream->priv->buffer_size = size;
1688   g_mutex_unlock (&stream->priv->lock);
1689 }
1690
1691 /**
1692  * gst_rtsp_stream_get_buffer_size:
1693  * @stream: a #GstRTSPStream
1694  *
1695  * Get the size of the UDP transmission buffer (in bytes)
1696  *
1697  * Returns: the size of the UDP TX buffer
1698  *
1699  * Since: 1.6
1700  */
1701 guint
1702 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1703 {
1704   guint buffer_size;
1705
1706   g_mutex_lock (&stream->priv->lock);
1707   buffer_size = stream->priv->buffer_size;
1708   g_mutex_unlock (&stream->priv->lock);
1709
1710   return buffer_size;
1711 }
1712
1713 /* executed from streaming thread */
1714 static void
1715 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1716 {
1717   GstRTSPStreamPrivate *priv = stream->priv;
1718   GstCaps *newcaps, *oldcaps;
1719
1720   newcaps = gst_pad_get_current_caps (pad);
1721
1722   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1723       newcaps);
1724
1725   g_mutex_lock (&priv->lock);
1726   oldcaps = priv->caps;
1727   priv->caps = newcaps;
1728   g_mutex_unlock (&priv->lock);
1729
1730   if (oldcaps)
1731     gst_caps_unref (oldcaps);
1732 }
1733
1734 static void
1735 dump_structure (const GstStructure * s)
1736 {
1737   gchar *sstr;
1738
1739   sstr = gst_structure_to_string (s);
1740   GST_INFO ("structure: %s", sstr);
1741   g_free (sstr);
1742 }
1743
1744 static GstRTSPStreamTransport *
1745 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1746 {
1747   GstRTSPStreamPrivate *priv = stream->priv;
1748   GList *walk;
1749   GstRTSPStreamTransport *result = NULL;
1750   const gchar *tmp;
1751   gchar *dest;
1752   guint port;
1753
1754   if (rtcp_from == NULL)
1755     return NULL;
1756
1757   tmp = g_strrstr (rtcp_from, ":");
1758   if (tmp == NULL)
1759     return NULL;
1760
1761   port = atoi (tmp + 1);
1762   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1763
1764   g_mutex_lock (&priv->lock);
1765   GST_INFO ("finding %s:%d in %d transports", dest, port,
1766       g_list_length (priv->transports));
1767
1768   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1769     GstRTSPStreamTransport *trans = walk->data;
1770     const GstRTSPTransport *tr;
1771     gint min, max;
1772
1773     tr = gst_rtsp_stream_transport_get_transport (trans);
1774
1775     if (priv->client_side) {
1776       /* In client side mode the 'destination' is the RTSP server, so send
1777        * to those ports */
1778       min = tr->server_port.min;
1779       max = tr->server_port.max;
1780     } else {
1781       min = tr->client_port.min;
1782       max = tr->client_port.max;
1783     }
1784
1785     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1786       result = trans;
1787       break;
1788     }
1789   }
1790   if (result)
1791     g_object_ref (result);
1792   g_mutex_unlock (&priv->lock);
1793
1794   g_free (dest);
1795
1796   return result;
1797 }
1798
1799 static GstRTSPStreamTransport *
1800 check_transport (GObject * source, GstRTSPStream * stream)
1801 {
1802   GstStructure *stats;
1803   GstRTSPStreamTransport *trans;
1804
1805   /* see if we have a stream to match with the origin of the RTCP packet */
1806   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1807   if (trans == NULL) {
1808     g_object_get (source, "stats", &stats, NULL);
1809     if (stats) {
1810       const gchar *rtcp_from;
1811
1812       dump_structure (stats);
1813
1814       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1815       if ((trans = find_transport (stream, rtcp_from))) {
1816         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1817             source);
1818         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1819             g_object_unref);
1820       }
1821       gst_structure_free (stats);
1822     }
1823   }
1824   return trans;
1825 }
1826
1827
1828 static void
1829 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1830 {
1831   GstRTSPStreamTransport *trans;
1832
1833   GST_INFO ("%p: new source %p", stream, source);
1834
1835   trans = check_transport (source, stream);
1836
1837   if (trans)
1838     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1839 }
1840
1841 static void
1842 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1843 {
1844   GST_INFO ("%p: new SDES %p", stream, source);
1845 }
1846
1847 static void
1848 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1849 {
1850   GstRTSPStreamTransport *trans;
1851
1852   trans = check_transport (source, stream);
1853
1854   if (trans) {
1855     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1856     gst_rtsp_stream_transport_keep_alive (trans);
1857   }
1858 #ifdef DUMP_STATS
1859   {
1860     GstStructure *stats;
1861     g_object_get (source, "stats", &stats, NULL);
1862     if (stats) {
1863       dump_structure (stats);
1864       gst_structure_free (stats);
1865     }
1866   }
1867 #endif
1868 }
1869
1870 static void
1871 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1872 {
1873   GST_INFO ("%p: source %p bye", stream, source);
1874 }
1875
1876 static void
1877 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1878 {
1879   GstRTSPStreamTransport *trans;
1880
1881   GST_INFO ("%p: source %p bye timeout", stream, source);
1882
1883   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1884     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1885     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1886   }
1887 }
1888
1889 static void
1890 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1891 {
1892   GstRTSPStreamTransport *trans;
1893
1894   GST_INFO ("%p: source %p timeout", stream, source);
1895
1896   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1897     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1898     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1899   }
1900 }
1901
1902 static void
1903 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1904 {
1905   GST_INFO ("%p: new sender source %p", stream, source);
1906 #ifndef DUMP_STATS
1907   {
1908     GstStructure *stats;
1909     g_object_get (source, "stats", &stats, NULL);
1910     if (stats) {
1911       dump_structure (stats);
1912       gst_structure_free (stats);
1913     }
1914   }
1915 #endif
1916 }
1917
1918 static void
1919 on_sender_ssrc_active (GObject * session, GObject * source,
1920     GstRTSPStream * stream)
1921 {
1922 #ifndef DUMP_STATS
1923   {
1924     GstStructure *stats;
1925     g_object_get (source, "stats", &stats, NULL);
1926     if (stats) {
1927       dump_structure (stats);
1928       gst_structure_free (stats);
1929     }
1930   }
1931 #endif
1932 }
1933
1934 static void
1935 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1936 {
1937   if (is_rtp) {
1938     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1939     g_list_free (priv->tr_cache_rtp);
1940     priv->tr_cache_rtp = NULL;
1941   } else {
1942     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1943     g_list_free (priv->tr_cache_rtcp);
1944     priv->tr_cache_rtcp = NULL;
1945   }
1946 }
1947
1948 static GstFlowReturn
1949 handle_new_sample (GstAppSink * sink, gpointer user_data)
1950 {
1951   GstRTSPStreamPrivate *priv;
1952   GList *walk;
1953   GstSample *sample;
1954   GstBuffer *buffer;
1955   GstRTSPStream *stream;
1956   gboolean is_rtp;
1957
1958   sample = gst_app_sink_pull_sample (sink);
1959   if (!sample)
1960     return GST_FLOW_OK;
1961
1962   stream = (GstRTSPStream *) user_data;
1963   priv = stream->priv;
1964   buffer = gst_sample_get_buffer (sample);
1965
1966   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1967
1968   g_mutex_lock (&priv->lock);
1969   if (is_rtp) {
1970     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1971       clear_tr_cache (priv, is_rtp);
1972       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1973         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1974         priv->tr_cache_rtp =
1975             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1976       }
1977       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1978     }
1979   } else {
1980     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1981       clear_tr_cache (priv, is_rtp);
1982       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1983         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1984         priv->tr_cache_rtcp =
1985             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1986       }
1987       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1988     }
1989   }
1990   g_mutex_unlock (&priv->lock);
1991
1992   if (is_rtp) {
1993     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1994       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1995       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1996     }
1997   } else {
1998     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1999       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2000       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2001     }
2002   }
2003   gst_sample_unref (sample);
2004
2005   return GST_FLOW_OK;
2006 }
2007
2008 static GstAppSinkCallbacks sink_cb = {
2009   NULL,                         /* not interested in EOS */
2010   NULL,                         /* not interested in preroll samples */
2011   handle_new_sample,
2012 };
2013
2014 static GstElement *
2015 get_rtp_encoder (GstRTSPStream * stream, guint session)
2016 {
2017   GstRTSPStreamPrivate *priv = stream->priv;
2018
2019   if (priv->srtpenc == NULL) {
2020     gchar *name;
2021
2022     name = g_strdup_printf ("srtpenc_%u", session);
2023     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2024     g_free (name);
2025
2026     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2027   }
2028   return gst_object_ref (priv->srtpenc);
2029 }
2030
2031 static GstElement *
2032 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2033 {
2034   GstRTSPStreamPrivate *priv = stream->priv;
2035   GstElement *oldenc, *enc;
2036   GstPad *pad;
2037   gchar *name;
2038
2039   if (priv->idx != session)
2040     return NULL;
2041
2042   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2043
2044   oldenc = priv->srtpenc;
2045   enc = get_rtp_encoder (stream, session);
2046   name = g_strdup_printf ("rtp_sink_%d", session);
2047   pad = gst_element_get_request_pad (enc, name);
2048   g_free (name);
2049   gst_object_unref (pad);
2050
2051   if (oldenc == NULL)
2052     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2053         enc);
2054
2055   return enc;
2056 }
2057
2058 static GstElement *
2059 request_rtcp_encoder (GstElement * rtpbin, guint session,
2060     GstRTSPStream * stream)
2061 {
2062   GstRTSPStreamPrivate *priv = stream->priv;
2063   GstElement *oldenc, *enc;
2064   GstPad *pad;
2065   gchar *name;
2066
2067   if (priv->idx != session)
2068     return NULL;
2069
2070   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2071
2072   oldenc = priv->srtpenc;
2073   enc = get_rtp_encoder (stream, session);
2074   name = g_strdup_printf ("rtcp_sink_%d", session);
2075   pad = gst_element_get_request_pad (enc, name);
2076   g_free (name);
2077   gst_object_unref (pad);
2078
2079   if (oldenc == NULL)
2080     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2081         enc);
2082
2083   return enc;
2084 }
2085
2086 static GstCaps *
2087 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2088 {
2089   GstRTSPStreamPrivate *priv = stream->priv;
2090   GstCaps *caps;
2091
2092   GST_DEBUG ("request key %08x", ssrc);
2093
2094   g_mutex_lock (&priv->lock);
2095   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2096     gst_caps_ref (caps);
2097   g_mutex_unlock (&priv->lock);
2098
2099   return caps;
2100 }
2101
2102 static GstElement *
2103 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2104     GstRTSPStream * stream)
2105 {
2106   GstRTSPStreamPrivate *priv = stream->priv;
2107
2108   if (priv->idx != session)
2109     return NULL;
2110
2111   if (priv->srtpdec == NULL) {
2112     gchar *name;
2113
2114     name = g_strdup_printf ("srtpdec_%u", session);
2115     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2116     g_free (name);
2117
2118     g_signal_connect (priv->srtpdec, "request-key",
2119         (GCallback) request_key, stream);
2120   }
2121   return gst_object_ref (priv->srtpdec);
2122 }
2123
2124 /**
2125  * gst_rtsp_stream_request_aux_sender:
2126  * @stream: a #GstRTSPStream
2127  * @sessid: the session id
2128  *
2129  * Creating a rtxsend bin
2130  *
2131  * Returns: (transfer full): a #GstElement.
2132  *
2133  * Since: 1.6
2134  */
2135 GstElement *
2136 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2137 {
2138   GstElement *bin;
2139   GstPad *pad;
2140   GstStructure *pt_map;
2141   gchar *name;
2142   guint pt, rtx_pt;
2143   gchar *pt_s;
2144
2145   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2146
2147   pt = gst_rtsp_stream_get_pt (stream);
2148   pt_s = g_strdup_printf ("%u", pt);
2149   rtx_pt = stream->priv->rtx_pt;
2150
2151   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2152
2153   bin = gst_bin_new (NULL);
2154   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2155   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2156       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2157   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2158       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2159   g_free (pt_s);
2160   gst_structure_free (pt_map);
2161   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2162
2163   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2164   name = g_strdup_printf ("src_%u", sessid);
2165   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2166   g_free (name);
2167   gst_object_unref (pad);
2168
2169   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2170   name = g_strdup_printf ("sink_%u", sessid);
2171   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2172   g_free (name);
2173   gst_object_unref (pad);
2174
2175   return bin;
2176 }
2177
2178 /**
2179  * gst_rtsp_stream_set_pt_map:
2180  * @stream: a #GstRTSPStream
2181  * @pt: the pt
2182  * @caps: a #GstCaps
2183  *
2184  * Configure a pt map between @pt and @caps.
2185  */
2186 void
2187 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2188 {
2189   GstRTSPStreamPrivate *priv = stream->priv;
2190
2191   g_mutex_lock (&priv->lock);
2192   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2193   g_mutex_unlock (&priv->lock);
2194 }
2195
2196 static GstCaps *
2197 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2198     GstRTSPStream * stream)
2199 {
2200   GstRTSPStreamPrivate *priv = stream->priv;
2201   GstCaps *caps = NULL;
2202
2203   g_mutex_lock (&priv->lock);
2204
2205   if (priv->idx == session) {
2206     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2207     if (caps) {
2208       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2209       gst_caps_ref (caps);
2210     } else {
2211       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2212     }
2213   }
2214
2215   g_mutex_unlock (&priv->lock);
2216
2217   return caps;
2218 }
2219
2220 static void
2221 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2222 {
2223   GstRTSPStreamPrivate *priv = stream->priv;
2224   gchar *name;
2225   GstPadLinkReturn ret;
2226   guint sessid;
2227
2228   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2229       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2230
2231   name = gst_pad_get_name (pad);
2232   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2233     g_free (name);
2234     return;
2235   }
2236   g_free (name);
2237
2238   if (priv->idx != sessid)
2239     return;
2240
2241   if (gst_pad_is_linked (priv->sinkpad)) {
2242     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2243         GST_DEBUG_PAD_NAME (priv->sinkpad));
2244     return;
2245   }
2246
2247   /* link the RTP pad to the session manager, it should not really fail unless
2248    * this is not really an RTP pad */
2249   ret = gst_pad_link (pad, priv->sinkpad);
2250   if (ret != GST_PAD_LINK_OK)
2251     goto link_failed;
2252   priv->recv_rtp_src = gst_object_ref (pad);
2253
2254   return;
2255
2256 /* ERRORS */
2257 link_failed:
2258   {
2259     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2260         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2261   }
2262 }
2263
2264 static void
2265 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2266     GstRTSPStream * stream)
2267 {
2268   /* TODO: What to do here other than this? */
2269   GST_DEBUG ("Stream %p: Got EOS", stream);
2270   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2271 }
2272
2273 /* must be called with lock */
2274 static gboolean
2275 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2276     GstState state)
2277 {
2278   GstRTSPStreamPrivate *priv;
2279   GstPad *pad, *sinkpad = NULL;
2280   gboolean is_tcp = FALSE, is_udp = FALSE;
2281   gint i;
2282
2283   priv = stream->priv;
2284
2285   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2286   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2287       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2288
2289   if (is_udp && !create_and_configure_udpsinks (stream))
2290     goto no_udp_protocol;
2291
2292   for (i = 0; i < 2; i++) {
2293     GstPad *teepad, *queuepad;
2294     /* For the sender we create this bit of pipeline for both
2295      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2296      * we need to add a queue before appsink and udpsink to make
2297      * the pipeline not block. For the TCP case, we want to pump
2298      * client as fast as possible anyway. This pipeline is used
2299      * when both TCP and UDP are present.
2300      *
2301      * .--------.      .-----.    .---------.    .---------.
2302      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2303      * |       send->sink   src->sink      src->sink       |
2304      * '--------'      |     |    '---------'    '---------'
2305      *                 |     |    .---------.    .---------.
2306      *                 |     |    |  queue  |    | appsink |
2307      *                 |    src->sink      src->sink       |
2308      *                 '-----'    '---------'    '---------'
2309      *
2310      * When only UDP or only TCP is allowed, we skip the tee and queue
2311      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2312      * the session.
2313      */
2314     /* Only link the RTP send src if we're going to send RTP, link
2315      * the RTCP send src always */
2316     if (priv->srcpad || i == 1) {
2317       if (is_udp) {
2318         /* add udpsink */
2319         gst_bin_add (bin, priv->udpsink[i]);
2320         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2321       }
2322
2323       if (is_tcp) {
2324         /* make appsink */
2325         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2326         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2327         gst_bin_add (bin, priv->appsink[i]);
2328         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2329             &sink_cb, stream, NULL);
2330       }
2331
2332       if (is_udp && is_tcp) {
2333         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2334
2335         /* make tee for RTP/RTCP */
2336         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2337         gst_bin_add (bin, priv->tee[i]);
2338
2339         /* and link to rtpbin send pad */
2340         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2341         gst_pad_link (priv->send_src[i], pad);
2342         gst_object_unref (pad);
2343
2344         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2345         g_object_set (priv->udpqueue[i], "max-size-buffers",
2346             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2347             NULL);
2348         gst_bin_add (bin, priv->udpqueue[i]);
2349         /* link tee to udpqueue */
2350         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2351         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2352         gst_pad_link (teepad, pad);
2353         gst_object_unref (pad);
2354         gst_object_unref (teepad);
2355
2356         /* link udpqueue to udpsink */
2357         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2358         gst_pad_link (queuepad, sinkpad);
2359         gst_object_unref (queuepad);
2360         gst_object_unref (sinkpad);
2361
2362         /* make appqueue */
2363         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2364         g_object_set (priv->appqueue[i], "max-size-buffers",
2365             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2366             NULL);
2367         gst_bin_add (bin, priv->appqueue[i]);
2368         /* and link tee to appqueue */
2369         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2370         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2371         gst_pad_link (teepad, pad);
2372         gst_object_unref (pad);
2373         gst_object_unref (teepad);
2374
2375         /* and link appqueue to appsink */
2376         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2377         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2378         gst_pad_link (queuepad, pad);
2379         gst_object_unref (pad);
2380         gst_object_unref (queuepad);
2381       } else if (is_tcp) {
2382         /* only appsink needed, link it to the session */
2383         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2384         gst_pad_link (priv->send_src[i], pad);
2385         gst_object_unref (pad);
2386
2387         /* when its only TCP, we need to set sync and preroll to FALSE
2388          * for the sink to avoid deadlock. And this is only needed for
2389          * sink used for RTCP data, not the RTP data. */
2390         if (i == 1)
2391           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2392       } else {
2393         /* else only udpsink needed, link it to the session */
2394         gst_pad_link (priv->send_src[i], sinkpad);
2395         gst_object_unref (sinkpad);
2396       }
2397     }
2398
2399     /* check if we need to set to a special state */
2400     if (state != GST_STATE_NULL) {
2401       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2402         gst_element_set_state (priv->udpsink[i], state);
2403       if (priv->appsink[i] && (priv->srcpad || i == 1))
2404         gst_element_set_state (priv->appsink[i], state);
2405       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2406         gst_element_set_state (priv->appqueue[i], state);
2407       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2408         gst_element_set_state (priv->udpqueue[i], state);
2409       if (priv->tee[i] && (priv->srcpad || i == 1))
2410         gst_element_set_state (priv->tee[i], state);
2411     }
2412   }
2413
2414   return TRUE;
2415
2416   /* ERRORS */
2417 no_udp_protocol:
2418   {
2419     return FALSE;
2420   }
2421 }
2422
2423 /* must be called with lock */
2424 static void
2425 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2426     GstState state)
2427 {
2428   GstRTSPStreamPrivate *priv;
2429   GstPad *pad, *selpad;
2430   gboolean is_tcp;
2431   gint i;
2432
2433   priv = stream->priv;
2434
2435   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2436
2437   for (i = 0; i < 2; i++) {
2438     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2439      * RTCP sink always */
2440     if (priv->sinkpad || i == 1) {
2441       /* For the receiver we create this bit of pipeline for both
2442        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2443        * and it is all funneled into the rtpbin receive pad.
2444        *
2445        * .--------.     .--------.    .--------.
2446        * | udpsrc |     | funnel |    | rtpbin |
2447        * |       src->sink      src->sink      |
2448        * '--------'     |        |    '--------'
2449        * .--------.     |        |
2450        * | appsrc |     |        |
2451        * |       src->sink       |
2452        * '--------'     '--------'
2453        */
2454       /* make funnel for the RTP/RTCP receivers */
2455       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2456       gst_bin_add (bin, priv->funnel[i]);
2457
2458       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2459       gst_pad_link (pad, priv->recv_sink[i]);
2460       gst_object_unref (pad);
2461
2462       if (priv->udpsrc_v4[i]) {
2463         if (priv->srcpad) {
2464           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2465            * values. This is only relevant for PLAY pipelines */
2466           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2467           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2468         }
2469         /* add udpsrc */
2470         gst_bin_add (bin, priv->udpsrc_v4[i]);
2471
2472         /* and link to the funnel v4 */
2473         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2474         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2475         gst_pad_link (pad, selpad);
2476         gst_object_unref (pad);
2477         gst_object_unref (selpad);
2478       }
2479
2480       if (priv->udpsrc_v6[i]) {
2481         if (priv->srcpad) {
2482           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2483           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2484         }
2485         gst_bin_add (bin, priv->udpsrc_v6[i]);
2486
2487         /* and link to the funnel v6 */
2488         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2489         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2490         gst_pad_link (pad, selpad);
2491         gst_object_unref (pad);
2492         gst_object_unref (selpad);
2493       }
2494
2495       if (is_tcp) {
2496         /* make and add appsrc */
2497         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2498         priv->appsrc_base_time[i] = -1;
2499         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2500         gst_bin_add (bin, priv->appsrc[i]);
2501         /* and link to the funnel */
2502         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2503         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2504         gst_pad_link (pad, selpad);
2505         gst_object_unref (pad);
2506         gst_object_unref (selpad);
2507       }
2508     }
2509
2510     /* check if we need to set to a special state */
2511     if (state != GST_STATE_NULL) {
2512       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2513         gst_element_set_state (priv->funnel[i], state);
2514       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2515         gst_element_set_state (priv->appsrc[i], state);
2516     }
2517   }
2518 }
2519
2520 /**
2521  * gst_rtsp_stream_join_bin:
2522  * @stream: a #GstRTSPStream
2523  * @bin: (transfer none): a #GstBin to join
2524  * @rtpbin: (transfer none): a rtpbin element in @bin
2525  * @state: the target state of the new elements
2526  *
2527  * Join the #GstBin @bin that contains the element @rtpbin.
2528  *
2529  * @stream will link to @rtpbin, which must be inside @bin. The elements
2530  * added to @bin will be set to the state given in @state.
2531  *
2532  * Returns: %TRUE on success.
2533  */
2534 gboolean
2535 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2536     GstElement * rtpbin, GstState state)
2537 {
2538   GstRTSPStreamPrivate *priv;
2539   guint idx;
2540   gchar *name;
2541   GstPadLinkReturn ret;
2542
2543   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2544   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2545   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2546
2547   priv = stream->priv;
2548
2549   g_mutex_lock (&priv->lock);
2550   if (priv->is_joined)
2551     goto was_joined;
2552
2553   /* create a session with the same index as the stream */
2554   idx = priv->idx;
2555
2556   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2557
2558   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2559       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2560     /* For SRTP */
2561     g_signal_connect (rtpbin, "request-rtp-encoder",
2562         (GCallback) request_rtp_encoder, stream);
2563     g_signal_connect (rtpbin, "request-rtcp-encoder",
2564         (GCallback) request_rtcp_encoder, stream);
2565     g_signal_connect (rtpbin, "request-rtp-decoder",
2566         (GCallback) request_rtp_rtcp_decoder, stream);
2567     g_signal_connect (rtpbin, "request-rtcp-decoder",
2568         (GCallback) request_rtp_rtcp_decoder, stream);
2569   }
2570
2571   if (priv->sinkpad) {
2572     g_signal_connect (rtpbin, "request-pt-map",
2573         (GCallback) request_pt_map, stream);
2574   }
2575
2576   /* get pads from the RTP session element for sending and receiving
2577    * RTP/RTCP*/
2578   if (priv->srcpad) {
2579     /* get a pad for sending RTP */
2580     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2581     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2582     g_free (name);
2583
2584     /* link the RTP pad to the session manager, it should not really fail unless
2585      * this is not really an RTP pad */
2586     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2587     if (ret != GST_PAD_LINK_OK)
2588       goto link_failed;
2589
2590     name = g_strdup_printf ("send_rtp_src_%u", idx);
2591     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2592     g_free (name);
2593   } else {
2594     /* Need to connect our sinkpad from here */
2595     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2596     /* EOS */
2597     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2598
2599     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2600     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2601     g_free (name);
2602   }
2603
2604   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2605   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2606   g_free (name);
2607   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2608   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2609   g_free (name);
2610
2611   /* get the session */
2612   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2613
2614   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2615       stream);
2616   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2617       stream);
2618   g_signal_connect (priv->session, "on-ssrc-active",
2619       (GCallback) on_ssrc_active, stream);
2620   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2621       stream);
2622   g_signal_connect (priv->session, "on-bye-timeout",
2623       (GCallback) on_bye_timeout, stream);
2624   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2625       stream);
2626
2627   /* signal for sender ssrc */
2628   g_signal_connect (priv->session, "on-new-sender-ssrc",
2629       (GCallback) on_new_sender_ssrc, stream);
2630   g_signal_connect (priv->session, "on-sender-ssrc-active",
2631       (GCallback) on_sender_ssrc_active, stream);
2632
2633   if (!create_sender_part (stream, bin, state))
2634     goto no_udp_protocol;
2635
2636   create_receiver_part (stream, bin, state);
2637
2638   if (priv->srcpad) {
2639     /* be notified of caps changes */
2640     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2641         (GCallback) caps_notify, stream);
2642   }
2643
2644   priv->is_joined = TRUE;
2645   g_mutex_unlock (&priv->lock);
2646
2647   return TRUE;
2648
2649   /* ERRORS */
2650 was_joined:
2651   {
2652     g_mutex_unlock (&priv->lock);
2653     return TRUE;
2654   }
2655 link_failed:
2656   {
2657     GST_WARNING ("failed to link stream %u", idx);
2658     gst_object_unref (priv->send_rtp_sink);
2659     priv->send_rtp_sink = NULL;
2660     g_mutex_unlock (&priv->lock);
2661     return FALSE;
2662   }
2663 no_udp_protocol:
2664   {
2665     GST_WARNING ("failed to allocate ports %u", idx);
2666     gst_object_unref (priv->send_rtp_sink);
2667     priv->send_rtp_sink = NULL;
2668     gst_object_unref (priv->send_src[0]);
2669     priv->send_src[0] = NULL;
2670     gst_object_unref (priv->send_src[1]);
2671     priv->send_src[1] = NULL;
2672     gst_object_unref (priv->recv_sink[0]);
2673     priv->recv_sink[0] = NULL;
2674     gst_object_unref (priv->recv_sink[1]);
2675     priv->recv_sink[1] = NULL;
2676     if (priv->udpsink[0])
2677       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2678     if (priv->udpsink[1])
2679       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2680     if (priv->udpsrc_v4[0]) {
2681       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2682       gst_object_unref (priv->udpsrc_v4[0]);
2683       priv->udpsrc_v4[0] = NULL;
2684     }
2685     if (priv->udpsrc_v4[1]) {
2686       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2687       gst_object_unref (priv->udpsrc_v4[1]);
2688       priv->udpsrc_v4[1] = NULL;
2689     }
2690     if (priv->udpsrc_mcast_v4[0]) {
2691       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2692       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2693       priv->udpsrc_mcast_v4[0] = NULL;
2694     }
2695     if (priv->udpsrc_mcast_v4[1]) {
2696       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2697       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2698       priv->udpsrc_mcast_v4[1] = NULL;
2699     }
2700     if (priv->udpsrc_v6[0]) {
2701       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2702       gst_object_unref (priv->udpsrc_v6[0]);
2703       priv->udpsrc_v6[0] = NULL;
2704     }
2705     if (priv->udpsrc_v6[1]) {
2706       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2707       gst_object_unref (priv->udpsrc_v6[1]);
2708       priv->udpsrc_v6[1] = NULL;
2709     }
2710     if (priv->udpsrc_mcast_v6[0]) {
2711       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2712       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2713       priv->udpsrc_mcast_v6[0] = NULL;
2714     }
2715     if (priv->udpsrc_mcast_v6[1]) {
2716       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2717       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2718       priv->udpsrc_mcast_v6[1] = NULL;
2719     }
2720     g_mutex_unlock (&priv->lock);
2721     return FALSE;
2722   }
2723 }
2724
2725 /**
2726  * gst_rtsp_stream_leave_bin:
2727  * @stream: a #GstRTSPStream
2728  * @bin: (transfer none): a #GstBin
2729  * @rtpbin: (transfer none): a rtpbin #GstElement
2730  *
2731  * Remove the elements of @stream from @bin.
2732  *
2733  * Return: %TRUE on success.
2734  */
2735 gboolean
2736 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2737     GstElement * rtpbin)
2738 {
2739   GstRTSPStreamPrivate *priv;
2740   gint i;
2741   gboolean is_tcp, is_udp;
2742
2743   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2744   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2745   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2746
2747   priv = stream->priv;
2748
2749   g_mutex_lock (&priv->lock);
2750   if (!priv->is_joined)
2751     goto was_not_joined;
2752
2753   /* all transports must be removed by now */
2754   if (priv->transports != NULL)
2755     goto transports_not_removed;
2756
2757   clear_tr_cache (priv, TRUE);
2758   clear_tr_cache (priv, FALSE);
2759
2760   GST_INFO ("stream %p leaving bin", stream);
2761
2762   if (priv->srcpad) {
2763     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2764
2765     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2766     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2767     gst_object_unref (priv->send_rtp_sink);
2768     priv->send_rtp_sink = NULL;
2769   } else if (priv->recv_rtp_src) {
2770     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2771     gst_object_unref (priv->recv_rtp_src);
2772     priv->recv_rtp_src = NULL;
2773   }
2774
2775   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2776
2777   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2778       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2779
2780
2781   for (i = 0; i < 2; i++) {
2782     if (priv->udpsink[i])
2783       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2784     if (priv->appsink[i])
2785       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2786     if (priv->appqueue[i])
2787       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2788     if (priv->udpqueue[i])
2789       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2790     if (priv->tee[i])
2791       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2792     if (priv->funnel[i])
2793       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2794     if (priv->appsrc[i])
2795       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2796
2797     if (priv->udpsrc_v4[i]) {
2798       if (priv->sinkpad || i == 1) {
2799         /* and set udpsrc to NULL now before removing */
2800         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2801         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2802         /* removing them should also nicely release the request
2803          * pads when they finalize */
2804         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2805       } else {
2806         /* we need to set the state to NULL before unref */
2807         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2808         gst_object_unref (priv->udpsrc_v4[i]);
2809       }
2810     }
2811
2812     if (priv->udpsrc_mcast_v4[i]) {
2813       if (priv->sinkpad || i == 1) {
2814         /* and set udpsrc to NULL now before removing */
2815         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2816         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2817         /* removing them should also nicely release the request
2818          * pads when they finalize */
2819         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2820       } else {
2821         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2822         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2823       }
2824     }
2825
2826     if (priv->udpsrc_v6[i]) {
2827       if (priv->sinkpad || i == 1) {
2828         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2829         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2830         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2831       } else {
2832         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2833         gst_object_unref (priv->udpsrc_v6[i]);
2834       }
2835     }
2836     if (priv->udpsrc_mcast_v6[i]) {
2837       if (priv->sinkpad || i == 1) {
2838         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2839         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2840         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2841       } else {
2842         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2843         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2844       }
2845     }
2846
2847     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2848       gst_bin_remove (bin, priv->udpsink[i]);
2849     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2850       gst_bin_remove (bin, priv->appsrc[i]);
2851     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2852       gst_bin_remove (bin, priv->appsink[i]);
2853     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2854       gst_bin_remove (bin, priv->appqueue[i]);
2855     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2856       gst_bin_remove (bin, priv->udpqueue[i]);
2857     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2858       gst_bin_remove (bin, priv->tee[i]);
2859     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2860       gst_bin_remove (bin, priv->funnel[i]);
2861
2862     if (priv->sinkpad || i == 1) {
2863       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2864       gst_object_unref (priv->recv_sink[i]);
2865       priv->recv_sink[i] = NULL;
2866     }
2867
2868     priv->udpsrc_v4[i] = NULL;
2869     priv->udpsrc_v6[i] = NULL;
2870     priv->udpsrc_mcast_v4[i] = NULL;
2871     priv->udpsrc_mcast_v6[i] = NULL;
2872     priv->udpsink[i] = NULL;
2873     priv->appsrc[i] = NULL;
2874     priv->appsink[i] = NULL;
2875     priv->appqueue[i] = NULL;
2876     priv->udpqueue[i] = NULL;
2877     priv->tee[i] = NULL;
2878     priv->funnel[i] = NULL;
2879   }
2880
2881   if (priv->srcpad) {
2882     gst_object_unref (priv->send_src[0]);
2883     priv->send_src[0] = NULL;
2884   }
2885
2886   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2887   gst_object_unref (priv->send_src[1]);
2888   priv->send_src[1] = NULL;
2889
2890   g_object_unref (priv->session);
2891   priv->session = NULL;
2892   if (priv->caps)
2893     gst_caps_unref (priv->caps);
2894   priv->caps = NULL;
2895
2896   if (priv->srtpenc)
2897     gst_object_unref (priv->srtpenc);
2898   if (priv->srtpdec)
2899     gst_object_unref (priv->srtpdec);
2900
2901   priv->is_joined = FALSE;
2902   g_mutex_unlock (&priv->lock);
2903
2904   return TRUE;
2905
2906 was_not_joined:
2907   {
2908     g_mutex_unlock (&priv->lock);
2909     return TRUE;
2910   }
2911 transports_not_removed:
2912   {
2913     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2914     g_mutex_unlock (&priv->lock);
2915     return FALSE;
2916   }
2917 }
2918
2919 /**
2920  * gst_rtsp_stream_get_rtpinfo:
2921  * @stream: a #GstRTSPStream
2922  * @rtptime: (allow-none): result RTP timestamp
2923  * @seq: (allow-none): result RTP seqnum
2924  * @clock_rate: (allow-none): the clock rate
2925  * @running_time: (allow-none): result running-time
2926  *
2927  * Retrieve the current rtptime, seq and running-time. This is used to
2928  * construct a RTPInfo reply header.
2929  *
2930  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2931  */
2932 gboolean
2933 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2934     guint * rtptime, guint * seq, guint * clock_rate,
2935     GstClockTime * running_time)
2936 {
2937   GstRTSPStreamPrivate *priv;
2938   GstStructure *stats;
2939   GObjectClass *payobjclass;
2940
2941   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2942
2943   priv = stream->priv;
2944
2945   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2946
2947   g_mutex_lock (&priv->lock);
2948
2949   /* First try to extract the information from the last buffer on the sinks.
2950    * This will have a more accurate sequence number and timestamp, as between
2951    * the payloader and the sink there can be some queues
2952    */
2953   if (priv->udpsink[0] || priv->appsink[0]) {
2954     GstSample *last_sample;
2955
2956     if (priv->udpsink[0])
2957       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2958     else
2959       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2960
2961     if (last_sample) {
2962       GstCaps *caps;
2963       GstBuffer *buffer;
2964       GstSegment *segment;
2965       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2966
2967       caps = gst_sample_get_caps (last_sample);
2968       buffer = gst_sample_get_buffer (last_sample);
2969       segment = gst_sample_get_segment (last_sample);
2970
2971       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2972         if (seq) {
2973           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2974         }
2975
2976         if (rtptime) {
2977           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2978         }
2979
2980         gst_rtp_buffer_unmap (&rtp_buffer);
2981
2982         if (running_time) {
2983           *running_time =
2984               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2985               GST_BUFFER_TIMESTAMP (buffer));
2986         }
2987
2988         if (clock_rate) {
2989           GstStructure *s = gst_caps_get_structure (caps, 0);
2990
2991           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2992
2993           if (*clock_rate == 0 && running_time)
2994             *running_time = GST_CLOCK_TIME_NONE;
2995         }
2996         gst_sample_unref (last_sample);
2997
2998         goto done;
2999       } else {
3000         gst_sample_unref (last_sample);
3001       }
3002     }
3003   }
3004
3005   if (g_object_class_find_property (payobjclass, "stats")) {
3006     g_object_get (priv->payloader, "stats", &stats, NULL);
3007     if (stats == NULL)
3008       goto no_stats;
3009
3010     if (seq)
3011       gst_structure_get_uint (stats, "seqnum", seq);
3012
3013     if (rtptime)
3014       gst_structure_get_uint (stats, "timestamp", rtptime);
3015
3016     if (running_time)
3017       gst_structure_get_clock_time (stats, "running-time", running_time);
3018
3019     if (clock_rate) {
3020       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3021       if (*clock_rate == 0 && running_time)
3022         *running_time = GST_CLOCK_TIME_NONE;
3023     }
3024     gst_structure_free (stats);
3025   } else {
3026     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3027         !g_object_class_find_property (payobjclass, "timestamp"))
3028       goto no_stats;
3029
3030     if (seq)
3031       g_object_get (priv->payloader, "seqnum", seq, NULL);
3032
3033     if (rtptime)
3034       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3035
3036     if (running_time)
3037       *running_time = GST_CLOCK_TIME_NONE;
3038   }
3039
3040 done:
3041   g_mutex_unlock (&priv->lock);
3042
3043   return TRUE;
3044
3045   /* ERRORS */
3046 no_stats:
3047   {
3048     GST_WARNING ("Could not get payloader stats");
3049     g_mutex_unlock (&priv->lock);
3050     return FALSE;
3051   }
3052 }
3053
3054 /**
3055  * gst_rtsp_stream_get_caps:
3056  * @stream: a #GstRTSPStream
3057  *
3058  * Retrieve the current caps of @stream.
3059  *
3060  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3061  * after usage.
3062  */
3063 GstCaps *
3064 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3065 {
3066   GstRTSPStreamPrivate *priv;
3067   GstCaps *result;
3068
3069   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3070
3071   priv = stream->priv;
3072
3073   g_mutex_lock (&priv->lock);
3074   if ((result = priv->caps))
3075     gst_caps_ref (result);
3076   g_mutex_unlock (&priv->lock);
3077
3078   return result;
3079 }
3080
3081 /**
3082  * gst_rtsp_stream_recv_rtp:
3083  * @stream: a #GstRTSPStream
3084  * @buffer: (transfer full): a #GstBuffer
3085  *
3086  * Handle an RTP buffer for the stream. This method is usually called when a
3087  * message has been received from a client using the TCP transport.
3088  *
3089  * This function takes ownership of @buffer.
3090  *
3091  * Returns: a GstFlowReturn.
3092  */
3093 GstFlowReturn
3094 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3095 {
3096   GstRTSPStreamPrivate *priv;
3097   GstFlowReturn ret;
3098   GstElement *element;
3099
3100   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3101   priv = stream->priv;
3102   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3103   g_return_val_if_fail (priv->is_joined, FALSE);
3104
3105   g_mutex_lock (&priv->lock);
3106   if (priv->appsrc[0])
3107     element = gst_object_ref (priv->appsrc[0]);
3108   else
3109     element = NULL;
3110   g_mutex_unlock (&priv->lock);
3111
3112   if (element) {
3113     if (priv->appsrc_base_time[0] == -1) {
3114       /* Take current running_time. This timestamp will be put on
3115        * the first buffer of each stream because we are a live source and so we
3116        * timestamp with the running_time. When we are dealing with TCP, we also
3117        * only timestamp the first buffer (using the DISCONT flag) because a server
3118        * typically bursts data, for which we don't want to compensate by speeding
3119        * up the media. The other timestamps will be interpollated from this one
3120        * using the RTP timestamps. */
3121       GST_OBJECT_LOCK (element);
3122       if (GST_ELEMENT_CLOCK (element)) {
3123         GstClockTime now;
3124         GstClockTime base_time;
3125
3126         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3127         base_time = GST_ELEMENT_CAST (element)->base_time;
3128
3129         priv->appsrc_base_time[0] = now - base_time;
3130         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3131         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3132             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3133             GST_TIME_ARGS (base_time));
3134       }
3135       GST_OBJECT_UNLOCK (element);
3136     }
3137
3138     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3139     gst_object_unref (element);
3140   } else {
3141     ret = GST_FLOW_OK;
3142   }
3143   return ret;
3144 }
3145
3146 /**
3147  * gst_rtsp_stream_recv_rtcp:
3148  * @stream: a #GstRTSPStream
3149  * @buffer: (transfer full): a #GstBuffer
3150  *
3151  * Handle an RTCP buffer for the stream. This method is usually called when a
3152  * message has been received from a client using the TCP transport.
3153  *
3154  * This function takes ownership of @buffer.
3155  *
3156  * Returns: a GstFlowReturn.
3157  */
3158 GstFlowReturn
3159 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3160 {
3161   GstRTSPStreamPrivate *priv;
3162   GstFlowReturn ret;
3163   GstElement *element;
3164
3165   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3166   priv = stream->priv;
3167   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3168
3169   if (!priv->is_joined) {
3170     gst_buffer_unref (buffer);
3171     return GST_FLOW_NOT_LINKED;
3172   }
3173   g_mutex_lock (&priv->lock);
3174   if (priv->appsrc[1])
3175     element = gst_object_ref (priv->appsrc[1]);
3176   else
3177     element = NULL;
3178   g_mutex_unlock (&priv->lock);
3179
3180   if (element) {
3181     if (priv->appsrc_base_time[1] == -1) {
3182       /* Take current running_time. This timestamp will be put on
3183        * the first buffer of each stream because we are a live source and so we
3184        * timestamp with the running_time. When we are dealing with TCP, we also
3185        * only timestamp the first buffer (using the DISCONT flag) because a server
3186        * typically bursts data, for which we don't want to compensate by speeding
3187        * up the media. The other timestamps will be interpollated from this one
3188        * using the RTP timestamps. */
3189       GST_OBJECT_LOCK (element);
3190       if (GST_ELEMENT_CLOCK (element)) {
3191         GstClockTime now;
3192         GstClockTime base_time;
3193
3194         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3195         base_time = GST_ELEMENT_CAST (element)->base_time;
3196
3197         priv->appsrc_base_time[1] = now - base_time;
3198         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3199         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3200             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3201             GST_TIME_ARGS (base_time));
3202       }
3203       GST_OBJECT_UNLOCK (element);
3204     }
3205
3206     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3207     gst_object_unref (element);
3208   } else {
3209     ret = GST_FLOW_OK;
3210     gst_buffer_unref (buffer);
3211   }
3212   return ret;
3213 }
3214
3215 /* must be called with lock */
3216 static gboolean
3217 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3218     gboolean add)
3219 {
3220   GstRTSPStreamPrivate *priv = stream->priv;
3221   const GstRTSPTransport *tr;
3222
3223   tr = gst_rtsp_stream_transport_get_transport (trans);
3224
3225   switch (tr->lower_transport) {
3226     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3227     case GST_RTSP_LOWER_TRANS_UDP:
3228     {
3229       gchar *dest;
3230       gint min, max;
3231       guint ttl = 0;
3232
3233       dest = tr->destination;
3234       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3235         min = tr->port.min;
3236         max = tr->port.max;
3237         ttl = tr->ttl;
3238       } else if (priv->client_side) {
3239         /* In client side mode the 'destination' is the RTSP server, so send
3240          * to those ports */
3241         min = tr->server_port.min;
3242         max = tr->server_port.max;
3243       } else {
3244         min = tr->client_port.min;
3245         max = tr->client_port.max;
3246       }
3247
3248       if (add) {
3249         if (ttl > 0) {
3250           GST_INFO ("setting ttl-mc %d", ttl);
3251           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3252           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3253         }
3254         GST_INFO ("adding %s:%d-%d", dest, min, max);
3255         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3256         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3257         priv->transports = g_list_prepend (priv->transports, trans);
3258       } else {
3259         GST_INFO ("removing %s:%d-%d", dest, min, max);
3260         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3261         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3262         priv->transports = g_list_remove (priv->transports, trans);
3263       }
3264       priv->transports_cookie++;
3265       break;
3266     }
3267     case GST_RTSP_LOWER_TRANS_TCP:
3268       if (add) {
3269         GST_INFO ("adding TCP %s", tr->destination);
3270         priv->transports = g_list_prepend (priv->transports, trans);
3271       } else {
3272         GST_INFO ("removing TCP %s", tr->destination);
3273         priv->transports = g_list_remove (priv->transports, trans);
3274       }
3275       priv->transports_cookie++;
3276       break;
3277     default:
3278       goto unknown_transport;
3279   }
3280   return TRUE;
3281
3282   /* ERRORS */
3283 unknown_transport:
3284   {
3285     GST_INFO ("Unknown transport %d", tr->lower_transport);
3286     return FALSE;
3287   }
3288 }
3289
3290
3291 /**
3292  * gst_rtsp_stream_add_transport:
3293  * @stream: a #GstRTSPStream
3294  * @trans: (transfer none): a #GstRTSPStreamTransport
3295  *
3296  * Add the transport in @trans to @stream. The media of @stream will
3297  * then also be send to the values configured in @trans.
3298  *
3299  * @stream must be joined to a bin.
3300  *
3301  * @trans must contain a valid #GstRTSPTransport.
3302  *
3303  * Returns: %TRUE if @trans was added
3304  */
3305 gboolean
3306 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3307     GstRTSPStreamTransport * trans)
3308 {
3309   GstRTSPStreamPrivate *priv;
3310   gboolean res;
3311
3312   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3313   priv = stream->priv;
3314   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3315   g_return_val_if_fail (priv->is_joined, FALSE);
3316
3317   g_mutex_lock (&priv->lock);
3318   res = update_transport (stream, trans, TRUE);
3319   g_mutex_unlock (&priv->lock);
3320
3321   return res;
3322 }
3323
3324 /**
3325  * gst_rtsp_stream_remove_transport:
3326  * @stream: a #GstRTSPStream
3327  * @trans: (transfer none): a #GstRTSPStreamTransport
3328  *
3329  * Remove the transport in @trans from @stream. The media of @stream will
3330  * not be sent to the values configured in @trans.
3331  *
3332  * @stream must be joined to a bin.
3333  *
3334  * @trans must contain a valid #GstRTSPTransport.
3335  *
3336  * Returns: %TRUE if @trans was removed
3337  */
3338 gboolean
3339 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3340     GstRTSPStreamTransport * trans)
3341 {
3342   GstRTSPStreamPrivate *priv;
3343   gboolean res;
3344
3345   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3346   priv = stream->priv;
3347   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3348   g_return_val_if_fail (priv->is_joined, FALSE);
3349
3350   g_mutex_lock (&priv->lock);
3351   res = update_transport (stream, trans, FALSE);
3352   g_mutex_unlock (&priv->lock);
3353
3354   return res;
3355 }
3356
3357 /**
3358  * gst_rtsp_stream_update_crypto:
3359  * @stream: a #GstRTSPStream
3360  * @ssrc: the SSRC
3361  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3362  *
3363  * Update the new crypto information for @ssrc in @stream. If information
3364  * for @ssrc did not exist, it will be added. If information
3365  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3366  * be removed from @stream.
3367  *
3368  * Returns: %TRUE if @crypto could be updated
3369  */
3370 gboolean
3371 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3372     guint ssrc, GstCaps * crypto)
3373 {
3374   GstRTSPStreamPrivate *priv;
3375
3376   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3377   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3378
3379   priv = stream->priv;
3380
3381   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3382
3383   g_mutex_lock (&priv->lock);
3384   if (crypto)
3385     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3386         gst_caps_ref (crypto));
3387   else
3388     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3389   g_mutex_unlock (&priv->lock);
3390
3391   return TRUE;
3392 }
3393
3394 /**
3395  * gst_rtsp_stream_get_rtp_socket:
3396  * @stream: a #GstRTSPStream
3397  * @family: the socket family
3398  *
3399  * Get the RTP socket from @stream for a @family.
3400  *
3401  * @stream must be joined to a bin.
3402  *
3403  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3404  * socket could be allocated for @family. Unref after usage
3405  */
3406 GSocket *
3407 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3408 {
3409   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3410   GSocket *socket;
3411   const gchar *name;
3412
3413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3414   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3415       family == G_SOCKET_FAMILY_IPV6, NULL);
3416   g_return_val_if_fail (priv->udpsink[0], NULL);
3417
3418   if (family == G_SOCKET_FAMILY_IPV6)
3419     name = "socket-v6";
3420   else
3421     name = "socket";
3422
3423   g_object_get (priv->udpsink[0], name, &socket, NULL);
3424
3425   return socket;
3426 }
3427
3428 /**
3429  * gst_rtsp_stream_get_rtcp_socket:
3430  * @stream: a #GstRTSPStream
3431  * @family: the socket family
3432  *
3433  * Get the RTCP socket from @stream for a @family.
3434  *
3435  * @stream must be joined to a bin.
3436  *
3437  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3438  * socket could be allocated for @family. Unref after usage
3439  */
3440 GSocket *
3441 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3442 {
3443   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3444   GSocket *socket;
3445   const gchar *name;
3446
3447   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3448   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3449       family == G_SOCKET_FAMILY_IPV6, NULL);
3450   g_return_val_if_fail (priv->udpsink[1], NULL);
3451
3452   if (family == G_SOCKET_FAMILY_IPV6)
3453     name = "socket-v6";
3454   else
3455     name = "socket";
3456
3457   g_object_get (priv->udpsink[1], name, &socket, NULL);
3458
3459   return socket;
3460 }
3461
3462 /**
3463  * gst_rtsp_stream_set_seqnum:
3464  * @stream: a #GstRTSPStream
3465  * @seqnum: a new sequence number
3466  *
3467  * Configure the sequence number in the payloader of @stream to @seqnum.
3468  */
3469 void
3470 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3471 {
3472   GstRTSPStreamPrivate *priv;
3473
3474   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3475
3476   priv = stream->priv;
3477
3478   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3479 }
3480
3481 /**
3482  * gst_rtsp_stream_get_seqnum:
3483  * @stream: a #GstRTSPStream
3484  *
3485  * Get the configured sequence number in the payloader of @stream.
3486  *
3487  * Returns: the sequence number of the payloader.
3488  */
3489 guint16
3490 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3491 {
3492   GstRTSPStreamPrivate *priv;
3493   guint seqnum;
3494
3495   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3496
3497   priv = stream->priv;
3498
3499   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3500
3501   return seqnum;
3502 }
3503
3504 /**
3505  * gst_rtsp_stream_transport_filter:
3506  * @stream: a #GstRTSPStream
3507  * @func: (scope call) (allow-none): a callback
3508  * @user_data: (closure): user data passed to @func
3509  *
3510  * Call @func for each transport managed by @stream. The result value of @func
3511  * determines what happens to the transport. @func will be called with @stream
3512  * locked so no further actions on @stream can be performed from @func.
3513  *
3514  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3515  * @stream.
3516  *
3517  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3518  *
3519  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3520  * will also be added with an additional ref to the result #GList of this
3521  * function..
3522  *
3523  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3524  *
3525  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3526  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3527  * element in the #GList should be unreffed before the list is freed.
3528  */
3529 GList *
3530 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3531     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3532 {
3533   GstRTSPStreamPrivate *priv;
3534   GList *result, *walk, *next;
3535   GHashTable *visited = NULL;
3536   guint cookie;
3537
3538   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3539
3540   priv = stream->priv;
3541
3542   result = NULL;
3543   if (func)
3544     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3545
3546   g_mutex_lock (&priv->lock);
3547 restart:
3548   cookie = priv->transports_cookie;
3549   for (walk = priv->transports; walk; walk = next) {
3550     GstRTSPStreamTransport *trans = walk->data;
3551     GstRTSPFilterResult res;
3552     gboolean changed;
3553
3554     next = g_list_next (walk);
3555
3556     if (func) {
3557       /* only visit each transport once */
3558       if (g_hash_table_contains (visited, trans))
3559         continue;
3560
3561       g_hash_table_add (visited, g_object_ref (trans));
3562       g_mutex_unlock (&priv->lock);
3563
3564       res = func (stream, trans, user_data);
3565
3566       g_mutex_lock (&priv->lock);
3567     } else
3568       res = GST_RTSP_FILTER_REF;
3569
3570     changed = (cookie != priv->transports_cookie);
3571
3572     switch (res) {
3573       case GST_RTSP_FILTER_REMOVE:
3574         update_transport (stream, trans, FALSE);
3575         break;
3576       case GST_RTSP_FILTER_REF:
3577         result = g_list_prepend (result, g_object_ref (trans));
3578         break;
3579       case GST_RTSP_FILTER_KEEP:
3580       default:
3581         break;
3582     }
3583     if (changed)
3584       goto restart;
3585   }
3586   g_mutex_unlock (&priv->lock);
3587
3588   if (func)
3589     g_hash_table_unref (visited);
3590
3591   return result;
3592 }
3593
3594 static GstPadProbeReturn
3595 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3596 {
3597   GstRTSPStreamPrivate *priv;
3598   GstRTSPStream *stream;
3599
3600   stream = user_data;
3601   priv = stream->priv;
3602
3603   GST_DEBUG_OBJECT (pad, "now blocking");
3604
3605   g_mutex_lock (&priv->lock);
3606   priv->blocking = TRUE;
3607   g_mutex_unlock (&priv->lock);
3608
3609   gst_element_post_message (priv->payloader,
3610       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3611           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3612
3613   return GST_PAD_PROBE_OK;
3614 }
3615
3616 /**
3617  * gst_rtsp_stream_set_blocked:
3618  * @stream: a #GstRTSPStream
3619  * @blocked: boolean indicating we should block or unblock
3620  *
3621  * Blocks or unblocks the dataflow on @stream.
3622  *
3623  * Returns: %TRUE on success
3624  */
3625 gboolean
3626 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3627 {
3628   GstRTSPStreamPrivate *priv;
3629
3630   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3631
3632   priv = stream->priv;
3633
3634   g_mutex_lock (&priv->lock);
3635   if (blocked) {
3636     priv->blocking = FALSE;
3637     if (priv->blocked_id == 0) {
3638       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3639           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3640           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3641           g_object_ref (stream), g_object_unref);
3642     }
3643   } else {
3644     if (priv->blocked_id != 0) {
3645       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3646       priv->blocked_id = 0;
3647       priv->blocking = FALSE;
3648     }
3649   }
3650   g_mutex_unlock (&priv->lock);
3651
3652   return TRUE;
3653 }
3654
3655 /**
3656  * gst_rtsp_stream_is_blocking:
3657  * @stream: a #GstRTSPStream
3658  *
3659  * Check if @stream is blocking on a #GstBuffer.
3660  *
3661  * Returns: %TRUE if @stream is blocking
3662  */
3663 gboolean
3664 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3665 {
3666   GstRTSPStreamPrivate *priv;
3667   gboolean result;
3668
3669   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3670
3671   priv = stream->priv;
3672
3673   g_mutex_lock (&priv->lock);
3674   result = priv->blocking;
3675   g_mutex_unlock (&priv->lock);
3676
3677   return result;
3678 }
3679
3680 /**
3681  * gst_rtsp_stream_query_position:
3682  * @stream: a #GstRTSPStream
3683  *
3684  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3685  * the RTP parts of the pipeline and not the RTCP parts.
3686  *
3687  * Returns: %TRUE if the position could be queried
3688  */
3689 gboolean
3690 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3691 {
3692   GstRTSPStreamPrivate *priv;
3693   GstElement *sink;
3694   gboolean ret;
3695
3696   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3697
3698   priv = stream->priv;
3699
3700   g_mutex_lock (&priv->lock);
3701   /* depending on the transport type, it should query corresponding sink */
3702   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3703       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3704     sink = priv->udpsink[0];
3705   else
3706     sink = priv->appsink[0];
3707
3708   if (sink)
3709     gst_object_ref (sink);
3710   g_mutex_unlock (&priv->lock);
3711
3712   if (!sink)
3713     return FALSE;
3714
3715   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3716   gst_object_unref (sink);
3717
3718   return ret;
3719 }
3720
3721 /**
3722  * gst_rtsp_stream_query_stop:
3723  * @stream: a #GstRTSPStream
3724  *
3725  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3726  * the RTP parts of the pipeline and not the RTCP parts.
3727  *
3728  * Returns: %TRUE if the stop could be queried
3729  */
3730 gboolean
3731 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3732 {
3733   GstRTSPStreamPrivate *priv;
3734   GstElement *sink;
3735   GstQuery *query;
3736   gboolean ret;
3737
3738   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3739
3740   priv = stream->priv;
3741
3742   g_mutex_lock (&priv->lock);
3743   /* depending on the transport type, it should query corresponding sink */
3744   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3745       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3746     sink = priv->udpsink[0];
3747   else
3748     sink = priv->appsink[0];
3749
3750   if (sink)
3751     gst_object_ref (sink);
3752   g_mutex_unlock (&priv->lock);
3753
3754   if (!sink)
3755     return FALSE;
3756
3757   query = gst_query_new_segment (GST_FORMAT_TIME);
3758   if ((ret = gst_element_query (sink, query))) {
3759     GstFormat format;
3760
3761     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3762     if (format != GST_FORMAT_TIME)
3763       *stop = -1;
3764   }
3765   gst_query_unref (query);
3766   gst_object_unref (sink);
3767
3768   return ret;
3769
3770 }