rtsp-stream: Disable multicast loopback for all our sockets
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *addr_v4;
139   GstRTSPAddress *addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   /* the caps of the stream */
144   gulong caps_sig;
145   GstCaps *caps;
146
147   /* transports we stream to */
148   guint n_active;
149   GList *transports;
150   guint transports_cookie;
151   GList *tr_cache_rtp;
152   GList *tr_cache_rtcp;
153   guint tr_cache_cookie_rtp;
154   guint tr_cache_cookie_rtcp;
155
156
157   gint dscp_qos;
158
159   /* stream blocking */
160   gulong blocked_id;
161   gboolean blocking;
162
163   /* pt->caps map for RECORD streams */
164   GHashTable *ptmap;
165 };
166
167 #define DEFAULT_CONTROL         NULL
168 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
169 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
170                                         GST_RTSP_LOWER_TRANS_TCP
171
172 enum
173 {
174   PROP_0,
175   PROP_CONTROL,
176   PROP_PROFILES,
177   PROP_PROTOCOLS,
178   PROP_LAST
179 };
180
181 enum
182 {
183   SIGNAL_NEW_RTP_ENCODER,
184   SIGNAL_NEW_RTCP_ENCODER,
185   SIGNAL_LAST
186 };
187
188 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
189 #define GST_CAT_DEFAULT rtsp_stream_debug
190
191 static GQuark ssrc_stream_map_key;
192
193 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec);
195 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
196     const GValue * value, GParamSpec * pspec);
197
198 static void gst_rtsp_stream_finalize (GObject * obj);
199
200 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
201
202 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
203
204 static void
205 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
206 {
207   GObjectClass *gobject_class;
208
209   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
210
211   gobject_class = G_OBJECT_CLASS (klass);
212
213   gobject_class->get_property = gst_rtsp_stream_get_property;
214   gobject_class->set_property = gst_rtsp_stream_set_property;
215   gobject_class->finalize = gst_rtsp_stream_finalize;
216
217   g_object_class_install_property (gobject_class, PROP_CONTROL,
218       g_param_spec_string ("control", "Control",
219           "The control string for this stream", DEFAULT_CONTROL,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   g_object_class_install_property (gobject_class, PROP_PROFILES,
223       g_param_spec_flags ("profiles", "Profiles",
224           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
225           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
228       g_param_spec_flags ("protocols", "Protocols",
229           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
230           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
233       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
235       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
238       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
243
244   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
245 }
246
247 static void
248 gst_rtsp_stream_init (GstRTSPStream * stream)
249 {
250   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
251
252   GST_DEBUG ("new stream %p", stream);
253
254   stream->priv = priv;
255
256   priv->dscp_qos = -1;
257   priv->control = g_strdup (DEFAULT_CONTROL);
258   priv->profiles = DEFAULT_PROFILES;
259   priv->protocols = DEFAULT_PROTOCOLS;
260
261   g_mutex_init (&priv->lock);
262
263   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
264       NULL, (GDestroyNotify) gst_caps_unref);
265   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
266       (GDestroyNotify) gst_caps_unref);
267 }
268
269 static void
270 gst_rtsp_stream_finalize (GObject * obj)
271 {
272   GstRTSPStream *stream;
273   GstRTSPStreamPrivate *priv;
274
275   stream = GST_RTSP_STREAM (obj);
276   priv = stream->priv;
277
278   GST_DEBUG ("finalize stream %p", stream);
279
280   /* we really need to be unjoined now */
281   g_return_if_fail (!priv->is_joined);
282
283   if (priv->addr_v4)
284     gst_rtsp_address_free (priv->addr_v4);
285   if (priv->addr_v6)
286     gst_rtsp_address_free (priv->addr_v6);
287   if (priv->server_addr_v4)
288     gst_rtsp_address_free (priv->server_addr_v4);
289   if (priv->server_addr_v6)
290     gst_rtsp_address_free (priv->server_addr_v6);
291   if (priv->pool)
292     g_object_unref (priv->pool);
293   if (priv->rtxsend)
294     g_object_unref (priv->rtxsend);
295
296   gst_object_unref (priv->payloader);
297   if (priv->srcpad)
298     gst_object_unref (priv->srcpad);
299   if (priv->sinkpad)
300     gst_object_unref (priv->sinkpad);
301   g_free (priv->control);
302   g_mutex_clear (&priv->lock);
303
304   g_hash_table_unref (priv->keys);
305   g_hash_table_destroy (priv->ptmap);
306
307   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
308 }
309
310 static void
311 gst_rtsp_stream_get_property (GObject * object, guint propid,
312     GValue * value, GParamSpec * pspec)
313 {
314   GstRTSPStream *stream = GST_RTSP_STREAM (object);
315
316   switch (propid) {
317     case PROP_CONTROL:
318       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
319       break;
320     case PROP_PROFILES:
321       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
322       break;
323     case PROP_PROTOCOLS:
324       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
328   }
329 }
330
331 static void
332 gst_rtsp_stream_set_property (GObject * object, guint propid,
333     const GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
340       break;
341     case PROP_PROFILES:
342       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
343       break;
344     case PROP_PROTOCOLS:
345       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 /**
353  * gst_rtsp_stream_new:
354  * @idx: an index
355  * @pad: a #GstPad
356  * @payloader: a #GstElement
357  *
358  * Create a new media stream with index @idx that handles RTP data on
359  * @pad and has a payloader element @payloader if @pad is a source pad
360  * or a depayloader element @payloader if @pad is a sink pad.
361  *
362  * Returns: (transfer full): a new #GstRTSPStream
363  */
364 GstRTSPStream *
365 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
366 {
367   GstRTSPStreamPrivate *priv;
368   GstRTSPStream *stream;
369
370   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
371   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
372
373   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
374   priv = stream->priv;
375   priv->idx = idx;
376   priv->payloader = gst_object_ref (payloader);
377   if (GST_PAD_IS_SRC (pad))
378     priv->srcpad = gst_object_ref (pad);
379   else
380     priv->sinkpad = gst_object_ref (pad);
381
382   return stream;
383 }
384
385 /**
386  * gst_rtsp_stream_get_index:
387  * @stream: a #GstRTSPStream
388  *
389  * Get the stream index.
390  *
391  * Return: the stream index.
392  */
393 guint
394 gst_rtsp_stream_get_index (GstRTSPStream * stream)
395 {
396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
397
398   return stream->priv->idx;
399 }
400
401 /**
402  * gst_rtsp_stream_get_pt:
403  * @stream: a #GstRTSPStream
404  *
405  * Get the stream payload type.
406  *
407  * Return: the stream payload type.
408  */
409 guint
410 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   guint pt;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
416
417   priv = stream->priv;
418
419   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
420
421   return pt;
422 }
423
424 /**
425  * gst_rtsp_stream_get_srcpad:
426  * @stream: a #GstRTSPStream
427  *
428  * Get the srcpad associated with @stream.
429  *
430  * Returns: (transfer full): the srcpad. Unref after usage.
431  */
432 GstPad *
433 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
434 {
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
436
437   if (!stream->priv->srcpad)
438     return NULL;
439
440   return gst_object_ref (stream->priv->srcpad);
441 }
442
443 /**
444  * gst_rtsp_stream_get_sinkpad:
445  * @stream: a #GstRTSPStream
446  *
447  * Get the sinkpad associated with @stream.
448  *
449  * Returns: (transfer full): the sinkpad. Unref after usage.
450  */
451 GstPad *
452 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
453 {
454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
455
456   if (!stream->priv->sinkpad)
457     return NULL;
458
459   return gst_object_ref (stream->priv->sinkpad);
460 }
461
462 /**
463  * gst_rtsp_stream_get_control:
464  * @stream: a #GstRTSPStream
465  *
466  * Get the control string to identify this stream.
467  *
468  * Returns: (transfer full): the control string. g_free() after usage.
469  */
470 gchar *
471 gst_rtsp_stream_get_control (GstRTSPStream * stream)
472 {
473   GstRTSPStreamPrivate *priv;
474   gchar *result;
475
476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
477
478   priv = stream->priv;
479
480   g_mutex_lock (&priv->lock);
481   if ((result = g_strdup (priv->control)) == NULL)
482     result = g_strdup_printf ("stream=%u", priv->idx);
483   g_mutex_unlock (&priv->lock);
484
485   return result;
486 }
487
488 /**
489  * gst_rtsp_stream_set_control:
490  * @stream: a #GstRTSPStream
491  * @control: a control string
492  *
493  * Set the control string in @stream.
494  */
495 void
496 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   g_mutex_lock (&priv->lock);
505   g_free (priv->control);
506   priv->control = g_strdup (control);
507   g_mutex_unlock (&priv->lock);
508 }
509
510 /**
511  * gst_rtsp_stream_has_control:
512  * @stream: a #GstRTSPStream
513  * @control: a control string
514  *
515  * Check if @stream has the control string @control.
516  *
517  * Returns: %TRUE is @stream has @control as the control string
518  */
519 gboolean
520 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
521 {
522   GstRTSPStreamPrivate *priv;
523   gboolean res;
524
525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
526
527   priv = stream->priv;
528
529   g_mutex_lock (&priv->lock);
530   if (priv->control)
531     res = (g_strcmp0 (priv->control, control) == 0);
532   else {
533     guint streamid;
534
535     if (sscanf (control, "stream=%u", &streamid) > 0)
536       res = (streamid == priv->idx);
537     else
538       res = FALSE;
539   }
540   g_mutex_unlock (&priv->lock);
541
542   return res;
543 }
544
545 /**
546  * gst_rtsp_stream_set_mtu:
547  * @stream: a #GstRTSPStream
548  * @mtu: a new MTU
549  *
550  * Configure the mtu in the payloader of @stream to @mtu.
551  */
552 void
553 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
554 {
555   GstRTSPStreamPrivate *priv;
556
557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
558
559   priv = stream->priv;
560
561   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
562
563   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
564 }
565
566 /**
567  * gst_rtsp_stream_get_mtu:
568  * @stream: a #GstRTSPStream
569  *
570  * Get the configured MTU in the payloader of @stream.
571  *
572  * Returns: the MTU of the payloader.
573  */
574 guint
575 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
576 {
577   GstRTSPStreamPrivate *priv;
578   guint mtu;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
581
582   priv = stream->priv;
583
584   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
585
586   return mtu;
587 }
588
589 /* Update the dscp qos property on the udp sinks */
590 static void
591 update_dscp_qos (GstRTSPStream * stream)
592 {
593   GstRTSPStreamPrivate *priv;
594
595   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
596
597   priv = stream->priv;
598
599   if (priv->udpsink[0]) {
600     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
601         NULL);
602   }
603
604   if (priv->udpsink[1]) {
605     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
606         NULL);
607   }
608 }
609
610 /**
611  * gst_rtsp_stream_set_dscp_qos:
612  * @stream: a #GstRTSPStream
613  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
614  *
615  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
616  */
617 void
618 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
623
624   priv = stream->priv;
625
626   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
627
628   if (dscp_qos < -1 || dscp_qos > 63) {
629     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
630     return;
631   }
632
633   priv->dscp_qos = dscp_qos;
634
635   update_dscp_qos (stream);
636 }
637
638 /**
639  * gst_rtsp_stream_get_dscp_qos:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the configured DSCP QoS in of the outgoing sockets.
643  *
644  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
645  */
646 gint
647 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
652
653   priv = stream->priv;
654
655   return priv->dscp_qos;
656 }
657
658 /**
659  * gst_rtsp_stream_is_transport_supported:
660  * @stream: a #GstRTSPStream
661  * @transport: (transfer none): a #GstRTSPTransport
662  *
663  * Check if @transport can be handled by stream
664  *
665  * Returns: %TRUE if @transport can be handled by @stream.
666  */
667 gboolean
668 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
669     GstRTSPTransport * transport)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   if (transport->trans != GST_RTSP_TRANS_RTP)
679     goto unsupported_transmode;
680
681   if (!(transport->profile & priv->profiles))
682     goto unsupported_profile;
683
684   if (!(transport->lower_transport & priv->protocols))
685     goto unsupported_ltrans;
686
687   g_mutex_unlock (&priv->lock);
688
689   return TRUE;
690
691   /* ERRORS */
692 unsupported_transmode:
693   {
694     GST_DEBUG ("unsupported transport mode %d", transport->trans);
695     g_mutex_unlock (&priv->lock);
696     return FALSE;
697   }
698 unsupported_profile:
699   {
700     GST_DEBUG ("unsupported profile %d", transport->profile);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_ltrans:
705   {
706     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 }
711
712 /**
713  * gst_rtsp_stream_set_profiles:
714  * @stream: a #GstRTSPStream
715  * @profiles: the new profiles
716  *
717  * Configure the allowed profiles for @stream.
718  */
719 void
720 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
721 {
722   GstRTSPStreamPrivate *priv;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   g_mutex_lock (&priv->lock);
729   priv->profiles = profiles;
730   g_mutex_unlock (&priv->lock);
731 }
732
733 /**
734  * gst_rtsp_stream_get_profiles:
735  * @stream: a #GstRTSPStream
736  *
737  * Get the allowed profiles of @stream.
738  *
739  * Returns: a #GstRTSPProfile
740  */
741 GstRTSPProfile
742 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
743 {
744   GstRTSPStreamPrivate *priv;
745   GstRTSPProfile res;
746
747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
748
749   priv = stream->priv;
750
751   g_mutex_lock (&priv->lock);
752   res = priv->profiles;
753   g_mutex_unlock (&priv->lock);
754
755   return res;
756 }
757
758 /**
759  * gst_rtsp_stream_set_protocols:
760  * @stream: a #GstRTSPStream
761  * @protocols: the new flags
762  *
763  * Configure the allowed lower transport for @stream.
764  */
765 void
766 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
767     GstRTSPLowerTrans protocols)
768 {
769   GstRTSPStreamPrivate *priv;
770
771   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
772
773   priv = stream->priv;
774
775   g_mutex_lock (&priv->lock);
776   priv->protocols = protocols;
777   g_mutex_unlock (&priv->lock);
778 }
779
780 /**
781  * gst_rtsp_stream_get_protocols:
782  * @stream: a #GstRTSPStream
783  *
784  * Get the allowed protocols of @stream.
785  *
786  * Returns: a #GstRTSPLowerTrans
787  */
788 GstRTSPLowerTrans
789 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
790 {
791   GstRTSPStreamPrivate *priv;
792   GstRTSPLowerTrans res;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
795       GST_RTSP_LOWER_TRANS_UNKNOWN);
796
797   priv = stream->priv;
798
799   g_mutex_lock (&priv->lock);
800   res = priv->protocols;
801   g_mutex_unlock (&priv->lock);
802
803   return res;
804 }
805
806 /**
807  * gst_rtsp_stream_set_address_pool:
808  * @stream: a #GstRTSPStream
809  * @pool: (transfer none): a #GstRTSPAddressPool
810  *
811  * configure @pool to be used as the address pool of @stream.
812  */
813 void
814 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
815     GstRTSPAddressPool * pool)
816 {
817   GstRTSPStreamPrivate *priv;
818   GstRTSPAddressPool *old;
819
820   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
821
822   priv = stream->priv;
823
824   GST_LOG_OBJECT (stream, "set address pool %p", pool);
825
826   g_mutex_lock (&priv->lock);
827   if ((old = priv->pool) != pool)
828     priv->pool = pool ? g_object_ref (pool) : NULL;
829   else
830     old = NULL;
831   g_mutex_unlock (&priv->lock);
832
833   if (old)
834     g_object_unref (old);
835 }
836
837 /**
838  * gst_rtsp_stream_get_address_pool:
839  * @stream: a #GstRTSPStream
840  *
841  * Get the #GstRTSPAddressPool used as the address pool of @stream.
842  *
843  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
844  * usage.
845  */
846 GstRTSPAddressPool *
847 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
848 {
849   GstRTSPStreamPrivate *priv;
850   GstRTSPAddressPool *result;
851
852   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
853
854   priv = stream->priv;
855
856   g_mutex_lock (&priv->lock);
857   if ((result = priv->pool))
858     g_object_ref (result);
859   g_mutex_unlock (&priv->lock);
860
861   return result;
862 }
863
864 /**
865  * gst_rtsp_stream_get_multicast_address:
866  * @stream: a #GstRTSPStream
867  * @family: the #GSocketFamily
868  *
869  * Get the multicast address of @stream for @family.
870  *
871  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
872  * or %NULL when no address could be allocated. gst_rtsp_address_free()
873  * after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
877     GSocketFamily family)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GstRTSPAddress **addrp;
882   GstRTSPAddressFlags flags;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
885
886   priv = stream->priv;
887
888   if (family == G_SOCKET_FAMILY_IPV6) {
889     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
890     addrp = &priv->addr_v6;
891   } else {
892     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
893     addrp = &priv->addr_v4;
894   }
895
896   g_mutex_lock (&priv->lock);
897   if (*addrp == NULL) {
898     if (priv->pool == NULL)
899       goto no_pool;
900
901     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
902
903     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
904     if (*addrp == NULL)
905       goto no_address;
906   }
907   result = gst_rtsp_address_copy (*addrp);
908   g_mutex_unlock (&priv->lock);
909
910   return result;
911
912   /* ERRORS */
913 no_pool:
914   {
915     GST_ERROR_OBJECT (stream, "no address pool specified");
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 no_address:
920   {
921     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 /**
928  * gst_rtsp_stream_reserve_address:
929  * @stream: a #GstRTSPStream
930  * @address: an address
931  * @port: a port
932  * @n_ports: n_ports
933  * @ttl: a TTL
934  *
935  * Reserve @address and @port as the address and port of @stream.
936  *
937  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
938  * the address could be reserved. gst_rtsp_address_free() after usage.
939  */
940 GstRTSPAddress *
941 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
942     const gchar * address, guint port, guint n_ports, guint ttl)
943 {
944   GstRTSPStreamPrivate *priv;
945   GstRTSPAddress *result;
946   GInetAddress *addr;
947   GSocketFamily family;
948   GstRTSPAddress **addrp;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951   g_return_val_if_fail (address != NULL, NULL);
952   g_return_val_if_fail (port > 0, NULL);
953   g_return_val_if_fail (n_ports > 0, NULL);
954   g_return_val_if_fail (ttl > 0, NULL);
955
956   priv = stream->priv;
957
958   addr = g_inet_address_new_from_string (address);
959   if (!addr) {
960     GST_ERROR ("failed to get inet addr from %s", address);
961     family = G_SOCKET_FAMILY_IPV4;
962   } else {
963     family = g_inet_address_get_family (addr);
964     g_object_unref (addr);
965   }
966
967   if (family == G_SOCKET_FAMILY_IPV6)
968     addrp = &priv->addr_v6;
969   else
970     addrp = &priv->addr_v4;
971
972   g_mutex_lock (&priv->lock);
973   if (*addrp == NULL) {
974     GstRTSPAddressPoolResult res;
975
976     if (priv->pool == NULL)
977       goto no_pool;
978
979     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
980         port, n_ports, ttl, addrp);
981     if (res != GST_RTSP_ADDRESS_POOL_OK)
982       goto no_address;
983   } else {
984     if (strcmp ((*addrp)->address, address) ||
985         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
986         (*addrp)->ttl != ttl)
987       goto different_address;
988   }
989   result = gst_rtsp_address_copy (*addrp);
990   g_mutex_unlock (&priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1004         address);
1005     g_mutex_unlock (&priv->lock);
1006     return NULL;
1007   }
1008 different_address:
1009   {
1010     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1011         " reserved", address);
1012     g_mutex_unlock (&priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /* must be called with lock */
1018 static void
1019 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1020     GSocket * rtcp_socket, GSocketFamily family)
1021 {
1022   GstRTSPStreamPrivate *priv = stream->priv;
1023   const gchar *multisink_socket;
1024
1025   if (family == G_SOCKET_FAMILY_IPV6)
1026     multisink_socket = "socket-v6";
1027   else
1028     multisink_socket = "socket";
1029
1030   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1031       NULL);
1032   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1033       NULL);
1034 }
1035
1036 /* must be called with lock */
1037 static gboolean
1038 create_and_configure_udpsinks (GstRTSPStream * stream)
1039 {
1040   GstRTSPStreamPrivate *priv = stream->priv;
1041   GstElement *udpsink0, *udpsink1;
1042
1043   udpsink0 = NULL;
1044   udpsink1 = NULL;
1045
1046   if (priv->udpsink[0])
1047     udpsink0 = priv->udpsink[0];
1048   else
1049     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1050
1051   if (!udpsink0)
1052     goto no_udp_protocol;
1053
1054   if (priv->udpsink[1])
1055     udpsink1 = priv->udpsink[1];
1056   else
1057     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1058
1059   if (!udpsink1)
1060     goto no_udp_protocol;
1061
1062   /* configure sinks */
1063
1064   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1065   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1066
1067   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1068   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1069
1070   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1071
1072   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1073   /* Needs to be async for RECORD streams, otherwise we will never go to
1074    * PLAYING because the sinks will wait for data while the udpsrc can't
1075    * provide data with timestamps in PAUSED. */
1076   if (priv->sinkpad)
1077     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1079
1080   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1081   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1082
1083   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1085
1086   /* update the dscp qos field in the sinks */
1087   update_dscp_qos (stream);
1088
1089   priv->udpsink[0] = udpsink0;
1090   priv->udpsink[1] = udpsink1;
1091
1092   return TRUE;
1093
1094   /* ERRORS */
1095 no_udp_protocol:
1096   {
1097     return FALSE;
1098   }
1099 }
1100
1101 /* must be called with lock */
1102 static void
1103 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1104     GSocketFamily family)
1105 {
1106   GstRTSPStreamPrivate *priv;
1107   GstPad *pad, *selpad;
1108   guint i;
1109   GstBin *bin;
1110
1111   priv = stream->priv;
1112   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1113
1114   for (i = 0; i < 2; i++) {
1115     if (priv->sinkpad || i == 1) {
1116       if (priv->srcpad) {
1117         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1118          * values. This is only relevant for PLAY pipelines */
1119         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1120         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1121       }
1122       /* add udpsrc */
1123       gst_bin_add (bin, udpsrc_out[i]);
1124
1125       /* and link to the funnel */
1126       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1127       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1128       gst_pad_link (pad, selpad);
1129       gst_object_unref (pad);
1130       gst_object_unref (selpad);
1131     }
1132   }
1133
1134   gst_object_unref (bin);
1135 }
1136
1137 /* must be called with lock */
1138 static gboolean
1139 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1140     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1141     const gchar * address, gint rtpport, gint rtcpport,
1142     GstRTSPLowerTrans transport)
1143 {
1144   GstStateChangeReturn ret;
1145
1146   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1147   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1148
1149   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1150     goto error;
1151
1152   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1153     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1154     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1155     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1156     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1157     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1158     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1159   }
1160
1161   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1162   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1163
1164   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1165   if (ret == GST_STATE_CHANGE_FAILURE)
1166     goto error;
1167   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1168   if (ret == GST_STATE_CHANGE_FAILURE)
1169     goto error;
1170
1171   return TRUE;
1172   return TRUE;
1173
1174   /* ERRORS */
1175 error:
1176   {
1177     if (udpsrc_out[0])
1178       gst_object_unref (udpsrc_out[0]);
1179     if (udpsrc_out[1])
1180       gst_object_unref (udpsrc_out[1]);
1181     return FALSE;
1182   }
1183 }
1184
1185 static gboolean
1186 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1187     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1188     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1189     gboolean use_client_settings)
1190 {
1191   GstRTSPStreamPrivate *priv = stream->priv;
1192   GSocket *rtp_socket = NULL;
1193   GSocket *rtcp_socket;
1194   gint tmp_rtp, tmp_rtcp;
1195   guint count;
1196   gint rtpport, rtcpport;
1197   GList *rejected_addresses = NULL;
1198   GstRTSPAddress *addr = NULL;
1199   GInetAddress *inetaddr = NULL;
1200   gchar *addr_str;
1201   GSocketAddress *rtp_sockaddr = NULL;
1202   GSocketAddress *rtcp_sockaddr = NULL;
1203   GstRTSPAddressPool *pool;
1204   GstRTSPLowerTrans transport;
1205
1206   pool = priv->pool;
1207   count = 0;
1208   transport = ct->lower_transport;
1209
1210   /* Start with random port */
1211   tmp_rtp = 0;
1212
1213   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1214       G_SOCKET_PROTOCOL_UDP, NULL);
1215   if (!rtcp_socket)
1216     goto no_udp_protocol;
1217   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1218
1219   if (*server_addr_out)
1220     gst_rtsp_address_free (*server_addr_out);
1221
1222   /* try to allocate 2 UDP ports, the RTP port should be an even
1223    * number and the RTCP port should be the next (uneven) port */
1224 again:
1225
1226   if (rtp_socket == NULL) {
1227     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1228         G_SOCKET_PROTOCOL_UDP, NULL);
1229     if (!rtp_socket)
1230       goto no_udp_protocol;
1231     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1232   }
1233
1234   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1235               gst_rtsp_address_pool_has_unicast_addresses (pool))
1236           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1237     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1238
1239     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1240       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1241     else
1242       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1243
1244     if (addr)
1245       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1246
1247     if (family == G_SOCKET_FAMILY_IPV6)
1248       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1249     else
1250       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1251
1252     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1253         && use_client_settings)
1254       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1255           ct->port.min, 2, ct->ttl, &addr);
1256     else
1257       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1258
1259     if (addr == NULL)
1260       goto no_ports;
1261
1262     tmp_rtp = addr->port;
1263
1264     g_clear_object (&inetaddr);
1265     inetaddr = g_inet_address_new_from_string (addr->address);
1266
1267     /* On Windows it's not possible to bind to a multicast address
1268      * but the OS will make sure to filter out all packets that
1269      * arrive not for the multicast address the socket joined.
1270      *
1271      * On Linux and others it is necessary to bind to a multicast
1272      * address to let the OS filter out all packets that are received
1273      * on the same port but for different addresses than the multicast
1274      * address
1275      */
1276 #ifdef G_OS_WIN32
1277     if (g_inet_address_get_is_multicast (inetaddr)) {
1278       g_object_unref (inetaddr);
1279       inetaddr = g_inet_address_new_any (family);
1280     }
1281 #endif
1282   } else {
1283     if (tmp_rtp != 0) {
1284       tmp_rtp += 2;
1285       if (++count > 20)
1286         goto no_ports;
1287     }
1288
1289     if (inetaddr == NULL)
1290       inetaddr = g_inet_address_new_any (family);
1291   }
1292
1293   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1294   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1295     g_object_unref (rtp_sockaddr);
1296     goto again;
1297   }
1298   g_object_unref (rtp_sockaddr);
1299
1300   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1301   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1302     g_clear_object (&rtp_sockaddr);
1303     goto socket_error;
1304   }
1305
1306   tmp_rtp =
1307       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1308   g_object_unref (rtp_sockaddr);
1309
1310   /* check if port is even */
1311   if ((tmp_rtp & 1) != 0) {
1312     /* port not even, close and allocate another */
1313     tmp_rtp++;
1314     g_clear_object (&rtp_socket);
1315     goto again;
1316   }
1317
1318   /* set port */
1319   tmp_rtcp = tmp_rtp + 1;
1320
1321   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1322   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1323     g_object_unref (rtcp_sockaddr);
1324     g_clear_object (&rtp_socket);
1325     goto again;
1326   }
1327   g_object_unref (rtcp_sockaddr);
1328
1329   if (addr == NULL)
1330     addr_str = g_inet_address_to_string (inetaddr);
1331   else
1332     addr_str = addr->address;
1333   g_clear_object (&inetaddr);
1334
1335   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1336           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, transport)) {
1337     if (addr == NULL)
1338       g_free (addr_str);
1339     goto no_udp_protocol;
1340   }
1341
1342   if (addr == NULL)
1343     g_free (addr_str);
1344
1345   play_udpsources_one_family (stream, udpsrc_out, family);
1346
1347   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1348   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1349
1350   /* this should not happen... */
1351   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1352     goto port_error;
1353
1354   /* set RTP and RTCP sockets */
1355   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1356
1357   server_port_out->min = rtpport;
1358   server_port_out->max = rtcpport;
1359
1360   *server_addr_out = addr;
1361   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1362
1363   g_object_unref (rtp_socket);
1364   g_object_unref (rtcp_socket);
1365
1366   return TRUE;
1367
1368   /* ERRORS */
1369 no_udp_protocol:
1370   {
1371     goto cleanup;
1372   }
1373 no_ports:
1374   {
1375     goto cleanup;
1376   }
1377 port_error:
1378   {
1379     goto cleanup;
1380   }
1381 socket_error:
1382   {
1383     goto cleanup;
1384   }
1385 cleanup:
1386   {
1387     if (inetaddr)
1388       g_object_unref (inetaddr);
1389     g_list_free_full (rejected_addresses,
1390         (GDestroyNotify) gst_rtsp_address_free);
1391     if (addr)
1392       gst_rtsp_address_free (addr);
1393     if (rtp_socket)
1394       g_object_unref (rtp_socket);
1395     if (rtcp_socket)
1396       g_object_unref (rtcp_socket);
1397     return FALSE;
1398   }
1399 }
1400
1401 /**
1402  * gst_rtsp_stream_allocate_udp_sockets:
1403  * @stream: a #GstRTSPStream
1404  * @family: protocol family
1405  * @transport_method: transport method
1406  *
1407  * Allocates RTP and RTCP ports.
1408  *
1409  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1410  */
1411 gboolean
1412 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1413     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1414 {
1415   GstRTSPStreamPrivate *priv;
1416   gboolean result = FALSE;
1417   GstRTSPLowerTrans transport = ct->lower_transport;
1418
1419   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1420   priv = stream->priv;
1421   g_return_val_if_fail (priv->is_joined, FALSE);
1422
1423   g_mutex_lock (&priv->lock);
1424
1425   if (family == G_SOCKET_FAMILY_IPV4) {
1426     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1427       if (priv->have_ipv4_mcast)
1428         goto done;
1429       priv->have_ipv4_mcast =
1430           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1431           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1432           use_client_settings);
1433     } else {
1434       priv->have_ipv4 =
1435           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1436           &priv->server_port_v4, ct, &priv->server_addr_v4,
1437           use_client_settings);
1438     }
1439   } else {
1440     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1441       if (priv->have_ipv6_mcast)
1442         goto done;
1443       priv->have_ipv6_mcast =
1444           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1445           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1446           use_client_settings);
1447     } else {
1448       if (priv->have_ipv6)
1449         goto done;
1450       priv->have_ipv6 =
1451           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1452           &priv->server_port_v6, ct, &priv->server_addr_v6,
1453           use_client_settings);
1454     }
1455   }
1456
1457 done:
1458   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1459       priv->have_ipv6_mcast;
1460
1461   g_mutex_unlock (&priv->lock);
1462
1463   return result;
1464 }
1465
1466 /**
1467  * gst_rtsp_stream_set_client_side:
1468  * @stream: a #GstRTSPStream
1469  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1470  * an RTSP connection.
1471  *
1472  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1473  * streams to an RTSP server via RECORD. This has the practical effect
1474  * of changing which UDP port numbers are used when setting up the local
1475  * side of the stream sending to be either the 'server' or 'client' pair
1476  * of a configured UDP transport.
1477  */
1478 void
1479 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1480 {
1481   GstRTSPStreamPrivate *priv;
1482
1483   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1484   priv = stream->priv;
1485   g_mutex_lock (&priv->lock);
1486   priv->client_side = client_side;
1487   g_mutex_unlock (&priv->lock);
1488 }
1489
1490 /**
1491  * gst_rtsp_stream_set_client_side:
1492  * @stream: a #GstRTSPStream
1493  *
1494  * See gst_rtsp_stream_set_client_side()
1495  *
1496  * Returns: TRUE if this #GstRTSPStream is client-side.
1497  */
1498 gboolean
1499 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1500 {
1501   GstRTSPStreamPrivate *priv;
1502   gboolean ret;
1503
1504   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1505
1506   priv = stream->priv;
1507   g_mutex_lock (&priv->lock);
1508   ret = priv->client_side;
1509   g_mutex_unlock (&priv->lock);
1510
1511   return ret;
1512 }
1513
1514 /**
1515  * gst_rtsp_stream_get_server_port:
1516  * @stream: a #GstRTSPStream
1517  * @server_port: (out): result server port
1518  * @family: the port family to get
1519  *
1520  * Fill @server_port with the port pair used by the server. This function can
1521  * only be called when @stream has been joined.
1522  */
1523 void
1524 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1525     GstRTSPRange * server_port, GSocketFamily family)
1526 {
1527   GstRTSPStreamPrivate *priv;
1528
1529   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1530   priv = stream->priv;
1531   g_return_if_fail (priv->is_joined);
1532
1533   g_mutex_lock (&priv->lock);
1534   if (family == G_SOCKET_FAMILY_IPV4) {
1535     if (server_port)
1536       *server_port = priv->server_port_v4;
1537   } else {
1538     if (server_port)
1539       *server_port = priv->server_port_v6;
1540   }
1541   g_mutex_unlock (&priv->lock);
1542 }
1543
1544 /**
1545  * gst_rtsp_stream_get_rtpsession:
1546  * @stream: a #GstRTSPStream
1547  *
1548  * Get the RTP session of this stream.
1549  *
1550  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1551  */
1552 GObject *
1553 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1554 {
1555   GstRTSPStreamPrivate *priv;
1556   GObject *session;
1557
1558   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1559
1560   priv = stream->priv;
1561
1562   g_mutex_lock (&priv->lock);
1563   if ((session = priv->session))
1564     g_object_ref (session);
1565   g_mutex_unlock (&priv->lock);
1566
1567   return session;
1568 }
1569
1570 /**
1571  * gst_rtsp_stream_get_ssrc:
1572  * @stream: a #GstRTSPStream
1573  * @ssrc: (out): result ssrc
1574  *
1575  * Get the SSRC used by the RTP session of this stream. This function can only
1576  * be called when @stream has been joined.
1577  */
1578 void
1579 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1580 {
1581   GstRTSPStreamPrivate *priv;
1582
1583   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1584   priv = stream->priv;
1585   g_return_if_fail (priv->is_joined);
1586
1587   g_mutex_lock (&priv->lock);
1588   if (ssrc && priv->session)
1589     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1590   g_mutex_unlock (&priv->lock);
1591 }
1592
1593 /**
1594  * gst_rtsp_stream_set_retransmission_time:
1595  * @stream: a #GstRTSPStream
1596  * @time: a #GstClockTime
1597  *
1598  * Set the amount of time to store retransmission packets.
1599  */
1600 void
1601 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1602     GstClockTime time)
1603 {
1604   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1605
1606   g_mutex_lock (&stream->priv->lock);
1607   stream->priv->rtx_time = time;
1608   if (stream->priv->rtxsend)
1609     g_object_set (stream->priv->rtxsend, "max-size-time",
1610         GST_TIME_AS_MSECONDS (time), NULL);
1611   g_mutex_unlock (&stream->priv->lock);
1612 }
1613
1614 /**
1615  * gst_rtsp_stream_get_retransmission_time:
1616  * @stream: a #GstRTSPStream
1617  *
1618  * Get the amount of time to store retransmission data.
1619  *
1620  * Returns: the amount of time to store retransmission data.
1621  */
1622 GstClockTime
1623 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1624 {
1625   GstClockTime ret;
1626
1627   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1628
1629   g_mutex_lock (&stream->priv->lock);
1630   ret = stream->priv->rtx_time;
1631   g_mutex_unlock (&stream->priv->lock);
1632
1633   return ret;
1634 }
1635
1636 /**
1637  * gst_rtsp_stream_set_retransmission_pt:
1638  * @stream: a #GstRTSPStream
1639  * @rtx_pt: a #guint
1640  *
1641  * Set the payload type (pt) for retransmission of this stream.
1642  */
1643 void
1644 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1645 {
1646   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1647
1648   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1649
1650   g_mutex_lock (&stream->priv->lock);
1651   stream->priv->rtx_pt = rtx_pt;
1652   if (stream->priv->rtxsend) {
1653     guint pt = gst_rtsp_stream_get_pt (stream);
1654     gchar *pt_s = g_strdup_printf ("%d", pt);
1655     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1656         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1657     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1658     g_free (pt_s);
1659     gst_structure_free (rtx_pt_map);
1660   }
1661   g_mutex_unlock (&stream->priv->lock);
1662 }
1663
1664 /**
1665  * gst_rtsp_stream_get_retransmission_pt:
1666  * @stream: a #GstRTSPStream
1667  *
1668  * Get the payload-type used for retransmission of this stream
1669  *
1670  * Returns: The retransmission PT.
1671  */
1672 guint
1673 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1674 {
1675   guint rtx_pt;
1676
1677   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1678
1679   g_mutex_lock (&stream->priv->lock);
1680   rtx_pt = stream->priv->rtx_pt;
1681   g_mutex_unlock (&stream->priv->lock);
1682
1683   return rtx_pt;
1684 }
1685
1686 /**
1687  * gst_rtsp_stream_set_buffer_size:
1688  * @stream: a #GstRTSPStream
1689  * @size: the buffer size
1690  *
1691  * Set the size of the UDP transmission buffer (in bytes)
1692  * Needs to be set before the stream is joined to a bin.
1693  *
1694  * Since: 1.6
1695  */
1696 void
1697 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1698 {
1699   g_mutex_lock (&stream->priv->lock);
1700   stream->priv->buffer_size = size;
1701   g_mutex_unlock (&stream->priv->lock);
1702 }
1703
1704 /**
1705  * gst_rtsp_stream_get_buffer_size:
1706  * @stream: a #GstRTSPStream
1707  *
1708  * Get the size of the UDP transmission buffer (in bytes)
1709  *
1710  * Returns: the size of the UDP TX buffer
1711  *
1712  * Since: 1.6
1713  */
1714 guint
1715 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1716 {
1717   guint buffer_size;
1718
1719   g_mutex_lock (&stream->priv->lock);
1720   buffer_size = stream->priv->buffer_size;
1721   g_mutex_unlock (&stream->priv->lock);
1722
1723   return buffer_size;
1724 }
1725
1726 /* executed from streaming thread */
1727 static void
1728 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1729 {
1730   GstRTSPStreamPrivate *priv = stream->priv;
1731   GstCaps *newcaps, *oldcaps;
1732
1733   newcaps = gst_pad_get_current_caps (pad);
1734
1735   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1736       newcaps);
1737
1738   g_mutex_lock (&priv->lock);
1739   oldcaps = priv->caps;
1740   priv->caps = newcaps;
1741   g_mutex_unlock (&priv->lock);
1742
1743   if (oldcaps)
1744     gst_caps_unref (oldcaps);
1745 }
1746
1747 static void
1748 dump_structure (const GstStructure * s)
1749 {
1750   gchar *sstr;
1751
1752   sstr = gst_structure_to_string (s);
1753   GST_INFO ("structure: %s", sstr);
1754   g_free (sstr);
1755 }
1756
1757 static GstRTSPStreamTransport *
1758 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1759 {
1760   GstRTSPStreamPrivate *priv = stream->priv;
1761   GList *walk;
1762   GstRTSPStreamTransport *result = NULL;
1763   const gchar *tmp;
1764   gchar *dest;
1765   guint port;
1766
1767   if (rtcp_from == NULL)
1768     return NULL;
1769
1770   tmp = g_strrstr (rtcp_from, ":");
1771   if (tmp == NULL)
1772     return NULL;
1773
1774   port = atoi (tmp + 1);
1775   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1776
1777   g_mutex_lock (&priv->lock);
1778   GST_INFO ("finding %s:%d in %d transports", dest, port,
1779       g_list_length (priv->transports));
1780
1781   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1782     GstRTSPStreamTransport *trans = walk->data;
1783     const GstRTSPTransport *tr;
1784     gint min, max;
1785
1786     tr = gst_rtsp_stream_transport_get_transport (trans);
1787
1788     if (priv->client_side) {
1789       /* In client side mode the 'destination' is the RTSP server, so send
1790        * to those ports */
1791       min = tr->server_port.min;
1792       max = tr->server_port.max;
1793     } else {
1794       min = tr->client_port.min;
1795       max = tr->client_port.max;
1796     }
1797
1798     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1799       result = trans;
1800       break;
1801     }
1802   }
1803   if (result)
1804     g_object_ref (result);
1805   g_mutex_unlock (&priv->lock);
1806
1807   g_free (dest);
1808
1809   return result;
1810 }
1811
1812 static GstRTSPStreamTransport *
1813 check_transport (GObject * source, GstRTSPStream * stream)
1814 {
1815   GstStructure *stats;
1816   GstRTSPStreamTransport *trans;
1817
1818   /* see if we have a stream to match with the origin of the RTCP packet */
1819   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1820   if (trans == NULL) {
1821     g_object_get (source, "stats", &stats, NULL);
1822     if (stats) {
1823       const gchar *rtcp_from;
1824
1825       dump_structure (stats);
1826
1827       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1828       if ((trans = find_transport (stream, rtcp_from))) {
1829         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1830             source);
1831         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1832             g_object_unref);
1833       }
1834       gst_structure_free (stats);
1835     }
1836   }
1837   return trans;
1838 }
1839
1840
1841 static void
1842 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1843 {
1844   GstRTSPStreamTransport *trans;
1845
1846   GST_INFO ("%p: new source %p", stream, source);
1847
1848   trans = check_transport (source, stream);
1849
1850   if (trans)
1851     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1852 }
1853
1854 static void
1855 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1856 {
1857   GST_INFO ("%p: new SDES %p", stream, source);
1858 }
1859
1860 static void
1861 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1862 {
1863   GstRTSPStreamTransport *trans;
1864
1865   trans = check_transport (source, stream);
1866
1867   if (trans) {
1868     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1869     gst_rtsp_stream_transport_keep_alive (trans);
1870   }
1871 #ifdef DUMP_STATS
1872   {
1873     GstStructure *stats;
1874     g_object_get (source, "stats", &stats, NULL);
1875     if (stats) {
1876       dump_structure (stats);
1877       gst_structure_free (stats);
1878     }
1879   }
1880 #endif
1881 }
1882
1883 static void
1884 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1885 {
1886   GST_INFO ("%p: source %p bye", stream, source);
1887 }
1888
1889 static void
1890 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1891 {
1892   GstRTSPStreamTransport *trans;
1893
1894   GST_INFO ("%p: source %p bye timeout", stream, source);
1895
1896   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1897     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1898     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1899   }
1900 }
1901
1902 static void
1903 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1904 {
1905   GstRTSPStreamTransport *trans;
1906
1907   GST_INFO ("%p: source %p timeout", stream, source);
1908
1909   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1910     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1911     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1912   }
1913 }
1914
1915 static void
1916 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1917 {
1918   GST_INFO ("%p: new sender source %p", stream, source);
1919 #ifndef DUMP_STATS
1920   {
1921     GstStructure *stats;
1922     g_object_get (source, "stats", &stats, NULL);
1923     if (stats) {
1924       dump_structure (stats);
1925       gst_structure_free (stats);
1926     }
1927   }
1928 #endif
1929 }
1930
1931 static void
1932 on_sender_ssrc_active (GObject * session, GObject * source,
1933     GstRTSPStream * stream)
1934 {
1935 #ifndef DUMP_STATS
1936   {
1937     GstStructure *stats;
1938     g_object_get (source, "stats", &stats, NULL);
1939     if (stats) {
1940       dump_structure (stats);
1941       gst_structure_free (stats);
1942     }
1943   }
1944 #endif
1945 }
1946
1947 static void
1948 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1949 {
1950   if (is_rtp) {
1951     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1952     g_list_free (priv->tr_cache_rtp);
1953     priv->tr_cache_rtp = NULL;
1954   } else {
1955     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1956     g_list_free (priv->tr_cache_rtcp);
1957     priv->tr_cache_rtcp = NULL;
1958   }
1959 }
1960
1961 static GstFlowReturn
1962 handle_new_sample (GstAppSink * sink, gpointer user_data)
1963 {
1964   GstRTSPStreamPrivate *priv;
1965   GList *walk;
1966   GstSample *sample;
1967   GstBuffer *buffer;
1968   GstRTSPStream *stream;
1969   gboolean is_rtp;
1970
1971   sample = gst_app_sink_pull_sample (sink);
1972   if (!sample)
1973     return GST_FLOW_OK;
1974
1975   stream = (GstRTSPStream *) user_data;
1976   priv = stream->priv;
1977   buffer = gst_sample_get_buffer (sample);
1978
1979   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1980
1981   g_mutex_lock (&priv->lock);
1982   if (is_rtp) {
1983     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1984       clear_tr_cache (priv, is_rtp);
1985       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1986         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1987         priv->tr_cache_rtp =
1988             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1989       }
1990       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1991     }
1992   } else {
1993     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1994       clear_tr_cache (priv, is_rtp);
1995       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1996         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1997         priv->tr_cache_rtcp =
1998             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1999       }
2000       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2001     }
2002   }
2003   g_mutex_unlock (&priv->lock);
2004
2005   if (is_rtp) {
2006     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2007       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2008       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2009     }
2010   } else {
2011     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2012       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2013       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2014     }
2015   }
2016   gst_sample_unref (sample);
2017
2018   return GST_FLOW_OK;
2019 }
2020
2021 static GstAppSinkCallbacks sink_cb = {
2022   NULL,                         /* not interested in EOS */
2023   NULL,                         /* not interested in preroll samples */
2024   handle_new_sample,
2025 };
2026
2027 static GstElement *
2028 get_rtp_encoder (GstRTSPStream * stream, guint session)
2029 {
2030   GstRTSPStreamPrivate *priv = stream->priv;
2031
2032   if (priv->srtpenc == NULL) {
2033     gchar *name;
2034
2035     name = g_strdup_printf ("srtpenc_%u", session);
2036     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2037     g_free (name);
2038
2039     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2040   }
2041   return gst_object_ref (priv->srtpenc);
2042 }
2043
2044 static GstElement *
2045 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2046 {
2047   GstRTSPStreamPrivate *priv = stream->priv;
2048   GstElement *oldenc, *enc;
2049   GstPad *pad;
2050   gchar *name;
2051
2052   if (priv->idx != session)
2053     return NULL;
2054
2055   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2056
2057   oldenc = priv->srtpenc;
2058   enc = get_rtp_encoder (stream, session);
2059   name = g_strdup_printf ("rtp_sink_%d", session);
2060   pad = gst_element_get_request_pad (enc, name);
2061   g_free (name);
2062   gst_object_unref (pad);
2063
2064   if (oldenc == NULL)
2065     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2066         enc);
2067
2068   return enc;
2069 }
2070
2071 static GstElement *
2072 request_rtcp_encoder (GstElement * rtpbin, guint session,
2073     GstRTSPStream * stream)
2074 {
2075   GstRTSPStreamPrivate *priv = stream->priv;
2076   GstElement *oldenc, *enc;
2077   GstPad *pad;
2078   gchar *name;
2079
2080   if (priv->idx != session)
2081     return NULL;
2082
2083   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2084
2085   oldenc = priv->srtpenc;
2086   enc = get_rtp_encoder (stream, session);
2087   name = g_strdup_printf ("rtcp_sink_%d", session);
2088   pad = gst_element_get_request_pad (enc, name);
2089   g_free (name);
2090   gst_object_unref (pad);
2091
2092   if (oldenc == NULL)
2093     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2094         enc);
2095
2096   return enc;
2097 }
2098
2099 static GstCaps *
2100 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2101 {
2102   GstRTSPStreamPrivate *priv = stream->priv;
2103   GstCaps *caps;
2104
2105   GST_DEBUG ("request key %08x", ssrc);
2106
2107   g_mutex_lock (&priv->lock);
2108   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2109     gst_caps_ref (caps);
2110   g_mutex_unlock (&priv->lock);
2111
2112   return caps;
2113 }
2114
2115 static GstElement *
2116 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2117     GstRTSPStream * stream)
2118 {
2119   GstRTSPStreamPrivate *priv = stream->priv;
2120
2121   if (priv->idx != session)
2122     return NULL;
2123
2124   if (priv->srtpdec == NULL) {
2125     gchar *name;
2126
2127     name = g_strdup_printf ("srtpdec_%u", session);
2128     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2129     g_free (name);
2130
2131     g_signal_connect (priv->srtpdec, "request-key",
2132         (GCallback) request_key, stream);
2133   }
2134   return gst_object_ref (priv->srtpdec);
2135 }
2136
2137 /**
2138  * gst_rtsp_stream_request_aux_sender:
2139  * @stream: a #GstRTSPStream
2140  * @sessid: the session id
2141  *
2142  * Creating a rtxsend bin
2143  *
2144  * Returns: (transfer full): a #GstElement.
2145  *
2146  * Since: 1.6
2147  */
2148 GstElement *
2149 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2150 {
2151   GstElement *bin;
2152   GstPad *pad;
2153   GstStructure *pt_map;
2154   gchar *name;
2155   guint pt, rtx_pt;
2156   gchar *pt_s;
2157
2158   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2159
2160   pt = gst_rtsp_stream_get_pt (stream);
2161   pt_s = g_strdup_printf ("%u", pt);
2162   rtx_pt = stream->priv->rtx_pt;
2163
2164   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2165
2166   bin = gst_bin_new (NULL);
2167   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2168   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2169       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2170   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2171       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2172   g_free (pt_s);
2173   gst_structure_free (pt_map);
2174   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2175
2176   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2177   name = g_strdup_printf ("src_%u", sessid);
2178   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2179   g_free (name);
2180   gst_object_unref (pad);
2181
2182   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2183   name = g_strdup_printf ("sink_%u", sessid);
2184   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2185   g_free (name);
2186   gst_object_unref (pad);
2187
2188   return bin;
2189 }
2190
2191 /**
2192  * gst_rtsp_stream_set_pt_map:
2193  * @stream: a #GstRTSPStream
2194  * @pt: the pt
2195  * @caps: a #GstCaps
2196  *
2197  * Configure a pt map between @pt and @caps.
2198  */
2199 void
2200 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2201 {
2202   GstRTSPStreamPrivate *priv = stream->priv;
2203
2204   g_mutex_lock (&priv->lock);
2205   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2206   g_mutex_unlock (&priv->lock);
2207 }
2208
2209 static GstCaps *
2210 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2211     GstRTSPStream * stream)
2212 {
2213   GstRTSPStreamPrivate *priv = stream->priv;
2214   GstCaps *caps = NULL;
2215
2216   g_mutex_lock (&priv->lock);
2217
2218   if (priv->idx == session) {
2219     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2220     if (caps) {
2221       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2222       gst_caps_ref (caps);
2223     } else {
2224       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2225     }
2226   }
2227
2228   g_mutex_unlock (&priv->lock);
2229
2230   return caps;
2231 }
2232
2233 static void
2234 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2235 {
2236   GstRTSPStreamPrivate *priv = stream->priv;
2237   gchar *name;
2238   GstPadLinkReturn ret;
2239   guint sessid;
2240
2241   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2242       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2243
2244   name = gst_pad_get_name (pad);
2245   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2246     g_free (name);
2247     return;
2248   }
2249   g_free (name);
2250
2251   if (priv->idx != sessid)
2252     return;
2253
2254   if (gst_pad_is_linked (priv->sinkpad)) {
2255     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2256         GST_DEBUG_PAD_NAME (priv->sinkpad));
2257     return;
2258   }
2259
2260   /* link the RTP pad to the session manager, it should not really fail unless
2261    * this is not really an RTP pad */
2262   ret = gst_pad_link (pad, priv->sinkpad);
2263   if (ret != GST_PAD_LINK_OK)
2264     goto link_failed;
2265   priv->recv_rtp_src = gst_object_ref (pad);
2266
2267   return;
2268
2269 /* ERRORS */
2270 link_failed:
2271   {
2272     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2273         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2274   }
2275 }
2276
2277 static void
2278 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2279     GstRTSPStream * stream)
2280 {
2281   /* TODO: What to do here other than this? */
2282   GST_DEBUG ("Stream %p: Got EOS", stream);
2283   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2284 }
2285
2286 /* must be called with lock */
2287 static gboolean
2288 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2289 {
2290   GstRTSPStreamPrivate *priv;
2291   GstPad *pad, *sinkpad = NULL;
2292   gboolean is_tcp = FALSE, is_udp = FALSE;
2293   gint i;
2294
2295   priv = stream->priv;
2296
2297   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2298   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2299       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2300
2301   if (is_udp && !create_and_configure_udpsinks (stream))
2302     goto no_udp_protocol;
2303
2304   for (i = 0; i < 2; i++) {
2305     GstPad *teepad, *queuepad;
2306     /* For the sender we create this bit of pipeline for both
2307      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2308      * we need to add a queue before appsink and udpsink to make
2309      * the pipeline not block. For the TCP case, we want to pump
2310      * client as fast as possible anyway. This pipeline is used
2311      * when both TCP and UDP are present.
2312      *
2313      * .--------.      .-----.    .---------.    .---------.
2314      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2315      * |       send->sink   src->sink      src->sink       |
2316      * '--------'      |     |    '---------'    '---------'
2317      *                 |     |    .---------.    .---------.
2318      *                 |     |    |  queue  |    | appsink |
2319      *                 |    src->sink      src->sink       |
2320      *                 '-----'    '---------'    '---------'
2321      *
2322      * When only UDP or only TCP is allowed, we skip the tee and queue
2323      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2324      * the session.
2325      */
2326     /* Only link the RTP send src if we're going to send RTP, link
2327      * the RTCP send src always */
2328     if (priv->srcpad || i == 1) {
2329       if (is_udp) {
2330         /* add udpsink */
2331         gst_bin_add (bin, priv->udpsink[i]);
2332         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2333       }
2334
2335       if (is_tcp) {
2336         /* make appsink */
2337         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2338         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2339         gst_bin_add (bin, priv->appsink[i]);
2340         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2341             &sink_cb, stream, NULL);
2342       }
2343
2344       if (is_udp && is_tcp) {
2345         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2346
2347         /* make tee for RTP/RTCP */
2348         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2349         gst_bin_add (bin, priv->tee[i]);
2350
2351         /* and link to rtpbin send pad */
2352         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2353         gst_pad_link (priv->send_src[i], pad);
2354         gst_object_unref (pad);
2355
2356         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2357         g_object_set (priv->udpqueue[i], "max-size-buffers",
2358             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2359             NULL);
2360         gst_bin_add (bin, priv->udpqueue[i]);
2361         /* link tee to udpqueue */
2362         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2363         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2364         gst_pad_link (teepad, pad);
2365         gst_object_unref (pad);
2366         gst_object_unref (teepad);
2367
2368         /* link udpqueue to udpsink */
2369         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2370         gst_pad_link (queuepad, sinkpad);
2371         gst_object_unref (queuepad);
2372         gst_object_unref (sinkpad);
2373
2374         /* make appqueue */
2375         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2376         g_object_set (priv->appqueue[i], "max-size-buffers",
2377             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2378             NULL);
2379         gst_bin_add (bin, priv->appqueue[i]);
2380         /* and link tee to appqueue */
2381         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2382         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2383         gst_pad_link (teepad, pad);
2384         gst_object_unref (pad);
2385         gst_object_unref (teepad);
2386
2387         /* and link appqueue to appsink */
2388         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2389         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2390         gst_pad_link (queuepad, pad);
2391         gst_object_unref (pad);
2392         gst_object_unref (queuepad);
2393       } else if (is_tcp) {
2394         /* only appsink needed, link it to the session */
2395         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2396         gst_pad_link (priv->send_src[i], pad);
2397         gst_object_unref (pad);
2398
2399         /* when its only TCP, we need to set sync and preroll to FALSE
2400          * for the sink to avoid deadlock. And this is only needed for
2401          * sink used for RTCP data, not the RTP data. */
2402         if (i == 1)
2403           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2404       } else {
2405         /* else only udpsink needed, link it to the session */
2406         gst_pad_link (priv->send_src[i], sinkpad);
2407         gst_object_unref (sinkpad);
2408       }
2409     }
2410
2411     /* check if we need to set to a special state */
2412     if (state != GST_STATE_NULL) {
2413       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2414         gst_element_set_state (priv->udpsink[i], state);
2415       if (priv->appsink[i] && (priv->srcpad || i == 1))
2416         gst_element_set_state (priv->appsink[i], state);
2417       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2418         gst_element_set_state (priv->appqueue[i], state);
2419       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2420         gst_element_set_state (priv->udpqueue[i], state);
2421       if (priv->tee[i] && (priv->srcpad || i == 1))
2422         gst_element_set_state (priv->tee[i], state);
2423     }
2424   }
2425
2426   return TRUE;
2427
2428   /* ERRORS */
2429 no_udp_protocol:
2430   {
2431     return FALSE;
2432   }
2433 }
2434
2435 /* must be called with lock */
2436 static void
2437 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2438 {
2439   GstRTSPStreamPrivate *priv;
2440   GstPad *pad, *selpad;
2441   gboolean is_tcp;
2442   gint i;
2443
2444   priv = stream->priv;
2445
2446   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2447
2448   for (i = 0; i < 2; i++) {
2449     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2450      * RTCP sink always */
2451     if (priv->sinkpad || i == 1) {
2452       /* For the receiver we create this bit of pipeline for both
2453        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2454        * and it is all funneled into the rtpbin receive pad.
2455        *
2456        * .--------.     .--------.    .--------.
2457        * | udpsrc |     | funnel |    | rtpbin |
2458        * |       src->sink      src->sink      |
2459        * '--------'     |        |    '--------'
2460        * .--------.     |        |
2461        * | appsrc |     |        |
2462        * |       src->sink       |
2463        * '--------'     '--------'
2464        */
2465       /* make funnel for the RTP/RTCP receivers */
2466       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2467       gst_bin_add (bin, priv->funnel[i]);
2468
2469       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2470       gst_pad_link (pad, priv->recv_sink[i]);
2471       gst_object_unref (pad);
2472
2473       if (priv->udpsrc_v4[i]) {
2474         if (priv->srcpad) {
2475           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2476            * values. This is only relevant for PLAY pipelines */
2477           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2478           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2479         }
2480         /* add udpsrc */
2481         gst_bin_add (bin, priv->udpsrc_v4[i]);
2482
2483         /* and link to the funnel v4 */
2484         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2485         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2486         gst_pad_link (pad, selpad);
2487         gst_object_unref (pad);
2488         gst_object_unref (selpad);
2489       }
2490
2491       if (priv->udpsrc_v6[i]) {
2492         if (priv->srcpad) {
2493           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2494           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2495         }
2496         gst_bin_add (bin, priv->udpsrc_v6[i]);
2497
2498         /* and link to the funnel v6 */
2499         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2500         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2501         gst_pad_link (pad, selpad);
2502         gst_object_unref (pad);
2503         gst_object_unref (selpad);
2504       }
2505
2506       if (is_tcp) {
2507         /* make and add appsrc */
2508         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2509         priv->appsrc_base_time[i] = -1;
2510         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2511         gst_bin_add (bin, priv->appsrc[i]);
2512         /* and link to the funnel */
2513         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2514         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2515         gst_pad_link (pad, selpad);
2516         gst_object_unref (pad);
2517         gst_object_unref (selpad);
2518       }
2519     }
2520
2521     /* check if we need to set to a special state */
2522     if (state != GST_STATE_NULL) {
2523       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2524         gst_element_set_state (priv->funnel[i], state);
2525       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2526         gst_element_set_state (priv->appsrc[i], state);
2527     }
2528   }
2529 }
2530
2531 /**
2532  * gst_rtsp_stream_join_bin:
2533  * @stream: a #GstRTSPStream
2534  * @bin: (transfer none): a #GstBin to join
2535  * @rtpbin: (transfer none): a rtpbin element in @bin
2536  * @state: the target state of the new elements
2537  *
2538  * Join the #GstBin @bin that contains the element @rtpbin.
2539  *
2540  * @stream will link to @rtpbin, which must be inside @bin. The elements
2541  * added to @bin will be set to the state given in @state.
2542  *
2543  * Returns: %TRUE on success.
2544  */
2545 gboolean
2546 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2547     GstElement * rtpbin, GstState state)
2548 {
2549   GstRTSPStreamPrivate *priv;
2550   guint idx;
2551   gchar *name;
2552   GstPadLinkReturn ret;
2553
2554   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2555   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2556   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2557
2558   priv = stream->priv;
2559
2560   g_mutex_lock (&priv->lock);
2561   if (priv->is_joined)
2562     goto was_joined;
2563
2564   /* create a session with the same index as the stream */
2565   idx = priv->idx;
2566
2567   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2568
2569   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2570       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2571     /* For SRTP */
2572     g_signal_connect (rtpbin, "request-rtp-encoder",
2573         (GCallback) request_rtp_encoder, stream);
2574     g_signal_connect (rtpbin, "request-rtcp-encoder",
2575         (GCallback) request_rtcp_encoder, stream);
2576     g_signal_connect (rtpbin, "request-rtp-decoder",
2577         (GCallback) request_rtp_rtcp_decoder, stream);
2578     g_signal_connect (rtpbin, "request-rtcp-decoder",
2579         (GCallback) request_rtp_rtcp_decoder, stream);
2580   }
2581
2582   if (priv->sinkpad) {
2583     g_signal_connect (rtpbin, "request-pt-map",
2584         (GCallback) request_pt_map, stream);
2585   }
2586
2587   /* get pads from the RTP session element for sending and receiving
2588    * RTP/RTCP*/
2589   if (priv->srcpad) {
2590     /* get a pad for sending RTP */
2591     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2592     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2593     g_free (name);
2594
2595     /* link the RTP pad to the session manager, it should not really fail unless
2596      * this is not really an RTP pad */
2597     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2598     if (ret != GST_PAD_LINK_OK)
2599       goto link_failed;
2600
2601     name = g_strdup_printf ("send_rtp_src_%u", idx);
2602     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2603     g_free (name);
2604   } else {
2605     /* Need to connect our sinkpad from here */
2606     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2607     /* EOS */
2608     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2609
2610     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2611     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2612     g_free (name);
2613   }
2614
2615   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2616   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2617   g_free (name);
2618   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2619   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2620   g_free (name);
2621
2622   /* get the session */
2623   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2624
2625   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2626       stream);
2627   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2628       stream);
2629   g_signal_connect (priv->session, "on-ssrc-active",
2630       (GCallback) on_ssrc_active, stream);
2631   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2632       stream);
2633   g_signal_connect (priv->session, "on-bye-timeout",
2634       (GCallback) on_bye_timeout, stream);
2635   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2636       stream);
2637
2638   /* signal for sender ssrc */
2639   g_signal_connect (priv->session, "on-new-sender-ssrc",
2640       (GCallback) on_new_sender_ssrc, stream);
2641   g_signal_connect (priv->session, "on-sender-ssrc-active",
2642       (GCallback) on_sender_ssrc_active, stream);
2643
2644   if (!create_sender_part (stream, bin, state))
2645     goto no_udp_protocol;
2646
2647   create_receiver_part (stream, bin, state);
2648
2649   if (priv->srcpad) {
2650     /* be notified of caps changes */
2651     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2652         (GCallback) caps_notify, stream);
2653   }
2654
2655   priv->is_joined = TRUE;
2656   g_mutex_unlock (&priv->lock);
2657
2658   return TRUE;
2659
2660   /* ERRORS */
2661 was_joined:
2662   {
2663     g_mutex_unlock (&priv->lock);
2664     return TRUE;
2665   }
2666 link_failed:
2667   {
2668     GST_WARNING ("failed to link stream %u", idx);
2669     gst_object_unref (priv->send_rtp_sink);
2670     priv->send_rtp_sink = NULL;
2671     g_mutex_unlock (&priv->lock);
2672     return FALSE;
2673   }
2674 no_udp_protocol:
2675   {
2676     GST_WARNING ("failed to allocate ports %u", idx);
2677     gst_object_unref (priv->send_rtp_sink);
2678     priv->send_rtp_sink = NULL;
2679     gst_object_unref (priv->send_src[0]);
2680     priv->send_src[0] = NULL;
2681     gst_object_unref (priv->send_src[1]);
2682     priv->send_src[1] = NULL;
2683     gst_object_unref (priv->recv_sink[0]);
2684     priv->recv_sink[0] = NULL;
2685     gst_object_unref (priv->recv_sink[1]);
2686     priv->recv_sink[1] = NULL;
2687     if (priv->udpsink[0])
2688       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2689     if (priv->udpsink[1])
2690       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2691     if (priv->udpsrc_v4[0]) {
2692       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2693       gst_object_unref (priv->udpsrc_v4[0]);
2694       priv->udpsrc_v4[0] = NULL;
2695     }
2696     if (priv->udpsrc_v4[1]) {
2697       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2698       gst_object_unref (priv->udpsrc_v4[1]);
2699       priv->udpsrc_v4[1] = NULL;
2700     }
2701     if (priv->udpsrc_mcast_v4[0]) {
2702       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2703       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2704       priv->udpsrc_mcast_v4[0] = NULL;
2705     }
2706     if (priv->udpsrc_mcast_v4[1]) {
2707       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2708       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2709       priv->udpsrc_mcast_v4[1] = NULL;
2710     }
2711     if (priv->udpsrc_v6[0]) {
2712       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2713       gst_object_unref (priv->udpsrc_v6[0]);
2714       priv->udpsrc_v6[0] = NULL;
2715     }
2716     if (priv->udpsrc_v6[1]) {
2717       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2718       gst_object_unref (priv->udpsrc_v6[1]);
2719       priv->udpsrc_v6[1] = NULL;
2720     }
2721     if (priv->udpsrc_mcast_v6[0]) {
2722       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2723       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2724       priv->udpsrc_mcast_v6[0] = NULL;
2725     }
2726     if (priv->udpsrc_mcast_v6[1]) {
2727       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2728       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2729       priv->udpsrc_mcast_v6[1] = NULL;
2730     }
2731     g_mutex_unlock (&priv->lock);
2732     return FALSE;
2733   }
2734 }
2735
2736 /**
2737  * gst_rtsp_stream_leave_bin:
2738  * @stream: a #GstRTSPStream
2739  * @bin: (transfer none): a #GstBin
2740  * @rtpbin: (transfer none): a rtpbin #GstElement
2741  *
2742  * Remove the elements of @stream from @bin.
2743  *
2744  * Return: %TRUE on success.
2745  */
2746 gboolean
2747 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2748     GstElement * rtpbin)
2749 {
2750   GstRTSPStreamPrivate *priv;
2751   gint i;
2752   gboolean is_tcp, is_udp;
2753
2754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2755   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2756   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2757
2758   priv = stream->priv;
2759
2760   g_mutex_lock (&priv->lock);
2761   if (!priv->is_joined)
2762     goto was_not_joined;
2763
2764   /* all transports must be removed by now */
2765   if (priv->transports != NULL)
2766     goto transports_not_removed;
2767
2768   clear_tr_cache (priv, TRUE);
2769   clear_tr_cache (priv, FALSE);
2770
2771   GST_INFO ("stream %p leaving bin", stream);
2772
2773   if (priv->srcpad) {
2774     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2775
2776     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2777     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2778     gst_object_unref (priv->send_rtp_sink);
2779     priv->send_rtp_sink = NULL;
2780   } else if (priv->recv_rtp_src) {
2781     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2782     gst_object_unref (priv->recv_rtp_src);
2783     priv->recv_rtp_src = NULL;
2784   }
2785
2786   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2787
2788   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2789       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2790
2791
2792   for (i = 0; i < 2; i++) {
2793     if (priv->udpsink[i])
2794       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2795     if (priv->appsink[i])
2796       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2797     if (priv->appqueue[i])
2798       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2799     if (priv->udpqueue[i])
2800       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2801     if (priv->tee[i])
2802       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2803     if (priv->funnel[i])
2804       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2805     if (priv->appsrc[i])
2806       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2807
2808     if (priv->udpsrc_v4[i]) {
2809       if (priv->sinkpad || i == 1) {
2810         /* and set udpsrc to NULL now before removing */
2811         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2812         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2813         /* removing them should also nicely release the request
2814          * pads when they finalize */
2815         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2816       } else {
2817         /* we need to set the state to NULL before unref */
2818         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2819         gst_object_unref (priv->udpsrc_v4[i]);
2820       }
2821     }
2822
2823     if (priv->udpsrc_mcast_v4[i]) {
2824       if (priv->sinkpad || i == 1) {
2825         /* and set udpsrc to NULL now before removing */
2826         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2827         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2828         /* removing them should also nicely release the request
2829          * pads when they finalize */
2830         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2831       } else {
2832         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2833         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2834       }
2835     }
2836
2837     if (priv->udpsrc_v6[i]) {
2838       if (priv->sinkpad || i == 1) {
2839         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2840         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2841         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2842       } else {
2843         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2844         gst_object_unref (priv->udpsrc_v6[i]);
2845       }
2846     }
2847     if (priv->udpsrc_mcast_v6[i]) {
2848       if (priv->sinkpad || i == 1) {
2849         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2850         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2851         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2852       } else {
2853         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2854         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2855       }
2856     }
2857
2858     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2859       gst_bin_remove (bin, priv->udpsink[i]);
2860     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2861       gst_bin_remove (bin, priv->appsrc[i]);
2862     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2863       gst_bin_remove (bin, priv->appsink[i]);
2864     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2865       gst_bin_remove (bin, priv->appqueue[i]);
2866     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2867       gst_bin_remove (bin, priv->udpqueue[i]);
2868     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2869       gst_bin_remove (bin, priv->tee[i]);
2870     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2871       gst_bin_remove (bin, priv->funnel[i]);
2872
2873     if (priv->sinkpad || i == 1) {
2874       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2875       gst_object_unref (priv->recv_sink[i]);
2876       priv->recv_sink[i] = NULL;
2877     }
2878
2879     priv->udpsrc_v4[i] = NULL;
2880     priv->udpsrc_v6[i] = NULL;
2881     priv->udpsrc_mcast_v4[i] = NULL;
2882     priv->udpsrc_mcast_v6[i] = NULL;
2883     priv->udpsink[i] = NULL;
2884     priv->appsrc[i] = NULL;
2885     priv->appsink[i] = NULL;
2886     priv->appqueue[i] = NULL;
2887     priv->udpqueue[i] = NULL;
2888     priv->tee[i] = NULL;
2889     priv->funnel[i] = NULL;
2890   }
2891
2892   if (priv->srcpad) {
2893     gst_object_unref (priv->send_src[0]);
2894     priv->send_src[0] = NULL;
2895   }
2896
2897   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2898   gst_object_unref (priv->send_src[1]);
2899   priv->send_src[1] = NULL;
2900
2901   g_object_unref (priv->session);
2902   priv->session = NULL;
2903   if (priv->caps)
2904     gst_caps_unref (priv->caps);
2905   priv->caps = NULL;
2906
2907   if (priv->srtpenc)
2908     gst_object_unref (priv->srtpenc);
2909   if (priv->srtpdec)
2910     gst_object_unref (priv->srtpdec);
2911
2912   priv->is_joined = FALSE;
2913   g_mutex_unlock (&priv->lock);
2914
2915   return TRUE;
2916
2917 was_not_joined:
2918   {
2919     g_mutex_unlock (&priv->lock);
2920     return TRUE;
2921   }
2922 transports_not_removed:
2923   {
2924     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2925     g_mutex_unlock (&priv->lock);
2926     return FALSE;
2927   }
2928 }
2929
2930 /**
2931  * gst_rtsp_stream_get_rtpinfo:
2932  * @stream: a #GstRTSPStream
2933  * @rtptime: (allow-none): result RTP timestamp
2934  * @seq: (allow-none): result RTP seqnum
2935  * @clock_rate: (allow-none): the clock rate
2936  * @running_time: (allow-none): result running-time
2937  *
2938  * Retrieve the current rtptime, seq and running-time. This is used to
2939  * construct a RTPInfo reply header.
2940  *
2941  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2942  */
2943 gboolean
2944 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2945     guint * rtptime, guint * seq, guint * clock_rate,
2946     GstClockTime * running_time)
2947 {
2948   GstRTSPStreamPrivate *priv;
2949   GstStructure *stats;
2950   GObjectClass *payobjclass;
2951
2952   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2953
2954   priv = stream->priv;
2955
2956   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2957
2958   g_mutex_lock (&priv->lock);
2959
2960   /* First try to extract the information from the last buffer on the sinks.
2961    * This will have a more accurate sequence number and timestamp, as between
2962    * the payloader and the sink there can be some queues
2963    */
2964   if (priv->udpsink[0] || priv->appsink[0]) {
2965     GstSample *last_sample;
2966
2967     if (priv->udpsink[0])
2968       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2969     else
2970       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2971
2972     if (last_sample) {
2973       GstCaps *caps;
2974       GstBuffer *buffer;
2975       GstSegment *segment;
2976       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2977
2978       caps = gst_sample_get_caps (last_sample);
2979       buffer = gst_sample_get_buffer (last_sample);
2980       segment = gst_sample_get_segment (last_sample);
2981
2982       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2983         if (seq) {
2984           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2985         }
2986
2987         if (rtptime) {
2988           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2989         }
2990
2991         gst_rtp_buffer_unmap (&rtp_buffer);
2992
2993         if (running_time) {
2994           *running_time =
2995               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2996               GST_BUFFER_TIMESTAMP (buffer));
2997         }
2998
2999         if (clock_rate) {
3000           GstStructure *s = gst_caps_get_structure (caps, 0);
3001
3002           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3003
3004           if (*clock_rate == 0 && running_time)
3005             *running_time = GST_CLOCK_TIME_NONE;
3006         }
3007         gst_sample_unref (last_sample);
3008
3009         goto done;
3010       } else {
3011         gst_sample_unref (last_sample);
3012       }
3013     }
3014   }
3015
3016   if (g_object_class_find_property (payobjclass, "stats")) {
3017     g_object_get (priv->payloader, "stats", &stats, NULL);
3018     if (stats == NULL)
3019       goto no_stats;
3020
3021     if (seq)
3022       gst_structure_get_uint (stats, "seqnum", seq);
3023
3024     if (rtptime)
3025       gst_structure_get_uint (stats, "timestamp", rtptime);
3026
3027     if (running_time)
3028       gst_structure_get_clock_time (stats, "running-time", running_time);
3029
3030     if (clock_rate) {
3031       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3032       if (*clock_rate == 0 && running_time)
3033         *running_time = GST_CLOCK_TIME_NONE;
3034     }
3035     gst_structure_free (stats);
3036   } else {
3037     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3038         !g_object_class_find_property (payobjclass, "timestamp"))
3039       goto no_stats;
3040
3041     if (seq)
3042       g_object_get (priv->payloader, "seqnum", seq, NULL);
3043
3044     if (rtptime)
3045       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3046
3047     if (running_time)
3048       *running_time = GST_CLOCK_TIME_NONE;
3049   }
3050
3051 done:
3052   g_mutex_unlock (&priv->lock);
3053
3054   return TRUE;
3055
3056   /* ERRORS */
3057 no_stats:
3058   {
3059     GST_WARNING ("Could not get payloader stats");
3060     g_mutex_unlock (&priv->lock);
3061     return FALSE;
3062   }
3063 }
3064
3065 /**
3066  * gst_rtsp_stream_get_caps:
3067  * @stream: a #GstRTSPStream
3068  *
3069  * Retrieve the current caps of @stream.
3070  *
3071  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3072  * after usage.
3073  */
3074 GstCaps *
3075 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3076 {
3077   GstRTSPStreamPrivate *priv;
3078   GstCaps *result;
3079
3080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3081
3082   priv = stream->priv;
3083
3084   g_mutex_lock (&priv->lock);
3085   if ((result = priv->caps))
3086     gst_caps_ref (result);
3087   g_mutex_unlock (&priv->lock);
3088
3089   return result;
3090 }
3091
3092 /**
3093  * gst_rtsp_stream_recv_rtp:
3094  * @stream: a #GstRTSPStream
3095  * @buffer: (transfer full): a #GstBuffer
3096  *
3097  * Handle an RTP buffer for the stream. This method is usually called when a
3098  * message has been received from a client using the TCP transport.
3099  *
3100  * This function takes ownership of @buffer.
3101  *
3102  * Returns: a GstFlowReturn.
3103  */
3104 GstFlowReturn
3105 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3106 {
3107   GstRTSPStreamPrivate *priv;
3108   GstFlowReturn ret;
3109   GstElement *element;
3110
3111   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3112   priv = stream->priv;
3113   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3114   g_return_val_if_fail (priv->is_joined, FALSE);
3115
3116   g_mutex_lock (&priv->lock);
3117   if (priv->appsrc[0])
3118     element = gst_object_ref (priv->appsrc[0]);
3119   else
3120     element = NULL;
3121   g_mutex_unlock (&priv->lock);
3122
3123   if (element) {
3124     if (priv->appsrc_base_time[0] == -1) {
3125       /* Take current running_time. This timestamp will be put on
3126        * the first buffer of each stream because we are a live source and so we
3127        * timestamp with the running_time. When we are dealing with TCP, we also
3128        * only timestamp the first buffer (using the DISCONT flag) because a server
3129        * typically bursts data, for which we don't want to compensate by speeding
3130        * up the media. The other timestamps will be interpollated from this one
3131        * using the RTP timestamps. */
3132       GST_OBJECT_LOCK (element);
3133       if (GST_ELEMENT_CLOCK (element)) {
3134         GstClockTime now;
3135         GstClockTime base_time;
3136
3137         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3138         base_time = GST_ELEMENT_CAST (element)->base_time;
3139
3140         priv->appsrc_base_time[0] = now - base_time;
3141         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3142         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3143             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3144             GST_TIME_ARGS (base_time));
3145       }
3146       GST_OBJECT_UNLOCK (element);
3147     }
3148
3149     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3150     gst_object_unref (element);
3151   } else {
3152     ret = GST_FLOW_OK;
3153   }
3154   return ret;
3155 }
3156
3157 /**
3158  * gst_rtsp_stream_recv_rtcp:
3159  * @stream: a #GstRTSPStream
3160  * @buffer: (transfer full): a #GstBuffer
3161  *
3162  * Handle an RTCP buffer for the stream. This method is usually called when a
3163  * message has been received from a client using the TCP transport.
3164  *
3165  * This function takes ownership of @buffer.
3166  *
3167  * Returns: a GstFlowReturn.
3168  */
3169 GstFlowReturn
3170 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3171 {
3172   GstRTSPStreamPrivate *priv;
3173   GstFlowReturn ret;
3174   GstElement *element;
3175
3176   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3177   priv = stream->priv;
3178   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3179
3180   if (!priv->is_joined) {
3181     gst_buffer_unref (buffer);
3182     return GST_FLOW_NOT_LINKED;
3183   }
3184   g_mutex_lock (&priv->lock);
3185   if (priv->appsrc[1])
3186     element = gst_object_ref (priv->appsrc[1]);
3187   else
3188     element = NULL;
3189   g_mutex_unlock (&priv->lock);
3190
3191   if (element) {
3192     if (priv->appsrc_base_time[1] == -1) {
3193       /* Take current running_time. This timestamp will be put on
3194        * the first buffer of each stream because we are a live source and so we
3195        * timestamp with the running_time. When we are dealing with TCP, we also
3196        * only timestamp the first buffer (using the DISCONT flag) because a server
3197        * typically bursts data, for which we don't want to compensate by speeding
3198        * up the media. The other timestamps will be interpollated from this one
3199        * using the RTP timestamps. */
3200       GST_OBJECT_LOCK (element);
3201       if (GST_ELEMENT_CLOCK (element)) {
3202         GstClockTime now;
3203         GstClockTime base_time;
3204
3205         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3206         base_time = GST_ELEMENT_CAST (element)->base_time;
3207
3208         priv->appsrc_base_time[1] = now - base_time;
3209         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3210         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3211             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3212             GST_TIME_ARGS (base_time));
3213       }
3214       GST_OBJECT_UNLOCK (element);
3215     }
3216
3217     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3218     gst_object_unref (element);
3219   } else {
3220     ret = GST_FLOW_OK;
3221     gst_buffer_unref (buffer);
3222   }
3223   return ret;
3224 }
3225
3226 /* must be called with lock */
3227 static gboolean
3228 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3229     gboolean add)
3230 {
3231   GstRTSPStreamPrivate *priv = stream->priv;
3232   const GstRTSPTransport *tr;
3233
3234   tr = gst_rtsp_stream_transport_get_transport (trans);
3235
3236   switch (tr->lower_transport) {
3237     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3238     case GST_RTSP_LOWER_TRANS_UDP:
3239     {
3240       gchar *dest;
3241       gint min, max;
3242       guint ttl = 0;
3243
3244       dest = tr->destination;
3245       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3246         min = tr->port.min;
3247         max = tr->port.max;
3248         ttl = tr->ttl;
3249       } else if (priv->client_side) {
3250         /* In client side mode the 'destination' is the RTSP server, so send
3251          * to those ports */
3252         min = tr->server_port.min;
3253         max = tr->server_port.max;
3254       } else {
3255         min = tr->client_port.min;
3256         max = tr->client_port.max;
3257       }
3258
3259       if (add) {
3260         if (ttl > 0) {
3261           GST_INFO ("setting ttl-mc %d", ttl);
3262           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3263           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3264         }
3265         GST_INFO ("adding %s:%d-%d", dest, min, max);
3266         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3267         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3268         priv->transports = g_list_prepend (priv->transports, trans);
3269       } else {
3270         GST_INFO ("removing %s:%d-%d", dest, min, max);
3271         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3272         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3273         priv->transports = g_list_remove (priv->transports, trans);
3274       }
3275       priv->transports_cookie++;
3276       break;
3277     }
3278     case GST_RTSP_LOWER_TRANS_TCP:
3279       if (add) {
3280         GST_INFO ("adding TCP %s", tr->destination);
3281         priv->transports = g_list_prepend (priv->transports, trans);
3282       } else {
3283         GST_INFO ("removing TCP %s", tr->destination);
3284         priv->transports = g_list_remove (priv->transports, trans);
3285       }
3286       priv->transports_cookie++;
3287       break;
3288     default:
3289       goto unknown_transport;
3290   }
3291   return TRUE;
3292
3293   /* ERRORS */
3294 unknown_transport:
3295   {
3296     GST_INFO ("Unknown transport %d", tr->lower_transport);
3297     return FALSE;
3298   }
3299 }
3300
3301
3302 /**
3303  * gst_rtsp_stream_add_transport:
3304  * @stream: a #GstRTSPStream
3305  * @trans: (transfer none): a #GstRTSPStreamTransport
3306  *
3307  * Add the transport in @trans to @stream. The media of @stream will
3308  * then also be send to the values configured in @trans.
3309  *
3310  * @stream must be joined to a bin.
3311  *
3312  * @trans must contain a valid #GstRTSPTransport.
3313  *
3314  * Returns: %TRUE if @trans was added
3315  */
3316 gboolean
3317 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3318     GstRTSPStreamTransport * trans)
3319 {
3320   GstRTSPStreamPrivate *priv;
3321   gboolean res;
3322
3323   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3324   priv = stream->priv;
3325   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3326   g_return_val_if_fail (priv->is_joined, FALSE);
3327
3328   g_mutex_lock (&priv->lock);
3329   res = update_transport (stream, trans, TRUE);
3330   g_mutex_unlock (&priv->lock);
3331
3332   return res;
3333 }
3334
3335 /**
3336  * gst_rtsp_stream_remove_transport:
3337  * @stream: a #GstRTSPStream
3338  * @trans: (transfer none): a #GstRTSPStreamTransport
3339  *
3340  * Remove the transport in @trans from @stream. The media of @stream will
3341  * not be sent to the values configured in @trans.
3342  *
3343  * @stream must be joined to a bin.
3344  *
3345  * @trans must contain a valid #GstRTSPTransport.
3346  *
3347  * Returns: %TRUE if @trans was removed
3348  */
3349 gboolean
3350 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3351     GstRTSPStreamTransport * trans)
3352 {
3353   GstRTSPStreamPrivate *priv;
3354   gboolean res;
3355
3356   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3357   priv = stream->priv;
3358   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3359   g_return_val_if_fail (priv->is_joined, FALSE);
3360
3361   g_mutex_lock (&priv->lock);
3362   res = update_transport (stream, trans, FALSE);
3363   g_mutex_unlock (&priv->lock);
3364
3365   return res;
3366 }
3367
3368 /**
3369  * gst_rtsp_stream_update_crypto:
3370  * @stream: a #GstRTSPStream
3371  * @ssrc: the SSRC
3372  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3373  *
3374  * Update the new crypto information for @ssrc in @stream. If information
3375  * for @ssrc did not exist, it will be added. If information
3376  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3377  * be removed from @stream.
3378  *
3379  * Returns: %TRUE if @crypto could be updated
3380  */
3381 gboolean
3382 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3383     guint ssrc, GstCaps * crypto)
3384 {
3385   GstRTSPStreamPrivate *priv;
3386
3387   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3388   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3389
3390   priv = stream->priv;
3391
3392   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3393
3394   g_mutex_lock (&priv->lock);
3395   if (crypto)
3396     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3397         gst_caps_ref (crypto));
3398   else
3399     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3400   g_mutex_unlock (&priv->lock);
3401
3402   return TRUE;
3403 }
3404
3405 /**
3406  * gst_rtsp_stream_get_rtp_socket:
3407  * @stream: a #GstRTSPStream
3408  * @family: the socket family
3409  *
3410  * Get the RTP socket from @stream for a @family.
3411  *
3412  * @stream must be joined to a bin.
3413  *
3414  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3415  * socket could be allocated for @family. Unref after usage
3416  */
3417 GSocket *
3418 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3419 {
3420   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3421   GSocket *socket;
3422   const gchar *name;
3423
3424   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3425   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3426       family == G_SOCKET_FAMILY_IPV6, NULL);
3427   g_return_val_if_fail (priv->udpsink[0], NULL);
3428
3429   if (family == G_SOCKET_FAMILY_IPV6)
3430     name = "socket-v6";
3431   else
3432     name = "socket";
3433
3434   g_object_get (priv->udpsink[0], name, &socket, NULL);
3435
3436   return socket;
3437 }
3438
3439 /**
3440  * gst_rtsp_stream_get_rtcp_socket:
3441  * @stream: a #GstRTSPStream
3442  * @family: the socket family
3443  *
3444  * Get the RTCP socket from @stream for a @family.
3445  *
3446  * @stream must be joined to a bin.
3447  *
3448  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3449  * socket could be allocated for @family. Unref after usage
3450  */
3451 GSocket *
3452 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3453 {
3454   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3455   GSocket *socket;
3456   const gchar *name;
3457
3458   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3459   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3460       family == G_SOCKET_FAMILY_IPV6, NULL);
3461   g_return_val_if_fail (priv->udpsink[1], NULL);
3462
3463   if (family == G_SOCKET_FAMILY_IPV6)
3464     name = "socket-v6";
3465   else
3466     name = "socket";
3467
3468   g_object_get (priv->udpsink[1], name, &socket, NULL);
3469
3470   return socket;
3471 }
3472
3473 /**
3474  * gst_rtsp_stream_set_seqnum:
3475  * @stream: a #GstRTSPStream
3476  * @seqnum: a new sequence number
3477  *
3478  * Configure the sequence number in the payloader of @stream to @seqnum.
3479  */
3480 void
3481 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3482 {
3483   GstRTSPStreamPrivate *priv;
3484
3485   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3486
3487   priv = stream->priv;
3488
3489   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3490 }
3491
3492 /**
3493  * gst_rtsp_stream_get_seqnum:
3494  * @stream: a #GstRTSPStream
3495  *
3496  * Get the configured sequence number in the payloader of @stream.
3497  *
3498  * Returns: the sequence number of the payloader.
3499  */
3500 guint16
3501 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3502 {
3503   GstRTSPStreamPrivate *priv;
3504   guint seqnum;
3505
3506   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3507
3508   priv = stream->priv;
3509
3510   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3511
3512   return seqnum;
3513 }
3514
3515 /**
3516  * gst_rtsp_stream_transport_filter:
3517  * @stream: a #GstRTSPStream
3518  * @func: (scope call) (allow-none): a callback
3519  * @user_data: (closure): user data passed to @func
3520  *
3521  * Call @func for each transport managed by @stream. The result value of @func
3522  * determines what happens to the transport. @func will be called with @stream
3523  * locked so no further actions on @stream can be performed from @func.
3524  *
3525  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3526  * @stream.
3527  *
3528  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3529  *
3530  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3531  * will also be added with an additional ref to the result #GList of this
3532  * function..
3533  *
3534  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3535  *
3536  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3537  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3538  * element in the #GList should be unreffed before the list is freed.
3539  */
3540 GList *
3541 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3542     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3543 {
3544   GstRTSPStreamPrivate *priv;
3545   GList *result, *walk, *next;
3546   GHashTable *visited = NULL;
3547   guint cookie;
3548
3549   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3550
3551   priv = stream->priv;
3552
3553   result = NULL;
3554   if (func)
3555     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3556
3557   g_mutex_lock (&priv->lock);
3558 restart:
3559   cookie = priv->transports_cookie;
3560   for (walk = priv->transports; walk; walk = next) {
3561     GstRTSPStreamTransport *trans = walk->data;
3562     GstRTSPFilterResult res;
3563     gboolean changed;
3564
3565     next = g_list_next (walk);
3566
3567     if (func) {
3568       /* only visit each transport once */
3569       if (g_hash_table_contains (visited, trans))
3570         continue;
3571
3572       g_hash_table_add (visited, g_object_ref (trans));
3573       g_mutex_unlock (&priv->lock);
3574
3575       res = func (stream, trans, user_data);
3576
3577       g_mutex_lock (&priv->lock);
3578     } else
3579       res = GST_RTSP_FILTER_REF;
3580
3581     changed = (cookie != priv->transports_cookie);
3582
3583     switch (res) {
3584       case GST_RTSP_FILTER_REMOVE:
3585         update_transport (stream, trans, FALSE);
3586         break;
3587       case GST_RTSP_FILTER_REF:
3588         result = g_list_prepend (result, g_object_ref (trans));
3589         break;
3590       case GST_RTSP_FILTER_KEEP:
3591       default:
3592         break;
3593     }
3594     if (changed)
3595       goto restart;
3596   }
3597   g_mutex_unlock (&priv->lock);
3598
3599   if (func)
3600     g_hash_table_unref (visited);
3601
3602   return result;
3603 }
3604
3605 static GstPadProbeReturn
3606 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3607 {
3608   GstRTSPStreamPrivate *priv;
3609   GstRTSPStream *stream;
3610
3611   stream = user_data;
3612   priv = stream->priv;
3613
3614   GST_DEBUG_OBJECT (pad, "now blocking");
3615
3616   g_mutex_lock (&priv->lock);
3617   priv->blocking = TRUE;
3618   g_mutex_unlock (&priv->lock);
3619
3620   gst_element_post_message (priv->payloader,
3621       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3622           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3623
3624   return GST_PAD_PROBE_OK;
3625 }
3626
3627 /**
3628  * gst_rtsp_stream_set_blocked:
3629  * @stream: a #GstRTSPStream
3630  * @blocked: boolean indicating we should block or unblock
3631  *
3632  * Blocks or unblocks the dataflow on @stream.
3633  *
3634  * Returns: %TRUE on success
3635  */
3636 gboolean
3637 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3638 {
3639   GstRTSPStreamPrivate *priv;
3640
3641   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3642
3643   priv = stream->priv;
3644
3645   g_mutex_lock (&priv->lock);
3646   if (blocked) {
3647     priv->blocking = FALSE;
3648     if (priv->blocked_id == 0) {
3649       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3650           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3651           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3652           g_object_ref (stream), g_object_unref);
3653     }
3654   } else {
3655     if (priv->blocked_id != 0) {
3656       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3657       priv->blocked_id = 0;
3658       priv->blocking = FALSE;
3659     }
3660   }
3661   g_mutex_unlock (&priv->lock);
3662
3663   return TRUE;
3664 }
3665
3666 /**
3667  * gst_rtsp_stream_is_blocking:
3668  * @stream: a #GstRTSPStream
3669  *
3670  * Check if @stream is blocking on a #GstBuffer.
3671  *
3672  * Returns: %TRUE if @stream is blocking
3673  */
3674 gboolean
3675 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3676 {
3677   GstRTSPStreamPrivate *priv;
3678   gboolean result;
3679
3680   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3681
3682   priv = stream->priv;
3683
3684   g_mutex_lock (&priv->lock);
3685   result = priv->blocking;
3686   g_mutex_unlock (&priv->lock);
3687
3688   return result;
3689 }
3690
3691 /**
3692  * gst_rtsp_stream_query_position:
3693  * @stream: a #GstRTSPStream
3694  *
3695  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3696  * the RTP parts of the pipeline and not the RTCP parts.
3697  *
3698  * Returns: %TRUE if the position could be queried
3699  */
3700 gboolean
3701 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3702 {
3703   GstRTSPStreamPrivate *priv;
3704   GstElement *sink;
3705   gboolean ret;
3706
3707   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3708
3709   priv = stream->priv;
3710
3711   g_mutex_lock (&priv->lock);
3712   /* depending on the transport type, it should query corresponding sink */
3713   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3714       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3715     sink = priv->udpsink[0];
3716   else
3717     sink = priv->appsink[0];
3718
3719   if (sink)
3720     gst_object_ref (sink);
3721   g_mutex_unlock (&priv->lock);
3722
3723   if (!sink)
3724     return FALSE;
3725
3726   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3727   gst_object_unref (sink);
3728
3729   return ret;
3730 }
3731
3732 /**
3733  * gst_rtsp_stream_query_stop:
3734  * @stream: a #GstRTSPStream
3735  *
3736  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3737  * the RTP parts of the pipeline and not the RTCP parts.
3738  *
3739  * Returns: %TRUE if the stop could be queried
3740  */
3741 gboolean
3742 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3743 {
3744   GstRTSPStreamPrivate *priv;
3745   GstElement *sink;
3746   GstQuery *query;
3747   gboolean ret;
3748
3749   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3750
3751   priv = stream->priv;
3752
3753   g_mutex_lock (&priv->lock);
3754   /* depending on the transport type, it should query corresponding sink */
3755   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3756       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3757     sink = priv->udpsink[0];
3758   else
3759     sink = priv->appsink[0];
3760
3761   if (sink)
3762     gst_object_ref (sink);
3763   g_mutex_unlock (&priv->lock);
3764
3765   if (!sink)
3766     return FALSE;
3767
3768   query = gst_query_new_segment (GST_FORMAT_TIME);
3769   if ((ret = gst_element_query (sink, query))) {
3770     GstFormat format;
3771
3772     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3773     if (format != GST_FORMAT_TIME)
3774       *stop = -1;
3775   }
3776   gst_query_unref (query);
3777   gst_object_unref (sink);
3778
3779   return ret;
3780
3781 }