rtsp-stream: Don't set the state of the appsrc from PLAYING to PAUSED again during...
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *addr_v4;
139   GstRTSPAddress *addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   /* the caps of the stream */
144   gulong caps_sig;
145   GstCaps *caps;
146
147   /* transports we stream to */
148   guint n_active;
149   GList *transports;
150   guint transports_cookie;
151   GList *tr_cache_rtp;
152   GList *tr_cache_rtcp;
153   guint tr_cache_cookie_rtp;
154   guint tr_cache_cookie_rtcp;
155
156
157   gint dscp_qos;
158
159   /* stream blocking */
160   gulong blocked_id;
161   gboolean blocking;
162
163   /* pt->caps map for RECORD streams */
164   GHashTable *ptmap;
165 };
166
167 #define DEFAULT_CONTROL         NULL
168 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
169 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
170                                         GST_RTSP_LOWER_TRANS_TCP
171
172 enum
173 {
174   PROP_0,
175   PROP_CONTROL,
176   PROP_PROFILES,
177   PROP_PROTOCOLS,
178   PROP_LAST
179 };
180
181 enum
182 {
183   SIGNAL_NEW_RTP_ENCODER,
184   SIGNAL_NEW_RTCP_ENCODER,
185   SIGNAL_LAST
186 };
187
188 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
189 #define GST_CAT_DEFAULT rtsp_stream_debug
190
191 static GQuark ssrc_stream_map_key;
192
193 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec);
195 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
196     const GValue * value, GParamSpec * pspec);
197
198 static void gst_rtsp_stream_finalize (GObject * obj);
199
200 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
201
202 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
203
204 static void
205 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
206 {
207   GObjectClass *gobject_class;
208
209   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
210
211   gobject_class = G_OBJECT_CLASS (klass);
212
213   gobject_class->get_property = gst_rtsp_stream_get_property;
214   gobject_class->set_property = gst_rtsp_stream_set_property;
215   gobject_class->finalize = gst_rtsp_stream_finalize;
216
217   g_object_class_install_property (gobject_class, PROP_CONTROL,
218       g_param_spec_string ("control", "Control",
219           "The control string for this stream", DEFAULT_CONTROL,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   g_object_class_install_property (gobject_class, PROP_PROFILES,
223       g_param_spec_flags ("profiles", "Profiles",
224           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
225           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
228       g_param_spec_flags ("protocols", "Protocols",
229           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
230           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
233       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
235       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
238       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
243
244   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
245 }
246
247 static void
248 gst_rtsp_stream_init (GstRTSPStream * stream)
249 {
250   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
251
252   GST_DEBUG ("new stream %p", stream);
253
254   stream->priv = priv;
255
256   priv->dscp_qos = -1;
257   priv->control = g_strdup (DEFAULT_CONTROL);
258   priv->profiles = DEFAULT_PROFILES;
259   priv->protocols = DEFAULT_PROTOCOLS;
260
261   g_mutex_init (&priv->lock);
262
263   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
264       NULL, (GDestroyNotify) gst_caps_unref);
265   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
266       (GDestroyNotify) gst_caps_unref);
267 }
268
269 static void
270 gst_rtsp_stream_finalize (GObject * obj)
271 {
272   GstRTSPStream *stream;
273   GstRTSPStreamPrivate *priv;
274
275   stream = GST_RTSP_STREAM (obj);
276   priv = stream->priv;
277
278   GST_DEBUG ("finalize stream %p", stream);
279
280   /* we really need to be unjoined now */
281   g_return_if_fail (!priv->is_joined);
282
283   if (priv->addr_v4)
284     gst_rtsp_address_free (priv->addr_v4);
285   if (priv->addr_v6)
286     gst_rtsp_address_free (priv->addr_v6);
287   if (priv->server_addr_v4)
288     gst_rtsp_address_free (priv->server_addr_v4);
289   if (priv->server_addr_v6)
290     gst_rtsp_address_free (priv->server_addr_v6);
291   if (priv->pool)
292     g_object_unref (priv->pool);
293   if (priv->rtxsend)
294     g_object_unref (priv->rtxsend);
295
296   gst_object_unref (priv->payloader);
297   if (priv->srcpad)
298     gst_object_unref (priv->srcpad);
299   if (priv->sinkpad)
300     gst_object_unref (priv->sinkpad);
301   g_free (priv->control);
302   g_mutex_clear (&priv->lock);
303
304   g_hash_table_unref (priv->keys);
305   g_hash_table_destroy (priv->ptmap);
306
307   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
308 }
309
310 static void
311 gst_rtsp_stream_get_property (GObject * object, guint propid,
312     GValue * value, GParamSpec * pspec)
313 {
314   GstRTSPStream *stream = GST_RTSP_STREAM (object);
315
316   switch (propid) {
317     case PROP_CONTROL:
318       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
319       break;
320     case PROP_PROFILES:
321       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
322       break;
323     case PROP_PROTOCOLS:
324       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
328   }
329 }
330
331 static void
332 gst_rtsp_stream_set_property (GObject * object, guint propid,
333     const GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
340       break;
341     case PROP_PROFILES:
342       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
343       break;
344     case PROP_PROTOCOLS:
345       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 /**
353  * gst_rtsp_stream_new:
354  * @idx: an index
355  * @pad: a #GstPad
356  * @payloader: a #GstElement
357  *
358  * Create a new media stream with index @idx that handles RTP data on
359  * @pad and has a payloader element @payloader if @pad is a source pad
360  * or a depayloader element @payloader if @pad is a sink pad.
361  *
362  * Returns: (transfer full): a new #GstRTSPStream
363  */
364 GstRTSPStream *
365 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
366 {
367   GstRTSPStreamPrivate *priv;
368   GstRTSPStream *stream;
369
370   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
371   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
372
373   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
374   priv = stream->priv;
375   priv->idx = idx;
376   priv->payloader = gst_object_ref (payloader);
377   if (GST_PAD_IS_SRC (pad))
378     priv->srcpad = gst_object_ref (pad);
379   else
380     priv->sinkpad = gst_object_ref (pad);
381
382   return stream;
383 }
384
385 /**
386  * gst_rtsp_stream_get_index:
387  * @stream: a #GstRTSPStream
388  *
389  * Get the stream index.
390  *
391  * Return: the stream index.
392  */
393 guint
394 gst_rtsp_stream_get_index (GstRTSPStream * stream)
395 {
396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
397
398   return stream->priv->idx;
399 }
400
401 /**
402  * gst_rtsp_stream_get_pt:
403  * @stream: a #GstRTSPStream
404  *
405  * Get the stream payload type.
406  *
407  * Return: the stream payload type.
408  */
409 guint
410 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   guint pt;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
416
417   priv = stream->priv;
418
419   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
420
421   return pt;
422 }
423
424 /**
425  * gst_rtsp_stream_get_srcpad:
426  * @stream: a #GstRTSPStream
427  *
428  * Get the srcpad associated with @stream.
429  *
430  * Returns: (transfer full): the srcpad. Unref after usage.
431  */
432 GstPad *
433 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
434 {
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
436
437   if (!stream->priv->srcpad)
438     return NULL;
439
440   return gst_object_ref (stream->priv->srcpad);
441 }
442
443 /**
444  * gst_rtsp_stream_get_sinkpad:
445  * @stream: a #GstRTSPStream
446  *
447  * Get the sinkpad associated with @stream.
448  *
449  * Returns: (transfer full): the sinkpad. Unref after usage.
450  */
451 GstPad *
452 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
453 {
454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
455
456   if (!stream->priv->sinkpad)
457     return NULL;
458
459   return gst_object_ref (stream->priv->sinkpad);
460 }
461
462 /**
463  * gst_rtsp_stream_get_control:
464  * @stream: a #GstRTSPStream
465  *
466  * Get the control string to identify this stream.
467  *
468  * Returns: (transfer full): the control string. g_free() after usage.
469  */
470 gchar *
471 gst_rtsp_stream_get_control (GstRTSPStream * stream)
472 {
473   GstRTSPStreamPrivate *priv;
474   gchar *result;
475
476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
477
478   priv = stream->priv;
479
480   g_mutex_lock (&priv->lock);
481   if ((result = g_strdup (priv->control)) == NULL)
482     result = g_strdup_printf ("stream=%u", priv->idx);
483   g_mutex_unlock (&priv->lock);
484
485   return result;
486 }
487
488 /**
489  * gst_rtsp_stream_set_control:
490  * @stream: a #GstRTSPStream
491  * @control: a control string
492  *
493  * Set the control string in @stream.
494  */
495 void
496 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   g_mutex_lock (&priv->lock);
505   g_free (priv->control);
506   priv->control = g_strdup (control);
507   g_mutex_unlock (&priv->lock);
508 }
509
510 /**
511  * gst_rtsp_stream_has_control:
512  * @stream: a #GstRTSPStream
513  * @control: a control string
514  *
515  * Check if @stream has the control string @control.
516  *
517  * Returns: %TRUE is @stream has @control as the control string
518  */
519 gboolean
520 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
521 {
522   GstRTSPStreamPrivate *priv;
523   gboolean res;
524
525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
526
527   priv = stream->priv;
528
529   g_mutex_lock (&priv->lock);
530   if (priv->control)
531     res = (g_strcmp0 (priv->control, control) == 0);
532   else {
533     guint streamid;
534
535     if (sscanf (control, "stream=%u", &streamid) > 0)
536       res = (streamid == priv->idx);
537     else
538       res = FALSE;
539   }
540   g_mutex_unlock (&priv->lock);
541
542   return res;
543 }
544
545 /**
546  * gst_rtsp_stream_set_mtu:
547  * @stream: a #GstRTSPStream
548  * @mtu: a new MTU
549  *
550  * Configure the mtu in the payloader of @stream to @mtu.
551  */
552 void
553 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
554 {
555   GstRTSPStreamPrivate *priv;
556
557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
558
559   priv = stream->priv;
560
561   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
562
563   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
564 }
565
566 /**
567  * gst_rtsp_stream_get_mtu:
568  * @stream: a #GstRTSPStream
569  *
570  * Get the configured MTU in the payloader of @stream.
571  *
572  * Returns: the MTU of the payloader.
573  */
574 guint
575 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
576 {
577   GstRTSPStreamPrivate *priv;
578   guint mtu;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
581
582   priv = stream->priv;
583
584   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
585
586   return mtu;
587 }
588
589 /* Update the dscp qos property on the udp sinks */
590 static void
591 update_dscp_qos (GstRTSPStream * stream)
592 {
593   GstRTSPStreamPrivate *priv;
594
595   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
596
597   priv = stream->priv;
598
599   if (priv->udpsink[0]) {
600     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
601         NULL);
602   }
603
604   if (priv->udpsink[1]) {
605     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
606         NULL);
607   }
608 }
609
610 /**
611  * gst_rtsp_stream_set_dscp_qos:
612  * @stream: a #GstRTSPStream
613  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
614  *
615  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
616  */
617 void
618 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
623
624   priv = stream->priv;
625
626   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
627
628   if (dscp_qos < -1 || dscp_qos > 63) {
629     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
630     return;
631   }
632
633   priv->dscp_qos = dscp_qos;
634
635   update_dscp_qos (stream);
636 }
637
638 /**
639  * gst_rtsp_stream_get_dscp_qos:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the configured DSCP QoS in of the outgoing sockets.
643  *
644  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
645  */
646 gint
647 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
652
653   priv = stream->priv;
654
655   return priv->dscp_qos;
656 }
657
658 /**
659  * gst_rtsp_stream_is_transport_supported:
660  * @stream: a #GstRTSPStream
661  * @transport: (transfer none): a #GstRTSPTransport
662  *
663  * Check if @transport can be handled by stream
664  *
665  * Returns: %TRUE if @transport can be handled by @stream.
666  */
667 gboolean
668 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
669     GstRTSPTransport * transport)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   if (transport->trans != GST_RTSP_TRANS_RTP)
679     goto unsupported_transmode;
680
681   if (!(transport->profile & priv->profiles))
682     goto unsupported_profile;
683
684   if (!(transport->lower_transport & priv->protocols))
685     goto unsupported_ltrans;
686
687   g_mutex_unlock (&priv->lock);
688
689   return TRUE;
690
691   /* ERRORS */
692 unsupported_transmode:
693   {
694     GST_DEBUG ("unsupported transport mode %d", transport->trans);
695     g_mutex_unlock (&priv->lock);
696     return FALSE;
697   }
698 unsupported_profile:
699   {
700     GST_DEBUG ("unsupported profile %d", transport->profile);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_ltrans:
705   {
706     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 }
711
712 /**
713  * gst_rtsp_stream_set_profiles:
714  * @stream: a #GstRTSPStream
715  * @profiles: the new profiles
716  *
717  * Configure the allowed profiles for @stream.
718  */
719 void
720 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
721 {
722   GstRTSPStreamPrivate *priv;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   g_mutex_lock (&priv->lock);
729   priv->profiles = profiles;
730   g_mutex_unlock (&priv->lock);
731 }
732
733 /**
734  * gst_rtsp_stream_get_profiles:
735  * @stream: a #GstRTSPStream
736  *
737  * Get the allowed profiles of @stream.
738  *
739  * Returns: a #GstRTSPProfile
740  */
741 GstRTSPProfile
742 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
743 {
744   GstRTSPStreamPrivate *priv;
745   GstRTSPProfile res;
746
747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
748
749   priv = stream->priv;
750
751   g_mutex_lock (&priv->lock);
752   res = priv->profiles;
753   g_mutex_unlock (&priv->lock);
754
755   return res;
756 }
757
758 /**
759  * gst_rtsp_stream_set_protocols:
760  * @stream: a #GstRTSPStream
761  * @protocols: the new flags
762  *
763  * Configure the allowed lower transport for @stream.
764  */
765 void
766 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
767     GstRTSPLowerTrans protocols)
768 {
769   GstRTSPStreamPrivate *priv;
770
771   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
772
773   priv = stream->priv;
774
775   g_mutex_lock (&priv->lock);
776   priv->protocols = protocols;
777   g_mutex_unlock (&priv->lock);
778 }
779
780 /**
781  * gst_rtsp_stream_get_protocols:
782  * @stream: a #GstRTSPStream
783  *
784  * Get the allowed protocols of @stream.
785  *
786  * Returns: a #GstRTSPLowerTrans
787  */
788 GstRTSPLowerTrans
789 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
790 {
791   GstRTSPStreamPrivate *priv;
792   GstRTSPLowerTrans res;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
795       GST_RTSP_LOWER_TRANS_UNKNOWN);
796
797   priv = stream->priv;
798
799   g_mutex_lock (&priv->lock);
800   res = priv->protocols;
801   g_mutex_unlock (&priv->lock);
802
803   return res;
804 }
805
806 /**
807  * gst_rtsp_stream_set_address_pool:
808  * @stream: a #GstRTSPStream
809  * @pool: (transfer none): a #GstRTSPAddressPool
810  *
811  * configure @pool to be used as the address pool of @stream.
812  */
813 void
814 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
815     GstRTSPAddressPool * pool)
816 {
817   GstRTSPStreamPrivate *priv;
818   GstRTSPAddressPool *old;
819
820   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
821
822   priv = stream->priv;
823
824   GST_LOG_OBJECT (stream, "set address pool %p", pool);
825
826   g_mutex_lock (&priv->lock);
827   if ((old = priv->pool) != pool)
828     priv->pool = pool ? g_object_ref (pool) : NULL;
829   else
830     old = NULL;
831   g_mutex_unlock (&priv->lock);
832
833   if (old)
834     g_object_unref (old);
835 }
836
837 /**
838  * gst_rtsp_stream_get_address_pool:
839  * @stream: a #GstRTSPStream
840  *
841  * Get the #GstRTSPAddressPool used as the address pool of @stream.
842  *
843  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
844  * usage.
845  */
846 GstRTSPAddressPool *
847 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
848 {
849   GstRTSPStreamPrivate *priv;
850   GstRTSPAddressPool *result;
851
852   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
853
854   priv = stream->priv;
855
856   g_mutex_lock (&priv->lock);
857   if ((result = priv->pool))
858     g_object_ref (result);
859   g_mutex_unlock (&priv->lock);
860
861   return result;
862 }
863
864 /**
865  * gst_rtsp_stream_get_multicast_address:
866  * @stream: a #GstRTSPStream
867  * @family: the #GSocketFamily
868  *
869  * Get the multicast address of @stream for @family.
870  *
871  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
872  * or %NULL when no address could be allocated. gst_rtsp_address_free()
873  * after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
877     GSocketFamily family)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GstRTSPAddress **addrp;
882   GstRTSPAddressFlags flags;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
885
886   priv = stream->priv;
887
888   if (family == G_SOCKET_FAMILY_IPV6) {
889     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
890     addrp = &priv->addr_v6;
891   } else {
892     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
893     addrp = &priv->addr_v4;
894   }
895
896   g_mutex_lock (&priv->lock);
897   if (*addrp == NULL) {
898     if (priv->pool == NULL)
899       goto no_pool;
900
901     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
902
903     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
904     if (*addrp == NULL)
905       goto no_address;
906   }
907   result = gst_rtsp_address_copy (*addrp);
908   g_mutex_unlock (&priv->lock);
909
910   return result;
911
912   /* ERRORS */
913 no_pool:
914   {
915     GST_ERROR_OBJECT (stream, "no address pool specified");
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 no_address:
920   {
921     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 /**
928  * gst_rtsp_stream_reserve_address:
929  * @stream: a #GstRTSPStream
930  * @address: an address
931  * @port: a port
932  * @n_ports: n_ports
933  * @ttl: a TTL
934  *
935  * Reserve @address and @port as the address and port of @stream.
936  *
937  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
938  * the address could be reserved. gst_rtsp_address_free() after usage.
939  */
940 GstRTSPAddress *
941 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
942     const gchar * address, guint port, guint n_ports, guint ttl)
943 {
944   GstRTSPStreamPrivate *priv;
945   GstRTSPAddress *result;
946   GInetAddress *addr;
947   GSocketFamily family;
948   GstRTSPAddress **addrp;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951   g_return_val_if_fail (address != NULL, NULL);
952   g_return_val_if_fail (port > 0, NULL);
953   g_return_val_if_fail (n_ports > 0, NULL);
954   g_return_val_if_fail (ttl > 0, NULL);
955
956   priv = stream->priv;
957
958   addr = g_inet_address_new_from_string (address);
959   if (!addr) {
960     GST_ERROR ("failed to get inet addr from %s", address);
961     family = G_SOCKET_FAMILY_IPV4;
962   } else {
963     family = g_inet_address_get_family (addr);
964     g_object_unref (addr);
965   }
966
967   if (family == G_SOCKET_FAMILY_IPV6)
968     addrp = &priv->addr_v6;
969   else
970     addrp = &priv->addr_v4;
971
972   g_mutex_lock (&priv->lock);
973   if (*addrp == NULL) {
974     GstRTSPAddressPoolResult res;
975
976     if (priv->pool == NULL)
977       goto no_pool;
978
979     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
980         port, n_ports, ttl, addrp);
981     if (res != GST_RTSP_ADDRESS_POOL_OK)
982       goto no_address;
983   } else {
984     if (strcmp ((*addrp)->address, address) ||
985         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
986         (*addrp)->ttl != ttl)
987       goto different_address;
988   }
989   result = gst_rtsp_address_copy (*addrp);
990   g_mutex_unlock (&priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1004         address);
1005     g_mutex_unlock (&priv->lock);
1006     return NULL;
1007   }
1008 different_address:
1009   {
1010     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1011         " reserved", address);
1012     g_mutex_unlock (&priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 /* must be called with lock */
1018 static void
1019 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1020     GSocket * rtcp_socket, GSocketFamily family)
1021 {
1022   GstRTSPStreamPrivate *priv = stream->priv;
1023   const gchar *multisink_socket;
1024
1025   if (family == G_SOCKET_FAMILY_IPV6)
1026     multisink_socket = "socket-v6";
1027   else
1028     multisink_socket = "socket";
1029
1030   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1031       NULL);
1032   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1033       NULL);
1034 }
1035
1036 /* must be called with lock */
1037 static gboolean
1038 create_and_configure_udpsinks (GstRTSPStream * stream)
1039 {
1040   GstRTSPStreamPrivate *priv = stream->priv;
1041   GstElement *udpsink0, *udpsink1;
1042
1043   udpsink0 = NULL;
1044   udpsink1 = NULL;
1045
1046   if (priv->udpsink[0])
1047     udpsink0 = priv->udpsink[0];
1048   else
1049     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1050
1051   if (!udpsink0)
1052     goto no_udp_protocol;
1053
1054   if (priv->udpsink[1])
1055     udpsink1 = priv->udpsink[1];
1056   else
1057     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1058
1059   if (!udpsink1)
1060     goto no_udp_protocol;
1061
1062   /* configure sinks */
1063
1064   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1065   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1066
1067   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1068   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1069
1070   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1071
1072   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1073   /* Needs to be async for RECORD streams, otherwise we will never go to
1074    * PLAYING because the sinks will wait for data while the udpsrc can't
1075    * provide data with timestamps in PAUSED. */
1076   if (priv->sinkpad)
1077     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1079
1080   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1081   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1082
1083   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1085
1086   /* update the dscp qos field in the sinks */
1087   update_dscp_qos (stream);
1088
1089   priv->udpsink[0] = udpsink0;
1090   priv->udpsink[1] = udpsink1;
1091
1092   return TRUE;
1093
1094   /* ERRORS */
1095 no_udp_protocol:
1096   {
1097     return FALSE;
1098   }
1099 }
1100
1101 /* must be called with lock */
1102 static void
1103 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1104     GSocketFamily family)
1105 {
1106   GstRTSPStreamPrivate *priv;
1107   GstPad *pad, *selpad;
1108   guint i;
1109   GstBin *bin;
1110
1111   priv = stream->priv;
1112   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1113
1114   for (i = 0; i < 2; i++) {
1115     if (priv->sinkpad || i == 1) {
1116       if (priv->srcpad) {
1117         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1118          * values. This is only relevant for PLAY pipelines */
1119         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1120         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1121       }
1122       /* add udpsrc */
1123       gst_bin_add (bin, udpsrc_out[i]);
1124
1125       /* and link to the funnel */
1126       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1127       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1128       gst_pad_link (pad, selpad);
1129       gst_object_unref (pad);
1130       gst_object_unref (selpad);
1131
1132       /* otherwise sync state with parent in case it's running already
1133        * at this point */
1134       if (!priv->srcpad) {
1135         gst_element_sync_state_with_parent (udpsrc_out[i]);
1136       }
1137     }
1138   }
1139
1140   gst_object_unref (bin);
1141 }
1142
1143 /* must be called with lock */
1144 static gboolean
1145 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1146     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1147     const gchar * address, gint rtpport, gint rtcpport,
1148     GstRTSPLowerTrans transport)
1149 {
1150   GstStateChangeReturn ret;
1151
1152   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1153   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1154
1155   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1156     goto error;
1157
1158   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1159     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1160     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1161     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1162     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1163     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1164     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1165   }
1166
1167   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1168   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1169
1170   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1171   if (ret == GST_STATE_CHANGE_FAILURE)
1172     goto error;
1173   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1174   if (ret == GST_STATE_CHANGE_FAILURE)
1175     goto error;
1176
1177   return TRUE;
1178   return TRUE;
1179
1180   /* ERRORS */
1181 error:
1182   {
1183     if (udpsrc_out[0])
1184       gst_object_unref (udpsrc_out[0]);
1185     if (udpsrc_out[1])
1186       gst_object_unref (udpsrc_out[1]);
1187     return FALSE;
1188   }
1189 }
1190
1191 static gboolean
1192 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1193     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1194     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1195     gboolean use_client_settings)
1196 {
1197   GstRTSPStreamPrivate *priv = stream->priv;
1198   GSocket *rtp_socket = NULL;
1199   GSocket *rtcp_socket;
1200   gint tmp_rtp, tmp_rtcp;
1201   guint count;
1202   gint rtpport, rtcpport;
1203   GList *rejected_addresses = NULL;
1204   GstRTSPAddress *addr = NULL;
1205   GInetAddress *inetaddr = NULL;
1206   gchar *addr_str;
1207   GSocketAddress *rtp_sockaddr = NULL;
1208   GSocketAddress *rtcp_sockaddr = NULL;
1209   GstRTSPAddressPool *pool;
1210   GstRTSPLowerTrans transport;
1211
1212   pool = priv->pool;
1213   count = 0;
1214   transport = ct->lower_transport;
1215
1216   /* Start with random port */
1217   tmp_rtp = 0;
1218
1219   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1220       G_SOCKET_PROTOCOL_UDP, NULL);
1221   if (!rtcp_socket)
1222     goto no_udp_protocol;
1223   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1224
1225   if (*server_addr_out)
1226     gst_rtsp_address_free (*server_addr_out);
1227
1228   /* try to allocate 2 UDP ports, the RTP port should be an even
1229    * number and the RTCP port should be the next (uneven) port */
1230 again:
1231
1232   if (rtp_socket == NULL) {
1233     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1234         G_SOCKET_PROTOCOL_UDP, NULL);
1235     if (!rtp_socket)
1236       goto no_udp_protocol;
1237     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1238   }
1239
1240   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1241               gst_rtsp_address_pool_has_unicast_addresses (pool))
1242           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1243     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1244
1245     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1246       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1247     else
1248       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1249
1250     if (addr)
1251       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1252
1253     if (family == G_SOCKET_FAMILY_IPV6)
1254       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1255     else
1256       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1257
1258     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1259         && use_client_settings)
1260       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1261           ct->port.min, 2, ct->ttl, &addr);
1262     else
1263       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1264
1265     if (addr == NULL)
1266       goto no_ports;
1267
1268     tmp_rtp = addr->port;
1269
1270     g_clear_object (&inetaddr);
1271     inetaddr = g_inet_address_new_from_string (addr->address);
1272
1273     /* On Windows it's not possible to bind to a multicast address
1274      * but the OS will make sure to filter out all packets that
1275      * arrive not for the multicast address the socket joined.
1276      *
1277      * On Linux and others it is necessary to bind to a multicast
1278      * address to let the OS filter out all packets that are received
1279      * on the same port but for different addresses than the multicast
1280      * address
1281      */
1282 #ifdef G_OS_WIN32
1283     if (g_inet_address_get_is_multicast (inetaddr)) {
1284       g_object_unref (inetaddr);
1285       inetaddr = g_inet_address_new_any (family);
1286     }
1287 #endif
1288   } else {
1289     if (tmp_rtp != 0) {
1290       tmp_rtp += 2;
1291       if (++count > 20)
1292         goto no_ports;
1293     }
1294
1295     if (inetaddr == NULL)
1296       inetaddr = g_inet_address_new_any (family);
1297   }
1298
1299   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1300   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1301     g_object_unref (rtp_sockaddr);
1302     goto again;
1303   }
1304   g_object_unref (rtp_sockaddr);
1305
1306   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1307   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1308     g_clear_object (&rtp_sockaddr);
1309     goto socket_error;
1310   }
1311
1312   tmp_rtp =
1313       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1314   g_object_unref (rtp_sockaddr);
1315
1316   /* check if port is even */
1317   if ((tmp_rtp & 1) != 0) {
1318     /* port not even, close and allocate another */
1319     tmp_rtp++;
1320     g_clear_object (&rtp_socket);
1321     goto again;
1322   }
1323
1324   /* set port */
1325   tmp_rtcp = tmp_rtp + 1;
1326
1327   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1328   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1329     g_object_unref (rtcp_sockaddr);
1330     g_clear_object (&rtp_socket);
1331     goto again;
1332   }
1333   g_object_unref (rtcp_sockaddr);
1334
1335   if (addr == NULL)
1336     addr_str = g_inet_address_to_string (inetaddr);
1337   else
1338     addr_str = addr->address;
1339   g_clear_object (&inetaddr);
1340
1341   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1342           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, transport)) {
1343     if (addr == NULL)
1344       g_free (addr_str);
1345     goto no_udp_protocol;
1346   }
1347
1348   if (addr == NULL)
1349     g_free (addr_str);
1350
1351   play_udpsources_one_family (stream, udpsrc_out, family);
1352
1353   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1354   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1355
1356   /* this should not happen... */
1357   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1358     goto port_error;
1359
1360   /* set RTP and RTCP sockets */
1361   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1362
1363   server_port_out->min = rtpport;
1364   server_port_out->max = rtcpport;
1365
1366   *server_addr_out = addr;
1367   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1368
1369   g_object_unref (rtp_socket);
1370   g_object_unref (rtcp_socket);
1371
1372   return TRUE;
1373
1374   /* ERRORS */
1375 no_udp_protocol:
1376   {
1377     goto cleanup;
1378   }
1379 no_ports:
1380   {
1381     goto cleanup;
1382   }
1383 port_error:
1384   {
1385     goto cleanup;
1386   }
1387 socket_error:
1388   {
1389     goto cleanup;
1390   }
1391 cleanup:
1392   {
1393     if (inetaddr)
1394       g_object_unref (inetaddr);
1395     g_list_free_full (rejected_addresses,
1396         (GDestroyNotify) gst_rtsp_address_free);
1397     if (addr)
1398       gst_rtsp_address_free (addr);
1399     if (rtp_socket)
1400       g_object_unref (rtp_socket);
1401     if (rtcp_socket)
1402       g_object_unref (rtcp_socket);
1403     return FALSE;
1404   }
1405 }
1406
1407 /**
1408  * gst_rtsp_stream_allocate_udp_sockets:
1409  * @stream: a #GstRTSPStream
1410  * @family: protocol family
1411  * @transport_method: transport method
1412  *
1413  * Allocates RTP and RTCP ports.
1414  *
1415  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1416  */
1417 gboolean
1418 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1419     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1420 {
1421   GstRTSPStreamPrivate *priv;
1422   gboolean result = FALSE;
1423   GstRTSPLowerTrans transport = ct->lower_transport;
1424
1425   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1426   priv = stream->priv;
1427   g_return_val_if_fail (priv->is_joined, FALSE);
1428
1429   g_mutex_lock (&priv->lock);
1430
1431   if (family == G_SOCKET_FAMILY_IPV4) {
1432     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1433       if (priv->have_ipv4_mcast)
1434         goto done;
1435       priv->have_ipv4_mcast =
1436           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1437           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1438           use_client_settings);
1439     } else {
1440       priv->have_ipv4 =
1441           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1442           &priv->server_port_v4, ct, &priv->server_addr_v4,
1443           use_client_settings);
1444     }
1445   } else {
1446     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1447       if (priv->have_ipv6_mcast)
1448         goto done;
1449       priv->have_ipv6_mcast =
1450           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1451           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1452           use_client_settings);
1453     } else {
1454       if (priv->have_ipv6)
1455         goto done;
1456       priv->have_ipv6 =
1457           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1458           &priv->server_port_v6, ct, &priv->server_addr_v6,
1459           use_client_settings);
1460     }
1461   }
1462
1463 done:
1464   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1465       priv->have_ipv6_mcast;
1466
1467   g_mutex_unlock (&priv->lock);
1468
1469   return result;
1470 }
1471
1472 /**
1473  * gst_rtsp_stream_set_client_side:
1474  * @stream: a #GstRTSPStream
1475  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1476  * an RTSP connection.
1477  *
1478  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1479  * streams to an RTSP server via RECORD. This has the practical effect
1480  * of changing which UDP port numbers are used when setting up the local
1481  * side of the stream sending to be either the 'server' or 'client' pair
1482  * of a configured UDP transport.
1483  */
1484 void
1485 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1486 {
1487   GstRTSPStreamPrivate *priv;
1488
1489   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1490   priv = stream->priv;
1491   g_mutex_lock (&priv->lock);
1492   priv->client_side = client_side;
1493   g_mutex_unlock (&priv->lock);
1494 }
1495
1496 /**
1497  * gst_rtsp_stream_is_client_side:
1498  * @stream: a #GstRTSPStream
1499  *
1500  * See gst_rtsp_stream_set_client_side()
1501  *
1502  * Returns: TRUE if this #GstRTSPStream is client-side.
1503  */
1504 gboolean
1505 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1506 {
1507   GstRTSPStreamPrivate *priv;
1508   gboolean ret;
1509
1510   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1511
1512   priv = stream->priv;
1513   g_mutex_lock (&priv->lock);
1514   ret = priv->client_side;
1515   g_mutex_unlock (&priv->lock);
1516
1517   return ret;
1518 }
1519
1520 /**
1521  * gst_rtsp_stream_get_server_port:
1522  * @stream: a #GstRTSPStream
1523  * @server_port: (out): result server port
1524  * @family: the port family to get
1525  *
1526  * Fill @server_port with the port pair used by the server. This function can
1527  * only be called when @stream has been joined.
1528  */
1529 void
1530 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1531     GstRTSPRange * server_port, GSocketFamily family)
1532 {
1533   GstRTSPStreamPrivate *priv;
1534
1535   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1536   priv = stream->priv;
1537   g_return_if_fail (priv->is_joined);
1538
1539   g_mutex_lock (&priv->lock);
1540   if (family == G_SOCKET_FAMILY_IPV4) {
1541     if (server_port)
1542       *server_port = priv->server_port_v4;
1543   } else {
1544     if (server_port)
1545       *server_port = priv->server_port_v6;
1546   }
1547   g_mutex_unlock (&priv->lock);
1548 }
1549
1550 /**
1551  * gst_rtsp_stream_get_rtpsession:
1552  * @stream: a #GstRTSPStream
1553  *
1554  * Get the RTP session of this stream.
1555  *
1556  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1557  */
1558 GObject *
1559 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1560 {
1561   GstRTSPStreamPrivate *priv;
1562   GObject *session;
1563
1564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1565
1566   priv = stream->priv;
1567
1568   g_mutex_lock (&priv->lock);
1569   if ((session = priv->session))
1570     g_object_ref (session);
1571   g_mutex_unlock (&priv->lock);
1572
1573   return session;
1574 }
1575
1576 /**
1577  * gst_rtsp_stream_get_ssrc:
1578  * @stream: a #GstRTSPStream
1579  * @ssrc: (out): result ssrc
1580  *
1581  * Get the SSRC used by the RTP session of this stream. This function can only
1582  * be called when @stream has been joined.
1583  */
1584 void
1585 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1586 {
1587   GstRTSPStreamPrivate *priv;
1588
1589   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1590   priv = stream->priv;
1591   g_return_if_fail (priv->is_joined);
1592
1593   g_mutex_lock (&priv->lock);
1594   if (ssrc && priv->session)
1595     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1596   g_mutex_unlock (&priv->lock);
1597 }
1598
1599 /**
1600  * gst_rtsp_stream_set_retransmission_time:
1601  * @stream: a #GstRTSPStream
1602  * @time: a #GstClockTime
1603  *
1604  * Set the amount of time to store retransmission packets.
1605  */
1606 void
1607 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1608     GstClockTime time)
1609 {
1610   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1611
1612   g_mutex_lock (&stream->priv->lock);
1613   stream->priv->rtx_time = time;
1614   if (stream->priv->rtxsend)
1615     g_object_set (stream->priv->rtxsend, "max-size-time",
1616         GST_TIME_AS_MSECONDS (time), NULL);
1617   g_mutex_unlock (&stream->priv->lock);
1618 }
1619
1620 /**
1621  * gst_rtsp_stream_get_retransmission_time:
1622  * @stream: a #GstRTSPStream
1623  *
1624  * Get the amount of time to store retransmission data.
1625  *
1626  * Returns: the amount of time to store retransmission data.
1627  */
1628 GstClockTime
1629 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1630 {
1631   GstClockTime ret;
1632
1633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1634
1635   g_mutex_lock (&stream->priv->lock);
1636   ret = stream->priv->rtx_time;
1637   g_mutex_unlock (&stream->priv->lock);
1638
1639   return ret;
1640 }
1641
1642 /**
1643  * gst_rtsp_stream_set_retransmission_pt:
1644  * @stream: a #GstRTSPStream
1645  * @rtx_pt: a #guint
1646  *
1647  * Set the payload type (pt) for retransmission of this stream.
1648  */
1649 void
1650 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1651 {
1652   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1653
1654   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1655
1656   g_mutex_lock (&stream->priv->lock);
1657   stream->priv->rtx_pt = rtx_pt;
1658   if (stream->priv->rtxsend) {
1659     guint pt = gst_rtsp_stream_get_pt (stream);
1660     gchar *pt_s = g_strdup_printf ("%d", pt);
1661     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1662         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1663     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1664     g_free (pt_s);
1665     gst_structure_free (rtx_pt_map);
1666   }
1667   g_mutex_unlock (&stream->priv->lock);
1668 }
1669
1670 /**
1671  * gst_rtsp_stream_get_retransmission_pt:
1672  * @stream: a #GstRTSPStream
1673  *
1674  * Get the payload-type used for retransmission of this stream
1675  *
1676  * Returns: The retransmission PT.
1677  */
1678 guint
1679 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1680 {
1681   guint rtx_pt;
1682
1683   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1684
1685   g_mutex_lock (&stream->priv->lock);
1686   rtx_pt = stream->priv->rtx_pt;
1687   g_mutex_unlock (&stream->priv->lock);
1688
1689   return rtx_pt;
1690 }
1691
1692 /**
1693  * gst_rtsp_stream_set_buffer_size:
1694  * @stream: a #GstRTSPStream
1695  * @size: the buffer size
1696  *
1697  * Set the size of the UDP transmission buffer (in bytes)
1698  * Needs to be set before the stream is joined to a bin.
1699  *
1700  * Since: 1.6
1701  */
1702 void
1703 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1704 {
1705   g_mutex_lock (&stream->priv->lock);
1706   stream->priv->buffer_size = size;
1707   g_mutex_unlock (&stream->priv->lock);
1708 }
1709
1710 /**
1711  * gst_rtsp_stream_get_buffer_size:
1712  * @stream: a #GstRTSPStream
1713  *
1714  * Get the size of the UDP transmission buffer (in bytes)
1715  *
1716  * Returns: the size of the UDP TX buffer
1717  *
1718  * Since: 1.6
1719  */
1720 guint
1721 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1722 {
1723   guint buffer_size;
1724
1725   g_mutex_lock (&stream->priv->lock);
1726   buffer_size = stream->priv->buffer_size;
1727   g_mutex_unlock (&stream->priv->lock);
1728
1729   return buffer_size;
1730 }
1731
1732 /* executed from streaming thread */
1733 static void
1734 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1735 {
1736   GstRTSPStreamPrivate *priv = stream->priv;
1737   GstCaps *newcaps, *oldcaps;
1738
1739   newcaps = gst_pad_get_current_caps (pad);
1740
1741   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1742       newcaps);
1743
1744   g_mutex_lock (&priv->lock);
1745   oldcaps = priv->caps;
1746   priv->caps = newcaps;
1747   g_mutex_unlock (&priv->lock);
1748
1749   if (oldcaps)
1750     gst_caps_unref (oldcaps);
1751 }
1752
1753 static void
1754 dump_structure (const GstStructure * s)
1755 {
1756   gchar *sstr;
1757
1758   sstr = gst_structure_to_string (s);
1759   GST_INFO ("structure: %s", sstr);
1760   g_free (sstr);
1761 }
1762
1763 static GstRTSPStreamTransport *
1764 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1765 {
1766   GstRTSPStreamPrivate *priv = stream->priv;
1767   GList *walk;
1768   GstRTSPStreamTransport *result = NULL;
1769   const gchar *tmp;
1770   gchar *dest;
1771   guint port;
1772
1773   if (rtcp_from == NULL)
1774     return NULL;
1775
1776   tmp = g_strrstr (rtcp_from, ":");
1777   if (tmp == NULL)
1778     return NULL;
1779
1780   port = atoi (tmp + 1);
1781   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1782
1783   g_mutex_lock (&priv->lock);
1784   GST_INFO ("finding %s:%d in %d transports", dest, port,
1785       g_list_length (priv->transports));
1786
1787   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1788     GstRTSPStreamTransport *trans = walk->data;
1789     const GstRTSPTransport *tr;
1790     gint min, max;
1791
1792     tr = gst_rtsp_stream_transport_get_transport (trans);
1793
1794     if (priv->client_side) {
1795       /* In client side mode the 'destination' is the RTSP server, so send
1796        * to those ports */
1797       min = tr->server_port.min;
1798       max = tr->server_port.max;
1799     } else {
1800       min = tr->client_port.min;
1801       max = tr->client_port.max;
1802     }
1803
1804     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1805       result = trans;
1806       break;
1807     }
1808   }
1809   if (result)
1810     g_object_ref (result);
1811   g_mutex_unlock (&priv->lock);
1812
1813   g_free (dest);
1814
1815   return result;
1816 }
1817
1818 static GstRTSPStreamTransport *
1819 check_transport (GObject * source, GstRTSPStream * stream)
1820 {
1821   GstStructure *stats;
1822   GstRTSPStreamTransport *trans;
1823
1824   /* see if we have a stream to match with the origin of the RTCP packet */
1825   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1826   if (trans == NULL) {
1827     g_object_get (source, "stats", &stats, NULL);
1828     if (stats) {
1829       const gchar *rtcp_from;
1830
1831       dump_structure (stats);
1832
1833       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1834       if ((trans = find_transport (stream, rtcp_from))) {
1835         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1836             source);
1837         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1838             g_object_unref);
1839       }
1840       gst_structure_free (stats);
1841     }
1842   }
1843   return trans;
1844 }
1845
1846
1847 static void
1848 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1849 {
1850   GstRTSPStreamTransport *trans;
1851
1852   GST_INFO ("%p: new source %p", stream, source);
1853
1854   trans = check_transport (source, stream);
1855
1856   if (trans)
1857     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1858 }
1859
1860 static void
1861 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1862 {
1863   GST_INFO ("%p: new SDES %p", stream, source);
1864 }
1865
1866 static void
1867 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1868 {
1869   GstRTSPStreamTransport *trans;
1870
1871   trans = check_transport (source, stream);
1872
1873   if (trans) {
1874     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1875     gst_rtsp_stream_transport_keep_alive (trans);
1876   }
1877 #ifdef DUMP_STATS
1878   {
1879     GstStructure *stats;
1880     g_object_get (source, "stats", &stats, NULL);
1881     if (stats) {
1882       dump_structure (stats);
1883       gst_structure_free (stats);
1884     }
1885   }
1886 #endif
1887 }
1888
1889 static void
1890 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1891 {
1892   GST_INFO ("%p: source %p bye", stream, source);
1893 }
1894
1895 static void
1896 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1897 {
1898   GstRTSPStreamTransport *trans;
1899
1900   GST_INFO ("%p: source %p bye timeout", stream, source);
1901
1902   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1903     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1904     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1905   }
1906 }
1907
1908 static void
1909 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1910 {
1911   GstRTSPStreamTransport *trans;
1912
1913   GST_INFO ("%p: source %p timeout", stream, source);
1914
1915   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1916     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1917     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1918   }
1919 }
1920
1921 static void
1922 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1923 {
1924   GST_INFO ("%p: new sender source %p", stream, source);
1925 #ifndef DUMP_STATS
1926   {
1927     GstStructure *stats;
1928     g_object_get (source, "stats", &stats, NULL);
1929     if (stats) {
1930       dump_structure (stats);
1931       gst_structure_free (stats);
1932     }
1933   }
1934 #endif
1935 }
1936
1937 static void
1938 on_sender_ssrc_active (GObject * session, GObject * source,
1939     GstRTSPStream * stream)
1940 {
1941 #ifndef DUMP_STATS
1942   {
1943     GstStructure *stats;
1944     g_object_get (source, "stats", &stats, NULL);
1945     if (stats) {
1946       dump_structure (stats);
1947       gst_structure_free (stats);
1948     }
1949   }
1950 #endif
1951 }
1952
1953 static void
1954 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1955 {
1956   if (is_rtp) {
1957     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1958     g_list_free (priv->tr_cache_rtp);
1959     priv->tr_cache_rtp = NULL;
1960   } else {
1961     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1962     g_list_free (priv->tr_cache_rtcp);
1963     priv->tr_cache_rtcp = NULL;
1964   }
1965 }
1966
1967 static GstFlowReturn
1968 handle_new_sample (GstAppSink * sink, gpointer user_data)
1969 {
1970   GstRTSPStreamPrivate *priv;
1971   GList *walk;
1972   GstSample *sample;
1973   GstBuffer *buffer;
1974   GstRTSPStream *stream;
1975   gboolean is_rtp;
1976
1977   sample = gst_app_sink_pull_sample (sink);
1978   if (!sample)
1979     return GST_FLOW_OK;
1980
1981   stream = (GstRTSPStream *) user_data;
1982   priv = stream->priv;
1983   buffer = gst_sample_get_buffer (sample);
1984
1985   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1986
1987   g_mutex_lock (&priv->lock);
1988   if (is_rtp) {
1989     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1990       clear_tr_cache (priv, is_rtp);
1991       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1992         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1993         priv->tr_cache_rtp =
1994             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1995       }
1996       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1997     }
1998   } else {
1999     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2000       clear_tr_cache (priv, is_rtp);
2001       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2002         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2003         priv->tr_cache_rtcp =
2004             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2005       }
2006       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2007     }
2008   }
2009   g_mutex_unlock (&priv->lock);
2010
2011   if (is_rtp) {
2012     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2013       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2014       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2015     }
2016   } else {
2017     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2018       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2019       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2020     }
2021   }
2022   gst_sample_unref (sample);
2023
2024   return GST_FLOW_OK;
2025 }
2026
2027 static GstAppSinkCallbacks sink_cb = {
2028   NULL,                         /* not interested in EOS */
2029   NULL,                         /* not interested in preroll samples */
2030   handle_new_sample,
2031 };
2032
2033 static GstElement *
2034 get_rtp_encoder (GstRTSPStream * stream, guint session)
2035 {
2036   GstRTSPStreamPrivate *priv = stream->priv;
2037
2038   if (priv->srtpenc == NULL) {
2039     gchar *name;
2040
2041     name = g_strdup_printf ("srtpenc_%u", session);
2042     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2043     g_free (name);
2044
2045     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2046   }
2047   return gst_object_ref (priv->srtpenc);
2048 }
2049
2050 static GstElement *
2051 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2052 {
2053   GstRTSPStreamPrivate *priv = stream->priv;
2054   GstElement *oldenc, *enc;
2055   GstPad *pad;
2056   gchar *name;
2057
2058   if (priv->idx != session)
2059     return NULL;
2060
2061   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2062
2063   oldenc = priv->srtpenc;
2064   enc = get_rtp_encoder (stream, session);
2065   name = g_strdup_printf ("rtp_sink_%d", session);
2066   pad = gst_element_get_request_pad (enc, name);
2067   g_free (name);
2068   gst_object_unref (pad);
2069
2070   if (oldenc == NULL)
2071     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2072         enc);
2073
2074   return enc;
2075 }
2076
2077 static GstElement *
2078 request_rtcp_encoder (GstElement * rtpbin, guint session,
2079     GstRTSPStream * stream)
2080 {
2081   GstRTSPStreamPrivate *priv = stream->priv;
2082   GstElement *oldenc, *enc;
2083   GstPad *pad;
2084   gchar *name;
2085
2086   if (priv->idx != session)
2087     return NULL;
2088
2089   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2090
2091   oldenc = priv->srtpenc;
2092   enc = get_rtp_encoder (stream, session);
2093   name = g_strdup_printf ("rtcp_sink_%d", session);
2094   pad = gst_element_get_request_pad (enc, name);
2095   g_free (name);
2096   gst_object_unref (pad);
2097
2098   if (oldenc == NULL)
2099     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2100         enc);
2101
2102   return enc;
2103 }
2104
2105 static GstCaps *
2106 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2107 {
2108   GstRTSPStreamPrivate *priv = stream->priv;
2109   GstCaps *caps;
2110
2111   GST_DEBUG ("request key %08x", ssrc);
2112
2113   g_mutex_lock (&priv->lock);
2114   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2115     gst_caps_ref (caps);
2116   g_mutex_unlock (&priv->lock);
2117
2118   return caps;
2119 }
2120
2121 static GstElement *
2122 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2123     GstRTSPStream * stream)
2124 {
2125   GstRTSPStreamPrivate *priv = stream->priv;
2126
2127   if (priv->idx != session)
2128     return NULL;
2129
2130   if (priv->srtpdec == NULL) {
2131     gchar *name;
2132
2133     name = g_strdup_printf ("srtpdec_%u", session);
2134     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2135     g_free (name);
2136
2137     g_signal_connect (priv->srtpdec, "request-key",
2138         (GCallback) request_key, stream);
2139   }
2140   return gst_object_ref (priv->srtpdec);
2141 }
2142
2143 /**
2144  * gst_rtsp_stream_request_aux_sender:
2145  * @stream: a #GstRTSPStream
2146  * @sessid: the session id
2147  *
2148  * Creating a rtxsend bin
2149  *
2150  * Returns: (transfer full): a #GstElement.
2151  *
2152  * Since: 1.6
2153  */
2154 GstElement *
2155 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2156 {
2157   GstElement *bin;
2158   GstPad *pad;
2159   GstStructure *pt_map;
2160   gchar *name;
2161   guint pt, rtx_pt;
2162   gchar *pt_s;
2163
2164   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2165
2166   pt = gst_rtsp_stream_get_pt (stream);
2167   pt_s = g_strdup_printf ("%u", pt);
2168   rtx_pt = stream->priv->rtx_pt;
2169
2170   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2171
2172   bin = gst_bin_new (NULL);
2173   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2174   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2175       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2176   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2177       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2178   g_free (pt_s);
2179   gst_structure_free (pt_map);
2180   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2181
2182   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2183   name = g_strdup_printf ("src_%u", sessid);
2184   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2185   g_free (name);
2186   gst_object_unref (pad);
2187
2188   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2189   name = g_strdup_printf ("sink_%u", sessid);
2190   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2191   g_free (name);
2192   gst_object_unref (pad);
2193
2194   return bin;
2195 }
2196
2197 /**
2198  * gst_rtsp_stream_set_pt_map:
2199  * @stream: a #GstRTSPStream
2200  * @pt: the pt
2201  * @caps: a #GstCaps
2202  *
2203  * Configure a pt map between @pt and @caps.
2204  */
2205 void
2206 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2207 {
2208   GstRTSPStreamPrivate *priv = stream->priv;
2209
2210   g_mutex_lock (&priv->lock);
2211   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2212   g_mutex_unlock (&priv->lock);
2213 }
2214
2215 static GstCaps *
2216 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2217     GstRTSPStream * stream)
2218 {
2219   GstRTSPStreamPrivate *priv = stream->priv;
2220   GstCaps *caps = NULL;
2221
2222   g_mutex_lock (&priv->lock);
2223
2224   if (priv->idx == session) {
2225     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2226     if (caps) {
2227       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2228       gst_caps_ref (caps);
2229     } else {
2230       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2231     }
2232   }
2233
2234   g_mutex_unlock (&priv->lock);
2235
2236   return caps;
2237 }
2238
2239 static void
2240 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2241 {
2242   GstRTSPStreamPrivate *priv = stream->priv;
2243   gchar *name;
2244   GstPadLinkReturn ret;
2245   guint sessid;
2246
2247   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2248       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2249
2250   name = gst_pad_get_name (pad);
2251   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2252     g_free (name);
2253     return;
2254   }
2255   g_free (name);
2256
2257   if (priv->idx != sessid)
2258     return;
2259
2260   if (gst_pad_is_linked (priv->sinkpad)) {
2261     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2262         GST_DEBUG_PAD_NAME (priv->sinkpad));
2263     return;
2264   }
2265
2266   /* link the RTP pad to the session manager, it should not really fail unless
2267    * this is not really an RTP pad */
2268   ret = gst_pad_link (pad, priv->sinkpad);
2269   if (ret != GST_PAD_LINK_OK)
2270     goto link_failed;
2271   priv->recv_rtp_src = gst_object_ref (pad);
2272
2273   return;
2274
2275 /* ERRORS */
2276 link_failed:
2277   {
2278     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2279         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2280   }
2281 }
2282
2283 static void
2284 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2285     GstRTSPStream * stream)
2286 {
2287   /* TODO: What to do here other than this? */
2288   GST_DEBUG ("Stream %p: Got EOS", stream);
2289   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2290 }
2291
2292 /* must be called with lock */
2293 static gboolean
2294 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2295 {
2296   GstRTSPStreamPrivate *priv;
2297   GstPad *pad, *sinkpad = NULL;
2298   gboolean is_tcp = FALSE, is_udp = FALSE;
2299   gint i;
2300
2301   priv = stream->priv;
2302
2303   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2304   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2305       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2306
2307   if (is_udp && !create_and_configure_udpsinks (stream))
2308     goto no_udp_protocol;
2309
2310   for (i = 0; i < 2; i++) {
2311     GstPad *teepad, *queuepad;
2312     /* For the sender we create this bit of pipeline for both
2313      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2314      * we need to add a queue before appsink and udpsink to make
2315      * the pipeline not block. For the TCP case, we want to pump
2316      * client as fast as possible anyway. This pipeline is used
2317      * when both TCP and UDP are present.
2318      *
2319      * .--------.      .-----.    .---------.    .---------.
2320      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2321      * |       send->sink   src->sink      src->sink       |
2322      * '--------'      |     |    '---------'    '---------'
2323      *                 |     |    .---------.    .---------.
2324      *                 |     |    |  queue  |    | appsink |
2325      *                 |    src->sink      src->sink       |
2326      *                 '-----'    '---------'    '---------'
2327      *
2328      * When only UDP or only TCP is allowed, we skip the tee and queue
2329      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2330      * the session.
2331      */
2332     /* Only link the RTP send src if we're going to send RTP, link
2333      * the RTCP send src always */
2334     if (priv->srcpad || i == 1) {
2335       if (is_udp) {
2336         /* add udpsink */
2337         gst_bin_add (bin, priv->udpsink[i]);
2338         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2339       }
2340
2341       if (is_tcp) {
2342         /* make appsink */
2343         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2344         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2345         gst_bin_add (bin, priv->appsink[i]);
2346         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2347             &sink_cb, stream, NULL);
2348       }
2349
2350       if (is_udp && is_tcp) {
2351         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2352
2353         /* make tee for RTP/RTCP */
2354         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2355         gst_bin_add (bin, priv->tee[i]);
2356
2357         /* and link to rtpbin send pad */
2358         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2359         gst_pad_link (priv->send_src[i], pad);
2360         gst_object_unref (pad);
2361
2362         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2363         g_object_set (priv->udpqueue[i], "max-size-buffers",
2364             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2365             NULL);
2366         gst_bin_add (bin, priv->udpqueue[i]);
2367         /* link tee to udpqueue */
2368         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2369         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2370         gst_pad_link (teepad, pad);
2371         gst_object_unref (pad);
2372         gst_object_unref (teepad);
2373
2374         /* link udpqueue to udpsink */
2375         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2376         gst_pad_link (queuepad, sinkpad);
2377         gst_object_unref (queuepad);
2378         gst_object_unref (sinkpad);
2379
2380         /* make appqueue */
2381         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2382         g_object_set (priv->appqueue[i], "max-size-buffers",
2383             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2384             NULL);
2385         gst_bin_add (bin, priv->appqueue[i]);
2386         /* and link tee to appqueue */
2387         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2388         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2389         gst_pad_link (teepad, pad);
2390         gst_object_unref (pad);
2391         gst_object_unref (teepad);
2392
2393         /* and link appqueue to appsink */
2394         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2395         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2396         gst_pad_link (queuepad, pad);
2397         gst_object_unref (pad);
2398         gst_object_unref (queuepad);
2399       } else if (is_tcp) {
2400         /* only appsink needed, link it to the session */
2401         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2402         gst_pad_link (priv->send_src[i], pad);
2403         gst_object_unref (pad);
2404
2405         /* when its only TCP, we need to set sync and preroll to FALSE
2406          * for the sink to avoid deadlock. And this is only needed for
2407          * sink used for RTCP data, not the RTP data. */
2408         if (i == 1)
2409           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2410       } else {
2411         /* else only udpsink needed, link it to the session */
2412         gst_pad_link (priv->send_src[i], sinkpad);
2413         gst_object_unref (sinkpad);
2414       }
2415     }
2416
2417     /* check if we need to set to a special state */
2418     if (state != GST_STATE_NULL) {
2419       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2420         gst_element_set_state (priv->udpsink[i], state);
2421       if (priv->appsink[i] && (priv->srcpad || i == 1))
2422         gst_element_set_state (priv->appsink[i], state);
2423       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2424         gst_element_set_state (priv->appqueue[i], state);
2425       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2426         gst_element_set_state (priv->udpqueue[i], state);
2427       if (priv->tee[i] && (priv->srcpad || i == 1))
2428         gst_element_set_state (priv->tee[i], state);
2429     }
2430   }
2431
2432   return TRUE;
2433
2434   /* ERRORS */
2435 no_udp_protocol:
2436   {
2437     return FALSE;
2438   }
2439 }
2440
2441 /* must be called with lock */
2442 static void
2443 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2444 {
2445   GstRTSPStreamPrivate *priv;
2446   GstPad *pad, *selpad;
2447   gboolean is_tcp;
2448   gint i;
2449
2450   priv = stream->priv;
2451
2452   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2453
2454   for (i = 0; i < 2; i++) {
2455     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2456      * RTCP sink always */
2457     if (priv->sinkpad || i == 1) {
2458       /* For the receiver we create this bit of pipeline for both
2459        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2460        * and it is all funneled into the rtpbin receive pad.
2461        *
2462        * .--------.     .--------.    .--------.
2463        * | udpsrc |     | funnel |    | rtpbin |
2464        * |       src->sink      src->sink      |
2465        * '--------'     |        |    '--------'
2466        * .--------.     |        |
2467        * | appsrc |     |        |
2468        * |       src->sink       |
2469        * '--------'     '--------'
2470        */
2471       /* make funnel for the RTP/RTCP receivers */
2472       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2473       gst_bin_add (bin, priv->funnel[i]);
2474
2475       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2476       gst_pad_link (pad, priv->recv_sink[i]);
2477       gst_object_unref (pad);
2478
2479       if (priv->udpsrc_v4[i]) {
2480         if (priv->srcpad) {
2481           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2482            * values. This is only relevant for PLAY pipelines */
2483           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2484           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2485         }
2486         /* add udpsrc */
2487         gst_bin_add (bin, priv->udpsrc_v4[i]);
2488
2489         /* and link to the funnel v4 */
2490         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2491         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2492         gst_pad_link (pad, selpad);
2493         gst_object_unref (pad);
2494         gst_object_unref (selpad);
2495       }
2496
2497       if (priv->udpsrc_v6[i]) {
2498         if (priv->srcpad) {
2499           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2500           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2501         }
2502         gst_bin_add (bin, priv->udpsrc_v6[i]);
2503
2504         /* and link to the funnel v6 */
2505         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2506         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2507         gst_pad_link (pad, selpad);
2508         gst_object_unref (pad);
2509         gst_object_unref (selpad);
2510       }
2511
2512       if (is_tcp) {
2513         /* make and add appsrc */
2514         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2515         priv->appsrc_base_time[i] = -1;
2516         if (priv->srcpad) {
2517           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2518           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2519         }
2520         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2521             TRUE, NULL);
2522         gst_bin_add (bin, priv->appsrc[i]);
2523         /* and link to the funnel */
2524         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2525         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2526         gst_pad_link (pad, selpad);
2527         gst_object_unref (pad);
2528         gst_object_unref (selpad);
2529       }
2530     }
2531
2532     /* check if we need to set to a special state */
2533     if (state != GST_STATE_NULL) {
2534       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2535         gst_element_set_state (priv->funnel[i], state);
2536     }
2537   }
2538 }
2539
2540 /**
2541  * gst_rtsp_stream_join_bin:
2542  * @stream: a #GstRTSPStream
2543  * @bin: (transfer none): a #GstBin to join
2544  * @rtpbin: (transfer none): a rtpbin element in @bin
2545  * @state: the target state of the new elements
2546  *
2547  * Join the #GstBin @bin that contains the element @rtpbin.
2548  *
2549  * @stream will link to @rtpbin, which must be inside @bin. The elements
2550  * added to @bin will be set to the state given in @state.
2551  *
2552  * Returns: %TRUE on success.
2553  */
2554 gboolean
2555 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2556     GstElement * rtpbin, GstState state)
2557 {
2558   GstRTSPStreamPrivate *priv;
2559   guint idx;
2560   gchar *name;
2561   GstPadLinkReturn ret;
2562
2563   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2564   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2565   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2566
2567   priv = stream->priv;
2568
2569   g_mutex_lock (&priv->lock);
2570   if (priv->is_joined)
2571     goto was_joined;
2572
2573   /* create a session with the same index as the stream */
2574   idx = priv->idx;
2575
2576   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2577
2578   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2579       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2580     /* For SRTP */
2581     g_signal_connect (rtpbin, "request-rtp-encoder",
2582         (GCallback) request_rtp_encoder, stream);
2583     g_signal_connect (rtpbin, "request-rtcp-encoder",
2584         (GCallback) request_rtcp_encoder, stream);
2585     g_signal_connect (rtpbin, "request-rtp-decoder",
2586         (GCallback) request_rtp_rtcp_decoder, stream);
2587     g_signal_connect (rtpbin, "request-rtcp-decoder",
2588         (GCallback) request_rtp_rtcp_decoder, stream);
2589   }
2590
2591   if (priv->sinkpad) {
2592     g_signal_connect (rtpbin, "request-pt-map",
2593         (GCallback) request_pt_map, stream);
2594   }
2595
2596   /* get pads from the RTP session element for sending and receiving
2597    * RTP/RTCP*/
2598   if (priv->srcpad) {
2599     /* get a pad for sending RTP */
2600     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2601     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2602     g_free (name);
2603
2604     /* link the RTP pad to the session manager, it should not really fail unless
2605      * this is not really an RTP pad */
2606     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2607     if (ret != GST_PAD_LINK_OK)
2608       goto link_failed;
2609
2610     name = g_strdup_printf ("send_rtp_src_%u", idx);
2611     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2612     g_free (name);
2613   } else {
2614     /* Need to connect our sinkpad from here */
2615     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2616     /* EOS */
2617     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2618
2619     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2620     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2621     g_free (name);
2622   }
2623
2624   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2625   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2626   g_free (name);
2627   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2628   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2629   g_free (name);
2630
2631   /* get the session */
2632   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2633
2634   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2635       stream);
2636   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2637       stream);
2638   g_signal_connect (priv->session, "on-ssrc-active",
2639       (GCallback) on_ssrc_active, stream);
2640   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2641       stream);
2642   g_signal_connect (priv->session, "on-bye-timeout",
2643       (GCallback) on_bye_timeout, stream);
2644   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2645       stream);
2646
2647   /* signal for sender ssrc */
2648   g_signal_connect (priv->session, "on-new-sender-ssrc",
2649       (GCallback) on_new_sender_ssrc, stream);
2650   g_signal_connect (priv->session, "on-sender-ssrc-active",
2651       (GCallback) on_sender_ssrc_active, stream);
2652
2653   if (!create_sender_part (stream, bin, state))
2654     goto no_udp_protocol;
2655
2656   create_receiver_part (stream, bin, state);
2657
2658   if (priv->srcpad) {
2659     /* be notified of caps changes */
2660     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2661         (GCallback) caps_notify, stream);
2662   }
2663
2664   priv->is_joined = TRUE;
2665   g_mutex_unlock (&priv->lock);
2666
2667   return TRUE;
2668
2669   /* ERRORS */
2670 was_joined:
2671   {
2672     g_mutex_unlock (&priv->lock);
2673     return TRUE;
2674   }
2675 link_failed:
2676   {
2677     GST_WARNING ("failed to link stream %u", idx);
2678     gst_object_unref (priv->send_rtp_sink);
2679     priv->send_rtp_sink = NULL;
2680     g_mutex_unlock (&priv->lock);
2681     return FALSE;
2682   }
2683 no_udp_protocol:
2684   {
2685     GST_WARNING ("failed to allocate ports %u", idx);
2686     gst_object_unref (priv->send_rtp_sink);
2687     priv->send_rtp_sink = NULL;
2688     gst_object_unref (priv->send_src[0]);
2689     priv->send_src[0] = NULL;
2690     gst_object_unref (priv->send_src[1]);
2691     priv->send_src[1] = NULL;
2692     gst_object_unref (priv->recv_sink[0]);
2693     priv->recv_sink[0] = NULL;
2694     gst_object_unref (priv->recv_sink[1]);
2695     priv->recv_sink[1] = NULL;
2696     if (priv->udpsink[0])
2697       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2698     if (priv->udpsink[1])
2699       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2700     if (priv->udpsrc_v4[0]) {
2701       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2702       gst_object_unref (priv->udpsrc_v4[0]);
2703       priv->udpsrc_v4[0] = NULL;
2704     }
2705     if (priv->udpsrc_v4[1]) {
2706       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2707       gst_object_unref (priv->udpsrc_v4[1]);
2708       priv->udpsrc_v4[1] = NULL;
2709     }
2710     if (priv->udpsrc_mcast_v4[0]) {
2711       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2712       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2713       priv->udpsrc_mcast_v4[0] = NULL;
2714     }
2715     if (priv->udpsrc_mcast_v4[1]) {
2716       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2717       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2718       priv->udpsrc_mcast_v4[1] = NULL;
2719     }
2720     if (priv->udpsrc_v6[0]) {
2721       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2722       gst_object_unref (priv->udpsrc_v6[0]);
2723       priv->udpsrc_v6[0] = NULL;
2724     }
2725     if (priv->udpsrc_v6[1]) {
2726       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2727       gst_object_unref (priv->udpsrc_v6[1]);
2728       priv->udpsrc_v6[1] = NULL;
2729     }
2730     if (priv->udpsrc_mcast_v6[0]) {
2731       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2732       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2733       priv->udpsrc_mcast_v6[0] = NULL;
2734     }
2735     if (priv->udpsrc_mcast_v6[1]) {
2736       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2737       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2738       priv->udpsrc_mcast_v6[1] = NULL;
2739     }
2740     g_mutex_unlock (&priv->lock);
2741     return FALSE;
2742   }
2743 }
2744
2745 /**
2746  * gst_rtsp_stream_leave_bin:
2747  * @stream: a #GstRTSPStream
2748  * @bin: (transfer none): a #GstBin
2749  * @rtpbin: (transfer none): a rtpbin #GstElement
2750  *
2751  * Remove the elements of @stream from @bin.
2752  *
2753  * Return: %TRUE on success.
2754  */
2755 gboolean
2756 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2757     GstElement * rtpbin)
2758 {
2759   GstRTSPStreamPrivate *priv;
2760   gint i;
2761   gboolean is_tcp, is_udp;
2762
2763   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2764   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2765   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2766
2767   priv = stream->priv;
2768
2769   g_mutex_lock (&priv->lock);
2770   if (!priv->is_joined)
2771     goto was_not_joined;
2772
2773   /* all transports must be removed by now */
2774   if (priv->transports != NULL)
2775     goto transports_not_removed;
2776
2777   clear_tr_cache (priv, TRUE);
2778   clear_tr_cache (priv, FALSE);
2779
2780   GST_INFO ("stream %p leaving bin", stream);
2781
2782   if (priv->srcpad) {
2783     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2784
2785     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2786     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2787     gst_object_unref (priv->send_rtp_sink);
2788     priv->send_rtp_sink = NULL;
2789   } else if (priv->recv_rtp_src) {
2790     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2791     gst_object_unref (priv->recv_rtp_src);
2792     priv->recv_rtp_src = NULL;
2793   }
2794
2795   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2796
2797   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2798       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2799
2800
2801   for (i = 0; i < 2; i++) {
2802     if (priv->udpsink[i])
2803       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2804     if (priv->appsink[i])
2805       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2806     if (priv->appqueue[i])
2807       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2808     if (priv->udpqueue[i])
2809       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2810     if (priv->tee[i])
2811       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2812     if (priv->funnel[i])
2813       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2814     if (priv->appsrc[i])
2815       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2816
2817     if (priv->udpsrc_v4[i]) {
2818       if (priv->sinkpad || i == 1) {
2819         /* and set udpsrc to NULL now before removing */
2820         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2821         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2822         /* removing them should also nicely release the request
2823          * pads when they finalize */
2824         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2825       } else {
2826         /* we need to set the state to NULL before unref */
2827         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2828         gst_object_unref (priv->udpsrc_v4[i]);
2829       }
2830     }
2831
2832     if (priv->udpsrc_mcast_v4[i]) {
2833       if (priv->sinkpad || i == 1) {
2834         /* and set udpsrc to NULL now before removing */
2835         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2836         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2837         /* removing them should also nicely release the request
2838          * pads when they finalize */
2839         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2840       } else {
2841         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2842         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2843       }
2844     }
2845
2846     if (priv->udpsrc_v6[i]) {
2847       if (priv->sinkpad || i == 1) {
2848         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2849         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2850         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2851       } else {
2852         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2853         gst_object_unref (priv->udpsrc_v6[i]);
2854       }
2855     }
2856     if (priv->udpsrc_mcast_v6[i]) {
2857       if (priv->sinkpad || i == 1) {
2858         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2859         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2860         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2861       } else {
2862         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2863         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2864       }
2865     }
2866
2867     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2868       gst_bin_remove (bin, priv->udpsink[i]);
2869     if (priv->appsrc[i]) {
2870       if (priv->sinkpad || i == 1) {
2871         gst_element_set_locked_state (priv->appsrc[i], FALSE);
2872         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2873         gst_bin_remove (bin, priv->appsrc[i]);
2874       } else {
2875         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2876         gst_object_unref (priv->appsrc[i]);
2877       }
2878     }
2879     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2880       gst_bin_remove (bin, priv->appsink[i]);
2881     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2882       gst_bin_remove (bin, priv->appqueue[i]);
2883     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2884       gst_bin_remove (bin, priv->udpqueue[i]);
2885     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2886       gst_bin_remove (bin, priv->tee[i]);
2887     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2888       gst_bin_remove (bin, priv->funnel[i]);
2889
2890     if (priv->sinkpad || i == 1) {
2891       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2892       gst_object_unref (priv->recv_sink[i]);
2893       priv->recv_sink[i] = NULL;
2894     }
2895
2896     priv->udpsrc_v4[i] = NULL;
2897     priv->udpsrc_v6[i] = NULL;
2898     priv->udpsrc_mcast_v4[i] = NULL;
2899     priv->udpsrc_mcast_v6[i] = NULL;
2900     priv->udpsink[i] = NULL;
2901     priv->appsrc[i] = NULL;
2902     priv->appsink[i] = NULL;
2903     priv->appqueue[i] = NULL;
2904     priv->udpqueue[i] = NULL;
2905     priv->tee[i] = NULL;
2906     priv->funnel[i] = NULL;
2907   }
2908
2909   if (priv->srcpad) {
2910     gst_object_unref (priv->send_src[0]);
2911     priv->send_src[0] = NULL;
2912   }
2913
2914   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2915   gst_object_unref (priv->send_src[1]);
2916   priv->send_src[1] = NULL;
2917
2918   g_object_unref (priv->session);
2919   priv->session = NULL;
2920   if (priv->caps)
2921     gst_caps_unref (priv->caps);
2922   priv->caps = NULL;
2923
2924   if (priv->srtpenc)
2925     gst_object_unref (priv->srtpenc);
2926   if (priv->srtpdec)
2927     gst_object_unref (priv->srtpdec);
2928
2929   priv->is_joined = FALSE;
2930   g_mutex_unlock (&priv->lock);
2931
2932   return TRUE;
2933
2934 was_not_joined:
2935   {
2936     g_mutex_unlock (&priv->lock);
2937     return TRUE;
2938   }
2939 transports_not_removed:
2940   {
2941     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2942     g_mutex_unlock (&priv->lock);
2943     return FALSE;
2944   }
2945 }
2946
2947 /**
2948  * gst_rtsp_stream_get_rtpinfo:
2949  * @stream: a #GstRTSPStream
2950  * @rtptime: (allow-none): result RTP timestamp
2951  * @seq: (allow-none): result RTP seqnum
2952  * @clock_rate: (allow-none): the clock rate
2953  * @running_time: (allow-none): result running-time
2954  *
2955  * Retrieve the current rtptime, seq and running-time. This is used to
2956  * construct a RTPInfo reply header.
2957  *
2958  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2959  */
2960 gboolean
2961 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2962     guint * rtptime, guint * seq, guint * clock_rate,
2963     GstClockTime * running_time)
2964 {
2965   GstRTSPStreamPrivate *priv;
2966   GstStructure *stats;
2967   GObjectClass *payobjclass;
2968
2969   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2970
2971   priv = stream->priv;
2972
2973   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2974
2975   g_mutex_lock (&priv->lock);
2976
2977   /* First try to extract the information from the last buffer on the sinks.
2978    * This will have a more accurate sequence number and timestamp, as between
2979    * the payloader and the sink there can be some queues
2980    */
2981   if (priv->udpsink[0] || priv->appsink[0]) {
2982     GstSample *last_sample;
2983
2984     if (priv->udpsink[0])
2985       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2986     else
2987       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2988
2989     if (last_sample) {
2990       GstCaps *caps;
2991       GstBuffer *buffer;
2992       GstSegment *segment;
2993       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2994
2995       caps = gst_sample_get_caps (last_sample);
2996       buffer = gst_sample_get_buffer (last_sample);
2997       segment = gst_sample_get_segment (last_sample);
2998
2999       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3000         if (seq) {
3001           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3002         }
3003
3004         if (rtptime) {
3005           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3006         }
3007
3008         gst_rtp_buffer_unmap (&rtp_buffer);
3009
3010         if (running_time) {
3011           *running_time =
3012               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3013               GST_BUFFER_TIMESTAMP (buffer));
3014         }
3015
3016         if (clock_rate) {
3017           GstStructure *s = gst_caps_get_structure (caps, 0);
3018
3019           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3020
3021           if (*clock_rate == 0 && running_time)
3022             *running_time = GST_CLOCK_TIME_NONE;
3023         }
3024         gst_sample_unref (last_sample);
3025
3026         goto done;
3027       } else {
3028         gst_sample_unref (last_sample);
3029       }
3030     }
3031   }
3032
3033   if (g_object_class_find_property (payobjclass, "stats")) {
3034     g_object_get (priv->payloader, "stats", &stats, NULL);
3035     if (stats == NULL)
3036       goto no_stats;
3037
3038     if (seq)
3039       gst_structure_get_uint (stats, "seqnum", seq);
3040
3041     if (rtptime)
3042       gst_structure_get_uint (stats, "timestamp", rtptime);
3043
3044     if (running_time)
3045       gst_structure_get_clock_time (stats, "running-time", running_time);
3046
3047     if (clock_rate) {
3048       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3049       if (*clock_rate == 0 && running_time)
3050         *running_time = GST_CLOCK_TIME_NONE;
3051     }
3052     gst_structure_free (stats);
3053   } else {
3054     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3055         !g_object_class_find_property (payobjclass, "timestamp"))
3056       goto no_stats;
3057
3058     if (seq)
3059       g_object_get (priv->payloader, "seqnum", seq, NULL);
3060
3061     if (rtptime)
3062       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3063
3064     if (running_time)
3065       *running_time = GST_CLOCK_TIME_NONE;
3066   }
3067
3068 done:
3069   g_mutex_unlock (&priv->lock);
3070
3071   return TRUE;
3072
3073   /* ERRORS */
3074 no_stats:
3075   {
3076     GST_WARNING ("Could not get payloader stats");
3077     g_mutex_unlock (&priv->lock);
3078     return FALSE;
3079   }
3080 }
3081
3082 /**
3083  * gst_rtsp_stream_get_caps:
3084  * @stream: a #GstRTSPStream
3085  *
3086  * Retrieve the current caps of @stream.
3087  *
3088  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3089  * after usage.
3090  */
3091 GstCaps *
3092 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3093 {
3094   GstRTSPStreamPrivate *priv;
3095   GstCaps *result;
3096
3097   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3098
3099   priv = stream->priv;
3100
3101   g_mutex_lock (&priv->lock);
3102   if ((result = priv->caps))
3103     gst_caps_ref (result);
3104   g_mutex_unlock (&priv->lock);
3105
3106   return result;
3107 }
3108
3109 /**
3110  * gst_rtsp_stream_recv_rtp:
3111  * @stream: a #GstRTSPStream
3112  * @buffer: (transfer full): a #GstBuffer
3113  *
3114  * Handle an RTP buffer for the stream. This method is usually called when a
3115  * message has been received from a client using the TCP transport.
3116  *
3117  * This function takes ownership of @buffer.
3118  *
3119  * Returns: a GstFlowReturn.
3120  */
3121 GstFlowReturn
3122 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3123 {
3124   GstRTSPStreamPrivate *priv;
3125   GstFlowReturn ret;
3126   GstElement *element;
3127
3128   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3129   priv = stream->priv;
3130   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3131   g_return_val_if_fail (priv->is_joined, FALSE);
3132
3133   g_mutex_lock (&priv->lock);
3134   if (priv->appsrc[0])
3135     element = gst_object_ref (priv->appsrc[0]);
3136   else
3137     element = NULL;
3138   g_mutex_unlock (&priv->lock);
3139
3140   if (element) {
3141     if (priv->appsrc_base_time[0] == -1) {
3142       /* Take current running_time. This timestamp will be put on
3143        * the first buffer of each stream because we are a live source and so we
3144        * timestamp with the running_time. When we are dealing with TCP, we also
3145        * only timestamp the first buffer (using the DISCONT flag) because a server
3146        * typically bursts data, for which we don't want to compensate by speeding
3147        * up the media. The other timestamps will be interpollated from this one
3148        * using the RTP timestamps. */
3149       GST_OBJECT_LOCK (element);
3150       if (GST_ELEMENT_CLOCK (element)) {
3151         GstClockTime now;
3152         GstClockTime base_time;
3153
3154         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3155         base_time = GST_ELEMENT_CAST (element)->base_time;
3156
3157         priv->appsrc_base_time[0] = now - base_time;
3158         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3159         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3160             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3161             GST_TIME_ARGS (base_time));
3162       }
3163       GST_OBJECT_UNLOCK (element);
3164     }
3165
3166     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3167     gst_object_unref (element);
3168   } else {
3169     ret = GST_FLOW_OK;
3170   }
3171   return ret;
3172 }
3173
3174 /**
3175  * gst_rtsp_stream_recv_rtcp:
3176  * @stream: a #GstRTSPStream
3177  * @buffer: (transfer full): a #GstBuffer
3178  *
3179  * Handle an RTCP buffer for the stream. This method is usually called when a
3180  * message has been received from a client using the TCP transport.
3181  *
3182  * This function takes ownership of @buffer.
3183  *
3184  * Returns: a GstFlowReturn.
3185  */
3186 GstFlowReturn
3187 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3188 {
3189   GstRTSPStreamPrivate *priv;
3190   GstFlowReturn ret;
3191   GstElement *element;
3192
3193   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3194   priv = stream->priv;
3195   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3196
3197   if (!priv->is_joined) {
3198     gst_buffer_unref (buffer);
3199     return GST_FLOW_NOT_LINKED;
3200   }
3201   g_mutex_lock (&priv->lock);
3202   if (priv->appsrc[1])
3203     element = gst_object_ref (priv->appsrc[1]);
3204   else
3205     element = NULL;
3206   g_mutex_unlock (&priv->lock);
3207
3208   if (element) {
3209     if (priv->appsrc_base_time[1] == -1) {
3210       /* Take current running_time. This timestamp will be put on
3211        * the first buffer of each stream because we are a live source and so we
3212        * timestamp with the running_time. When we are dealing with TCP, we also
3213        * only timestamp the first buffer (using the DISCONT flag) because a server
3214        * typically bursts data, for which we don't want to compensate by speeding
3215        * up the media. The other timestamps will be interpollated from this one
3216        * using the RTP timestamps. */
3217       GST_OBJECT_LOCK (element);
3218       if (GST_ELEMENT_CLOCK (element)) {
3219         GstClockTime now;
3220         GstClockTime base_time;
3221
3222         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3223         base_time = GST_ELEMENT_CAST (element)->base_time;
3224
3225         priv->appsrc_base_time[1] = now - base_time;
3226         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3227         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3228             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3229             GST_TIME_ARGS (base_time));
3230       }
3231       GST_OBJECT_UNLOCK (element);
3232     }
3233
3234     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3235     gst_object_unref (element);
3236   } else {
3237     ret = GST_FLOW_OK;
3238     gst_buffer_unref (buffer);
3239   }
3240   return ret;
3241 }
3242
3243 /* must be called with lock */
3244 static gboolean
3245 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3246     gboolean add)
3247 {
3248   GstRTSPStreamPrivate *priv = stream->priv;
3249   const GstRTSPTransport *tr;
3250
3251   tr = gst_rtsp_stream_transport_get_transport (trans);
3252
3253   switch (tr->lower_transport) {
3254     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3255     case GST_RTSP_LOWER_TRANS_UDP:
3256     {
3257       gchar *dest;
3258       gint min, max;
3259       guint ttl = 0;
3260
3261       dest = tr->destination;
3262       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3263         min = tr->port.min;
3264         max = tr->port.max;
3265         ttl = tr->ttl;
3266       } else if (priv->client_side) {
3267         /* In client side mode the 'destination' is the RTSP server, so send
3268          * to those ports */
3269         min = tr->server_port.min;
3270         max = tr->server_port.max;
3271       } else {
3272         min = tr->client_port.min;
3273         max = tr->client_port.max;
3274       }
3275
3276       if (add) {
3277         if (ttl > 0) {
3278           GST_INFO ("setting ttl-mc %d", ttl);
3279           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3280           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3281         }
3282         GST_INFO ("adding %s:%d-%d", dest, min, max);
3283         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3284         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3285         priv->transports = g_list_prepend (priv->transports, trans);
3286       } else {
3287         GST_INFO ("removing %s:%d-%d", dest, min, max);
3288         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3289         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3290         priv->transports = g_list_remove (priv->transports, trans);
3291       }
3292       priv->transports_cookie++;
3293       break;
3294     }
3295     case GST_RTSP_LOWER_TRANS_TCP:
3296       if (add) {
3297         GST_INFO ("adding TCP %s", tr->destination);
3298         priv->transports = g_list_prepend (priv->transports, trans);
3299       } else {
3300         GST_INFO ("removing TCP %s", tr->destination);
3301         priv->transports = g_list_remove (priv->transports, trans);
3302       }
3303       priv->transports_cookie++;
3304       break;
3305     default:
3306       goto unknown_transport;
3307   }
3308   return TRUE;
3309
3310   /* ERRORS */
3311 unknown_transport:
3312   {
3313     GST_INFO ("Unknown transport %d", tr->lower_transport);
3314     return FALSE;
3315   }
3316 }
3317
3318
3319 /**
3320  * gst_rtsp_stream_add_transport:
3321  * @stream: a #GstRTSPStream
3322  * @trans: (transfer none): a #GstRTSPStreamTransport
3323  *
3324  * Add the transport in @trans to @stream. The media of @stream will
3325  * then also be send to the values configured in @trans.
3326  *
3327  * @stream must be joined to a bin.
3328  *
3329  * @trans must contain a valid #GstRTSPTransport.
3330  *
3331  * Returns: %TRUE if @trans was added
3332  */
3333 gboolean
3334 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3335     GstRTSPStreamTransport * trans)
3336 {
3337   GstRTSPStreamPrivate *priv;
3338   gboolean res;
3339
3340   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3341   priv = stream->priv;
3342   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3343   g_return_val_if_fail (priv->is_joined, FALSE);
3344
3345   g_mutex_lock (&priv->lock);
3346   res = update_transport (stream, trans, TRUE);
3347   g_mutex_unlock (&priv->lock);
3348
3349   return res;
3350 }
3351
3352 /**
3353  * gst_rtsp_stream_remove_transport:
3354  * @stream: a #GstRTSPStream
3355  * @trans: (transfer none): a #GstRTSPStreamTransport
3356  *
3357  * Remove the transport in @trans from @stream. The media of @stream will
3358  * not be sent to the values configured in @trans.
3359  *
3360  * @stream must be joined to a bin.
3361  *
3362  * @trans must contain a valid #GstRTSPTransport.
3363  *
3364  * Returns: %TRUE if @trans was removed
3365  */
3366 gboolean
3367 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3368     GstRTSPStreamTransport * trans)
3369 {
3370   GstRTSPStreamPrivate *priv;
3371   gboolean res;
3372
3373   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3374   priv = stream->priv;
3375   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3376   g_return_val_if_fail (priv->is_joined, FALSE);
3377
3378   g_mutex_lock (&priv->lock);
3379   res = update_transport (stream, trans, FALSE);
3380   g_mutex_unlock (&priv->lock);
3381
3382   return res;
3383 }
3384
3385 /**
3386  * gst_rtsp_stream_update_crypto:
3387  * @stream: a #GstRTSPStream
3388  * @ssrc: the SSRC
3389  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3390  *
3391  * Update the new crypto information for @ssrc in @stream. If information
3392  * for @ssrc did not exist, it will be added. If information
3393  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3394  * be removed from @stream.
3395  *
3396  * Returns: %TRUE if @crypto could be updated
3397  */
3398 gboolean
3399 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3400     guint ssrc, GstCaps * crypto)
3401 {
3402   GstRTSPStreamPrivate *priv;
3403
3404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3405   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3406
3407   priv = stream->priv;
3408
3409   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3410
3411   g_mutex_lock (&priv->lock);
3412   if (crypto)
3413     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3414         gst_caps_ref (crypto));
3415   else
3416     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3417   g_mutex_unlock (&priv->lock);
3418
3419   return TRUE;
3420 }
3421
3422 /**
3423  * gst_rtsp_stream_get_rtp_socket:
3424  * @stream: a #GstRTSPStream
3425  * @family: the socket family
3426  *
3427  * Get the RTP socket from @stream for a @family.
3428  *
3429  * @stream must be joined to a bin.
3430  *
3431  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3432  * socket could be allocated for @family. Unref after usage
3433  */
3434 GSocket *
3435 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3436 {
3437   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3438   GSocket *socket;
3439   const gchar *name;
3440
3441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3442   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3443       family == G_SOCKET_FAMILY_IPV6, NULL);
3444   g_return_val_if_fail (priv->udpsink[0], NULL);
3445
3446   if (family == G_SOCKET_FAMILY_IPV6)
3447     name = "socket-v6";
3448   else
3449     name = "socket";
3450
3451   g_object_get (priv->udpsink[0], name, &socket, NULL);
3452
3453   return socket;
3454 }
3455
3456 /**
3457  * gst_rtsp_stream_get_rtcp_socket:
3458  * @stream: a #GstRTSPStream
3459  * @family: the socket family
3460  *
3461  * Get the RTCP socket from @stream for a @family.
3462  *
3463  * @stream must be joined to a bin.
3464  *
3465  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3466  * socket could be allocated for @family. Unref after usage
3467  */
3468 GSocket *
3469 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3470 {
3471   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3472   GSocket *socket;
3473   const gchar *name;
3474
3475   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3476   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3477       family == G_SOCKET_FAMILY_IPV6, NULL);
3478   g_return_val_if_fail (priv->udpsink[1], NULL);
3479
3480   if (family == G_SOCKET_FAMILY_IPV6)
3481     name = "socket-v6";
3482   else
3483     name = "socket";
3484
3485   g_object_get (priv->udpsink[1], name, &socket, NULL);
3486
3487   return socket;
3488 }
3489
3490 /**
3491  * gst_rtsp_stream_set_seqnum:
3492  * @stream: a #GstRTSPStream
3493  * @seqnum: a new sequence number
3494  *
3495  * Configure the sequence number in the payloader of @stream to @seqnum.
3496  */
3497 void
3498 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3499 {
3500   GstRTSPStreamPrivate *priv;
3501
3502   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3503
3504   priv = stream->priv;
3505
3506   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3507 }
3508
3509 /**
3510  * gst_rtsp_stream_get_seqnum:
3511  * @stream: a #GstRTSPStream
3512  *
3513  * Get the configured sequence number in the payloader of @stream.
3514  *
3515  * Returns: the sequence number of the payloader.
3516  */
3517 guint16
3518 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3519 {
3520   GstRTSPStreamPrivate *priv;
3521   guint seqnum;
3522
3523   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3524
3525   priv = stream->priv;
3526
3527   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3528
3529   return seqnum;
3530 }
3531
3532 /**
3533  * gst_rtsp_stream_transport_filter:
3534  * @stream: a #GstRTSPStream
3535  * @func: (scope call) (allow-none): a callback
3536  * @user_data: (closure): user data passed to @func
3537  *
3538  * Call @func for each transport managed by @stream. The result value of @func
3539  * determines what happens to the transport. @func will be called with @stream
3540  * locked so no further actions on @stream can be performed from @func.
3541  *
3542  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3543  * @stream.
3544  *
3545  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3546  *
3547  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3548  * will also be added with an additional ref to the result #GList of this
3549  * function..
3550  *
3551  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3552  *
3553  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3554  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3555  * element in the #GList should be unreffed before the list is freed.
3556  */
3557 GList *
3558 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3559     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3560 {
3561   GstRTSPStreamPrivate *priv;
3562   GList *result, *walk, *next;
3563   GHashTable *visited = NULL;
3564   guint cookie;
3565
3566   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3567
3568   priv = stream->priv;
3569
3570   result = NULL;
3571   if (func)
3572     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3573
3574   g_mutex_lock (&priv->lock);
3575 restart:
3576   cookie = priv->transports_cookie;
3577   for (walk = priv->transports; walk; walk = next) {
3578     GstRTSPStreamTransport *trans = walk->data;
3579     GstRTSPFilterResult res;
3580     gboolean changed;
3581
3582     next = g_list_next (walk);
3583
3584     if (func) {
3585       /* only visit each transport once */
3586       if (g_hash_table_contains (visited, trans))
3587         continue;
3588
3589       g_hash_table_add (visited, g_object_ref (trans));
3590       g_mutex_unlock (&priv->lock);
3591
3592       res = func (stream, trans, user_data);
3593
3594       g_mutex_lock (&priv->lock);
3595     } else
3596       res = GST_RTSP_FILTER_REF;
3597
3598     changed = (cookie != priv->transports_cookie);
3599
3600     switch (res) {
3601       case GST_RTSP_FILTER_REMOVE:
3602         update_transport (stream, trans, FALSE);
3603         break;
3604       case GST_RTSP_FILTER_REF:
3605         result = g_list_prepend (result, g_object_ref (trans));
3606         break;
3607       case GST_RTSP_FILTER_KEEP:
3608       default:
3609         break;
3610     }
3611     if (changed)
3612       goto restart;
3613   }
3614   g_mutex_unlock (&priv->lock);
3615
3616   if (func)
3617     g_hash_table_unref (visited);
3618
3619   return result;
3620 }
3621
3622 static GstPadProbeReturn
3623 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3624 {
3625   GstRTSPStreamPrivate *priv;
3626   GstRTSPStream *stream;
3627
3628   stream = user_data;
3629   priv = stream->priv;
3630
3631   GST_DEBUG_OBJECT (pad, "now blocking");
3632
3633   g_mutex_lock (&priv->lock);
3634   priv->blocking = TRUE;
3635   g_mutex_unlock (&priv->lock);
3636
3637   gst_element_post_message (priv->payloader,
3638       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3639           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3640
3641   return GST_PAD_PROBE_OK;
3642 }
3643
3644 /**
3645  * gst_rtsp_stream_set_blocked:
3646  * @stream: a #GstRTSPStream
3647  * @blocked: boolean indicating we should block or unblock
3648  *
3649  * Blocks or unblocks the dataflow on @stream.
3650  *
3651  * Returns: %TRUE on success
3652  */
3653 gboolean
3654 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3655 {
3656   GstRTSPStreamPrivate *priv;
3657
3658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3659
3660   priv = stream->priv;
3661
3662   g_mutex_lock (&priv->lock);
3663   if (blocked) {
3664     priv->blocking = FALSE;
3665     if (priv->blocked_id == 0) {
3666       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3667           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3668           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3669           g_object_ref (stream), g_object_unref);
3670     }
3671   } else {
3672     if (priv->blocked_id != 0) {
3673       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3674       priv->blocked_id = 0;
3675       priv->blocking = FALSE;
3676     }
3677   }
3678   g_mutex_unlock (&priv->lock);
3679
3680   return TRUE;
3681 }
3682
3683 /**
3684  * gst_rtsp_stream_is_blocking:
3685  * @stream: a #GstRTSPStream
3686  *
3687  * Check if @stream is blocking on a #GstBuffer.
3688  *
3689  * Returns: %TRUE if @stream is blocking
3690  */
3691 gboolean
3692 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3693 {
3694   GstRTSPStreamPrivate *priv;
3695   gboolean result;
3696
3697   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3698
3699   priv = stream->priv;
3700
3701   g_mutex_lock (&priv->lock);
3702   result = priv->blocking;
3703   g_mutex_unlock (&priv->lock);
3704
3705   return result;
3706 }
3707
3708 /**
3709  * gst_rtsp_stream_query_position:
3710  * @stream: a #GstRTSPStream
3711  *
3712  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3713  * the RTP parts of the pipeline and not the RTCP parts.
3714  *
3715  * Returns: %TRUE if the position could be queried
3716  */
3717 gboolean
3718 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3719 {
3720   GstRTSPStreamPrivate *priv;
3721   GstElement *sink;
3722   gboolean ret;
3723
3724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3725
3726   priv = stream->priv;
3727
3728   g_mutex_lock (&priv->lock);
3729   /* depending on the transport type, it should query corresponding sink */
3730   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3731       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3732     sink = priv->udpsink[0];
3733   else
3734     sink = priv->appsink[0];
3735
3736   if (sink)
3737     gst_object_ref (sink);
3738   g_mutex_unlock (&priv->lock);
3739
3740   if (!sink)
3741     return FALSE;
3742
3743   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3744   gst_object_unref (sink);
3745
3746   return ret;
3747 }
3748
3749 /**
3750  * gst_rtsp_stream_query_stop:
3751  * @stream: a #GstRTSPStream
3752  *
3753  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3754  * the RTP parts of the pipeline and not the RTCP parts.
3755  *
3756  * Returns: %TRUE if the stop could be queried
3757  */
3758 gboolean
3759 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3760 {
3761   GstRTSPStreamPrivate *priv;
3762   GstElement *sink;
3763   GstQuery *query;
3764   gboolean ret;
3765
3766   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3767
3768   priv = stream->priv;
3769
3770   g_mutex_lock (&priv->lock);
3771   /* depending on the transport type, it should query corresponding sink */
3772   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3773       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3774     sink = priv->udpsink[0];
3775   else
3776     sink = priv->appsink[0];
3777
3778   if (sink)
3779     gst_object_ref (sink);
3780   g_mutex_unlock (&priv->lock);
3781
3782   if (!sink)
3783     return FALSE;
3784
3785   query = gst_query_new_segment (GST_FORMAT_TIME);
3786   if ((ret = gst_element_query (sink, query))) {
3787     GstFormat format;
3788
3789     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3790     if (format != GST_FORMAT_TIME)
3791       *stop = -1;
3792   }
3793   gst_query_unref (query);
3794   gst_object_unref (sink);
3795
3796   return ret;
3797
3798 }