rtsp-media: Add support for setting the multicast interface
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *addr_v4;
139   GstRTSPAddress *addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   gchar *multicast_iface;
144
145   /* the caps of the stream */
146   gulong caps_sig;
147   GstCaps *caps;
148
149   /* transports we stream to */
150   guint n_active;
151   GList *transports;
152   guint transports_cookie;
153   GList *tr_cache_rtp;
154   GList *tr_cache_rtcp;
155   guint tr_cache_cookie_rtp;
156   guint tr_cache_cookie_rtcp;
157
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167 };
168
169 #define DEFAULT_CONTROL         NULL
170 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
171 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
172                                         GST_RTSP_LOWER_TRANS_TCP
173
174 enum
175 {
176   PROP_0,
177   PROP_CONTROL,
178   PROP_PROFILES,
179   PROP_PROTOCOLS,
180   PROP_LAST
181 };
182
183 enum
184 {
185   SIGNAL_NEW_RTP_ENCODER,
186   SIGNAL_NEW_RTCP_ENCODER,
187   SIGNAL_LAST
188 };
189
190 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
191 #define GST_CAT_DEFAULT rtsp_stream_debug
192
193 static GQuark ssrc_stream_map_key;
194
195 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
196     GValue * value, GParamSpec * pspec);
197 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
198     const GValue * value, GParamSpec * pspec);
199
200 static void gst_rtsp_stream_finalize (GObject * obj);
201
202 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
203
204 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
205
206 static void
207 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
208 {
209   GObjectClass *gobject_class;
210
211   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
212
213   gobject_class = G_OBJECT_CLASS (klass);
214
215   gobject_class->get_property = gst_rtsp_stream_get_property;
216   gobject_class->set_property = gst_rtsp_stream_set_property;
217   gobject_class->finalize = gst_rtsp_stream_finalize;
218
219   g_object_class_install_property (gobject_class, PROP_CONTROL,
220       g_param_spec_string ("control", "Control",
221           "The control string for this stream", DEFAULT_CONTROL,
222           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
223
224   g_object_class_install_property (gobject_class, PROP_PROFILES,
225       g_param_spec_flags ("profiles", "Profiles",
226           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
227           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228
229   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
230       g_param_spec_flags ("protocols", "Protocols",
231           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
232           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
233
234   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
235       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
236       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
237       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
238
239   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
240       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
241       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
242       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
243
244   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
245
246   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
247 }
248
249 static void
250 gst_rtsp_stream_init (GstRTSPStream * stream)
251 {
252   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
253
254   GST_DEBUG ("new stream %p", stream);
255
256   stream->priv = priv;
257
258   priv->dscp_qos = -1;
259   priv->control = g_strdup (DEFAULT_CONTROL);
260   priv->profiles = DEFAULT_PROFILES;
261   priv->protocols = DEFAULT_PROTOCOLS;
262
263   g_mutex_init (&priv->lock);
264
265   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
266       NULL, (GDestroyNotify) gst_caps_unref);
267   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
268       (GDestroyNotify) gst_caps_unref);
269 }
270
271 static void
272 gst_rtsp_stream_finalize (GObject * obj)
273 {
274   GstRTSPStream *stream;
275   GstRTSPStreamPrivate *priv;
276
277   stream = GST_RTSP_STREAM (obj);
278   priv = stream->priv;
279
280   GST_DEBUG ("finalize stream %p", stream);
281
282   /* we really need to be unjoined now */
283   g_return_if_fail (!priv->is_joined);
284
285   if (priv->addr_v4)
286     gst_rtsp_address_free (priv->addr_v4);
287   if (priv->addr_v6)
288     gst_rtsp_address_free (priv->addr_v6);
289   if (priv->server_addr_v4)
290     gst_rtsp_address_free (priv->server_addr_v4);
291   if (priv->server_addr_v6)
292     gst_rtsp_address_free (priv->server_addr_v6);
293   if (priv->pool)
294     g_object_unref (priv->pool);
295   if (priv->rtxsend)
296     g_object_unref (priv->rtxsend);
297
298   g_free (priv->multicast_iface);
299
300   gst_object_unref (priv->payloader);
301   if (priv->srcpad)
302     gst_object_unref (priv->srcpad);
303   if (priv->sinkpad)
304     gst_object_unref (priv->sinkpad);
305   g_free (priv->control);
306   g_mutex_clear (&priv->lock);
307
308   g_hash_table_unref (priv->keys);
309   g_hash_table_destroy (priv->ptmap);
310
311   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
312 }
313
314 static void
315 gst_rtsp_stream_get_property (GObject * object, guint propid,
316     GValue * value, GParamSpec * pspec)
317 {
318   GstRTSPStream *stream = GST_RTSP_STREAM (object);
319
320   switch (propid) {
321     case PROP_CONTROL:
322       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
323       break;
324     case PROP_PROFILES:
325       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
326       break;
327     case PROP_PROTOCOLS:
328       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
329       break;
330     default:
331       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
332   }
333 }
334
335 static void
336 gst_rtsp_stream_set_property (GObject * object, guint propid,
337     const GValue * value, GParamSpec * pspec)
338 {
339   GstRTSPStream *stream = GST_RTSP_STREAM (object);
340
341   switch (propid) {
342     case PROP_CONTROL:
343       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
344       break;
345     case PROP_PROFILES:
346       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
347       break;
348     case PROP_PROTOCOLS:
349       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
350       break;
351     default:
352       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
353   }
354 }
355
356 /**
357  * gst_rtsp_stream_new:
358  * @idx: an index
359  * @pad: a #GstPad
360  * @payloader: a #GstElement
361  *
362  * Create a new media stream with index @idx that handles RTP data on
363  * @pad and has a payloader element @payloader if @pad is a source pad
364  * or a depayloader element @payloader if @pad is a sink pad.
365  *
366  * Returns: (transfer full): a new #GstRTSPStream
367  */
368 GstRTSPStream *
369 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
370 {
371   GstRTSPStreamPrivate *priv;
372   GstRTSPStream *stream;
373
374   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
375   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
376
377   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
378   priv = stream->priv;
379   priv->idx = idx;
380   priv->payloader = gst_object_ref (payloader);
381   if (GST_PAD_IS_SRC (pad))
382     priv->srcpad = gst_object_ref (pad);
383   else
384     priv->sinkpad = gst_object_ref (pad);
385
386   return stream;
387 }
388
389 /**
390  * gst_rtsp_stream_get_index:
391  * @stream: a #GstRTSPStream
392  *
393  * Get the stream index.
394  *
395  * Return: the stream index.
396  */
397 guint
398 gst_rtsp_stream_get_index (GstRTSPStream * stream)
399 {
400   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
401
402   return stream->priv->idx;
403 }
404
405 /**
406  * gst_rtsp_stream_get_pt:
407  * @stream: a #GstRTSPStream
408  *
409  * Get the stream payload type.
410  *
411  * Return: the stream payload type.
412  */
413 guint
414 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
415 {
416   GstRTSPStreamPrivate *priv;
417   guint pt;
418
419   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
420
421   priv = stream->priv;
422
423   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
424
425   return pt;
426 }
427
428 /**
429  * gst_rtsp_stream_get_srcpad:
430  * @stream: a #GstRTSPStream
431  *
432  * Get the srcpad associated with @stream.
433  *
434  * Returns: (transfer full): the srcpad. Unref after usage.
435  */
436 GstPad *
437 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
438 {
439   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
440
441   if (!stream->priv->srcpad)
442     return NULL;
443
444   return gst_object_ref (stream->priv->srcpad);
445 }
446
447 /**
448  * gst_rtsp_stream_get_sinkpad:
449  * @stream: a #GstRTSPStream
450  *
451  * Get the sinkpad associated with @stream.
452  *
453  * Returns: (transfer full): the sinkpad. Unref after usage.
454  */
455 GstPad *
456 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
457 {
458   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
459
460   if (!stream->priv->sinkpad)
461     return NULL;
462
463   return gst_object_ref (stream->priv->sinkpad);
464 }
465
466 /**
467  * gst_rtsp_stream_get_control:
468  * @stream: a #GstRTSPStream
469  *
470  * Get the control string to identify this stream.
471  *
472  * Returns: (transfer full): the control string. g_free() after usage.
473  */
474 gchar *
475 gst_rtsp_stream_get_control (GstRTSPStream * stream)
476 {
477   GstRTSPStreamPrivate *priv;
478   gchar *result;
479
480   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
481
482   priv = stream->priv;
483
484   g_mutex_lock (&priv->lock);
485   if ((result = g_strdup (priv->control)) == NULL)
486     result = g_strdup_printf ("stream=%u", priv->idx);
487   g_mutex_unlock (&priv->lock);
488
489   return result;
490 }
491
492 /**
493  * gst_rtsp_stream_set_control:
494  * @stream: a #GstRTSPStream
495  * @control: a control string
496  *
497  * Set the control string in @stream.
498  */
499 void
500 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
501 {
502   GstRTSPStreamPrivate *priv;
503
504   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
505
506   priv = stream->priv;
507
508   g_mutex_lock (&priv->lock);
509   g_free (priv->control);
510   priv->control = g_strdup (control);
511   g_mutex_unlock (&priv->lock);
512 }
513
514 /**
515  * gst_rtsp_stream_has_control:
516  * @stream: a #GstRTSPStream
517  * @control: a control string
518  *
519  * Check if @stream has the control string @control.
520  *
521  * Returns: %TRUE is @stream has @control as the control string
522  */
523 gboolean
524 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
525 {
526   GstRTSPStreamPrivate *priv;
527   gboolean res;
528
529   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
530
531   priv = stream->priv;
532
533   g_mutex_lock (&priv->lock);
534   if (priv->control)
535     res = (g_strcmp0 (priv->control, control) == 0);
536   else {
537     guint streamid;
538
539     if (sscanf (control, "stream=%u", &streamid) > 0)
540       res = (streamid == priv->idx);
541     else
542       res = FALSE;
543   }
544   g_mutex_unlock (&priv->lock);
545
546   return res;
547 }
548
549 /**
550  * gst_rtsp_stream_set_mtu:
551  * @stream: a #GstRTSPStream
552  * @mtu: a new MTU
553  *
554  * Configure the mtu in the payloader of @stream to @mtu.
555  */
556 void
557 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
558 {
559   GstRTSPStreamPrivate *priv;
560
561   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
562
563   priv = stream->priv;
564
565   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
566
567   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
568 }
569
570 /**
571  * gst_rtsp_stream_get_mtu:
572  * @stream: a #GstRTSPStream
573  *
574  * Get the configured MTU in the payloader of @stream.
575  *
576  * Returns: the MTU of the payloader.
577  */
578 guint
579 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
580 {
581   GstRTSPStreamPrivate *priv;
582   guint mtu;
583
584   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
585
586   priv = stream->priv;
587
588   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
589
590   return mtu;
591 }
592
593 /* Update the dscp qos property on the udp sinks */
594 static void
595 update_dscp_qos (GstRTSPStream * stream)
596 {
597   GstRTSPStreamPrivate *priv;
598
599   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
600
601   priv = stream->priv;
602
603   if (priv->udpsink[0]) {
604     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
605         NULL);
606   }
607
608   if (priv->udpsink[1]) {
609     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
610         NULL);
611   }
612 }
613
614 /**
615  * gst_rtsp_stream_set_dscp_qos:
616  * @stream: a #GstRTSPStream
617  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
618  *
619  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
620  */
621 void
622 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
623 {
624   GstRTSPStreamPrivate *priv;
625
626   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
627
628   priv = stream->priv;
629
630   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
631
632   if (dscp_qos < -1 || dscp_qos > 63) {
633     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
634     return;
635   }
636
637   priv->dscp_qos = dscp_qos;
638
639   update_dscp_qos (stream);
640 }
641
642 /**
643  * gst_rtsp_stream_get_dscp_qos:
644  * @stream: a #GstRTSPStream
645  *
646  * Get the configured DSCP QoS in of the outgoing sockets.
647  *
648  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
649  */
650 gint
651 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
652 {
653   GstRTSPStreamPrivate *priv;
654
655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
656
657   priv = stream->priv;
658
659   return priv->dscp_qos;
660 }
661
662 /**
663  * gst_rtsp_stream_is_transport_supported:
664  * @stream: a #GstRTSPStream
665  * @transport: (transfer none): a #GstRTSPTransport
666  *
667  * Check if @transport can be handled by stream
668  *
669  * Returns: %TRUE if @transport can be handled by @stream.
670  */
671 gboolean
672 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
673     GstRTSPTransport * transport)
674 {
675   GstRTSPStreamPrivate *priv;
676
677   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
678
679   priv = stream->priv;
680
681   g_mutex_lock (&priv->lock);
682   if (transport->trans != GST_RTSP_TRANS_RTP)
683     goto unsupported_transmode;
684
685   if (!(transport->profile & priv->profiles))
686     goto unsupported_profile;
687
688   if (!(transport->lower_transport & priv->protocols))
689     goto unsupported_ltrans;
690
691   g_mutex_unlock (&priv->lock);
692
693   return TRUE;
694
695   /* ERRORS */
696 unsupported_transmode:
697   {
698     GST_DEBUG ("unsupported transport mode %d", transport->trans);
699     g_mutex_unlock (&priv->lock);
700     return FALSE;
701   }
702 unsupported_profile:
703   {
704     GST_DEBUG ("unsupported profile %d", transport->profile);
705     g_mutex_unlock (&priv->lock);
706     return FALSE;
707   }
708 unsupported_ltrans:
709   {
710     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
711     g_mutex_unlock (&priv->lock);
712     return FALSE;
713   }
714 }
715
716 /**
717  * gst_rtsp_stream_set_profiles:
718  * @stream: a #GstRTSPStream
719  * @profiles: the new profiles
720  *
721  * Configure the allowed profiles for @stream.
722  */
723 void
724 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
725 {
726   GstRTSPStreamPrivate *priv;
727
728   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
729
730   priv = stream->priv;
731
732   g_mutex_lock (&priv->lock);
733   priv->profiles = profiles;
734   g_mutex_unlock (&priv->lock);
735 }
736
737 /**
738  * gst_rtsp_stream_get_profiles:
739  * @stream: a #GstRTSPStream
740  *
741  * Get the allowed profiles of @stream.
742  *
743  * Returns: a #GstRTSPProfile
744  */
745 GstRTSPProfile
746 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
747 {
748   GstRTSPStreamPrivate *priv;
749   GstRTSPProfile res;
750
751   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
752
753   priv = stream->priv;
754
755   g_mutex_lock (&priv->lock);
756   res = priv->profiles;
757   g_mutex_unlock (&priv->lock);
758
759   return res;
760 }
761
762 /**
763  * gst_rtsp_stream_set_protocols:
764  * @stream: a #GstRTSPStream
765  * @protocols: the new flags
766  *
767  * Configure the allowed lower transport for @stream.
768  */
769 void
770 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
771     GstRTSPLowerTrans protocols)
772 {
773   GstRTSPStreamPrivate *priv;
774
775   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
776
777   priv = stream->priv;
778
779   g_mutex_lock (&priv->lock);
780   priv->protocols = protocols;
781   g_mutex_unlock (&priv->lock);
782 }
783
784 /**
785  * gst_rtsp_stream_get_protocols:
786  * @stream: a #GstRTSPStream
787  *
788  * Get the allowed protocols of @stream.
789  *
790  * Returns: a #GstRTSPLowerTrans
791  */
792 GstRTSPLowerTrans
793 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
794 {
795   GstRTSPStreamPrivate *priv;
796   GstRTSPLowerTrans res;
797
798   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
799       GST_RTSP_LOWER_TRANS_UNKNOWN);
800
801   priv = stream->priv;
802
803   g_mutex_lock (&priv->lock);
804   res = priv->protocols;
805   g_mutex_unlock (&priv->lock);
806
807   return res;
808 }
809
810 /**
811  * gst_rtsp_stream_set_address_pool:
812  * @stream: a #GstRTSPStream
813  * @pool: (transfer none): a #GstRTSPAddressPool
814  *
815  * configure @pool to be used as the address pool of @stream.
816  */
817 void
818 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
819     GstRTSPAddressPool * pool)
820 {
821   GstRTSPStreamPrivate *priv;
822   GstRTSPAddressPool *old;
823
824   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
825
826   priv = stream->priv;
827
828   GST_LOG_OBJECT (stream, "set address pool %p", pool);
829
830   g_mutex_lock (&priv->lock);
831   if ((old = priv->pool) != pool)
832     priv->pool = pool ? g_object_ref (pool) : NULL;
833   else
834     old = NULL;
835   g_mutex_unlock (&priv->lock);
836
837   if (old)
838     g_object_unref (old);
839 }
840
841 /**
842  * gst_rtsp_stream_get_address_pool:
843  * @stream: a #GstRTSPStream
844  *
845  * Get the #GstRTSPAddressPool used as the address pool of @stream.
846  *
847  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
848  * usage.
849  */
850 GstRTSPAddressPool *
851 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
852 {
853   GstRTSPStreamPrivate *priv;
854   GstRTSPAddressPool *result;
855
856   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
857
858   priv = stream->priv;
859
860   g_mutex_lock (&priv->lock);
861   if ((result = priv->pool))
862     g_object_ref (result);
863   g_mutex_unlock (&priv->lock);
864
865   return result;
866 }
867
868 /**
869  * gst_rtsp_stream_set_multicast_iface:
870  * @stream: a #GstRTSPStream
871  * @multicast_iface: (transfer none): a multicast interface
872  *
873  * configure @multicast_iface to be used for @stream.
874  */
875 void
876 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
877     const gchar * multicast_iface)
878 {
879   GstRTSPStreamPrivate *priv;
880   gchar *old;
881
882   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
883
884   priv = stream->priv;
885
886   GST_LOG_OBJECT (stream, "set multicast iface %s",
887       GST_STR_NULL (multicast_iface));
888
889   g_mutex_lock (&priv->lock);
890   if ((old = priv->multicast_iface) != multicast_iface)
891     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
892   else
893     old = NULL;
894   g_mutex_unlock (&priv->lock);
895
896   if (old)
897     g_free (old);
898 }
899
900 /**
901  * gst_rtsp_stream_get_multicast_iface:
902  * @stream: a #GstRTSPStream
903  *
904  * Get the multicast interface used for @stream.
905  *
906  * Returns: (transfer full): the multicast interface for @stream. g_free() after
907  * usage.
908  */
909 gchar *
910 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
911 {
912   GstRTSPStreamPrivate *priv;
913   gchar *result;
914
915   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
916
917   priv = stream->priv;
918
919   g_mutex_lock (&priv->lock);
920   if ((result = priv->multicast_iface))
921     result = g_strdup (result);
922   g_mutex_unlock (&priv->lock);
923
924   return result;
925 }
926
927 /**
928  * gst_rtsp_stream_get_multicast_address:
929  * @stream: a #GstRTSPStream
930  * @family: the #GSocketFamily
931  *
932  * Get the multicast address of @stream for @family.
933  *
934  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
935  * or %NULL when no address could be allocated. gst_rtsp_address_free()
936  * after usage.
937  */
938 GstRTSPAddress *
939 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
940     GSocketFamily family)
941 {
942   GstRTSPStreamPrivate *priv;
943   GstRTSPAddress *result;
944   GstRTSPAddress **addrp;
945   GstRTSPAddressFlags flags;
946
947   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
948
949   priv = stream->priv;
950
951   if (family == G_SOCKET_FAMILY_IPV6) {
952     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
953     addrp = &priv->addr_v6;
954   } else {
955     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
956     addrp = &priv->addr_v4;
957   }
958
959   g_mutex_lock (&priv->lock);
960   if (*addrp == NULL) {
961     if (priv->pool == NULL)
962       goto no_pool;
963
964     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
965
966     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
967     if (*addrp == NULL)
968       goto no_address;
969   }
970   result = gst_rtsp_address_copy (*addrp);
971   g_mutex_unlock (&priv->lock);
972
973   return result;
974
975   /* ERRORS */
976 no_pool:
977   {
978     GST_ERROR_OBJECT (stream, "no address pool specified");
979     g_mutex_unlock (&priv->lock);
980     return NULL;
981   }
982 no_address:
983   {
984     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
985     g_mutex_unlock (&priv->lock);
986     return NULL;
987   }
988 }
989
990 /**
991  * gst_rtsp_stream_reserve_address:
992  * @stream: a #GstRTSPStream
993  * @address: an address
994  * @port: a port
995  * @n_ports: n_ports
996  * @ttl: a TTL
997  *
998  * Reserve @address and @port as the address and port of @stream.
999  *
1000  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1001  * the address could be reserved. gst_rtsp_address_free() after usage.
1002  */
1003 GstRTSPAddress *
1004 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1005     const gchar * address, guint port, guint n_ports, guint ttl)
1006 {
1007   GstRTSPStreamPrivate *priv;
1008   GstRTSPAddress *result;
1009   GInetAddress *addr;
1010   GSocketFamily family;
1011   GstRTSPAddress **addrp;
1012
1013   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1014   g_return_val_if_fail (address != NULL, NULL);
1015   g_return_val_if_fail (port > 0, NULL);
1016   g_return_val_if_fail (n_ports > 0, NULL);
1017   g_return_val_if_fail (ttl > 0, NULL);
1018
1019   priv = stream->priv;
1020
1021   addr = g_inet_address_new_from_string (address);
1022   if (!addr) {
1023     GST_ERROR ("failed to get inet addr from %s", address);
1024     family = G_SOCKET_FAMILY_IPV4;
1025   } else {
1026     family = g_inet_address_get_family (addr);
1027     g_object_unref (addr);
1028   }
1029
1030   if (family == G_SOCKET_FAMILY_IPV6)
1031     addrp = &priv->addr_v6;
1032   else
1033     addrp = &priv->addr_v4;
1034
1035   g_mutex_lock (&priv->lock);
1036   if (*addrp == NULL) {
1037     GstRTSPAddressPoolResult res;
1038
1039     if (priv->pool == NULL)
1040       goto no_pool;
1041
1042     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1043         port, n_ports, ttl, addrp);
1044     if (res != GST_RTSP_ADDRESS_POOL_OK)
1045       goto no_address;
1046   } else {
1047     if (strcmp ((*addrp)->address, address) ||
1048         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1049         (*addrp)->ttl != ttl)
1050       goto different_address;
1051   }
1052   result = gst_rtsp_address_copy (*addrp);
1053   g_mutex_unlock (&priv->lock);
1054
1055   return result;
1056
1057   /* ERRORS */
1058 no_pool:
1059   {
1060     GST_ERROR_OBJECT (stream, "no address pool specified");
1061     g_mutex_unlock (&priv->lock);
1062     return NULL;
1063   }
1064 no_address:
1065   {
1066     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1067         address);
1068     g_mutex_unlock (&priv->lock);
1069     return NULL;
1070   }
1071 different_address:
1072   {
1073     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1074         " reserved", address);
1075     g_mutex_unlock (&priv->lock);
1076     return NULL;
1077   }
1078 }
1079
1080 /* must be called with lock */
1081 static void
1082 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1083     GSocket * rtcp_socket, GSocketFamily family)
1084 {
1085   GstRTSPStreamPrivate *priv = stream->priv;
1086   const gchar *multisink_socket;
1087
1088   if (family == G_SOCKET_FAMILY_IPV6)
1089     multisink_socket = "socket-v6";
1090   else
1091     multisink_socket = "socket";
1092
1093   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1094       NULL);
1095   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1096       NULL);
1097 }
1098
1099 /* must be called with lock */
1100 static gboolean
1101 create_and_configure_udpsinks (GstRTSPStream * stream)
1102 {
1103   GstRTSPStreamPrivate *priv = stream->priv;
1104   GstElement *udpsink0, *udpsink1;
1105
1106   udpsink0 = NULL;
1107   udpsink1 = NULL;
1108
1109   if (priv->udpsink[0])
1110     udpsink0 = priv->udpsink[0];
1111   else
1112     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1113
1114   if (!udpsink0)
1115     goto no_udp_protocol;
1116
1117   if (priv->udpsink[1])
1118     udpsink1 = priv->udpsink[1];
1119   else
1120     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1121
1122   if (!udpsink1)
1123     goto no_udp_protocol;
1124
1125   /* configure sinks */
1126
1127   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1128   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1129
1130   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1132
1133   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1134
1135   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1136   /* Needs to be async for RECORD streams, otherwise we will never go to
1137    * PLAYING because the sinks will wait for data while the udpsrc can't
1138    * provide data with timestamps in PAUSED. */
1139   if (priv->sinkpad)
1140     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1141   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1142
1143   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1145
1146   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1148
1149   /* update the dscp qos field in the sinks */
1150   update_dscp_qos (stream);
1151
1152   priv->udpsink[0] = udpsink0;
1153   priv->udpsink[1] = udpsink1;
1154
1155   return TRUE;
1156
1157   /* ERRORS */
1158 no_udp_protocol:
1159   {
1160     return FALSE;
1161   }
1162 }
1163
1164 /* must be called with lock */
1165 static void
1166 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1167     GSocketFamily family)
1168 {
1169   GstRTSPStreamPrivate *priv;
1170   GstPad *pad, *selpad;
1171   guint i;
1172   GstBin *bin;
1173
1174   priv = stream->priv;
1175   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1176
1177   for (i = 0; i < 2; i++) {
1178     if (priv->sinkpad || i == 1) {
1179       if (priv->srcpad) {
1180         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1181          * values. This is only relevant for PLAY pipelines */
1182         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1183         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1184       }
1185       /* add udpsrc */
1186       gst_bin_add (bin, udpsrc_out[i]);
1187
1188       /* and link to the funnel */
1189       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1190       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1191       gst_pad_link (pad, selpad);
1192       gst_object_unref (pad);
1193       gst_object_unref (selpad);
1194
1195       /* otherwise sync state with parent in case it's running already
1196        * at this point */
1197       if (!priv->srcpad) {
1198         gst_element_sync_state_with_parent (udpsrc_out[i]);
1199       }
1200     }
1201   }
1202
1203   gst_object_unref (bin);
1204 }
1205
1206 /* must be called with lock */
1207 static gboolean
1208 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1209     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1210     const gchar * address, gint rtpport, gint rtcpport,
1211     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1212 {
1213   GstStateChangeReturn ret;
1214
1215   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1216   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1217
1218   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1219     goto error;
1220
1221   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1222     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1223     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1224     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1225     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1226     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1227         NULL);
1228     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1229         NULL);
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1231     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1232   }
1233
1234   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1235   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1236
1237   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1238   if (ret == GST_STATE_CHANGE_FAILURE)
1239     goto error;
1240   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1241   if (ret == GST_STATE_CHANGE_FAILURE)
1242     goto error;
1243
1244   return TRUE;
1245   return TRUE;
1246
1247   /* ERRORS */
1248 error:
1249   {
1250     if (udpsrc_out[0])
1251       gst_object_unref (udpsrc_out[0]);
1252     if (udpsrc_out[1])
1253       gst_object_unref (udpsrc_out[1]);
1254     return FALSE;
1255   }
1256 }
1257
1258 static gboolean
1259 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1260     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1261     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1262     gboolean use_client_settings)
1263 {
1264   GstRTSPStreamPrivate *priv = stream->priv;
1265   GSocket *rtp_socket = NULL;
1266   GSocket *rtcp_socket;
1267   gint tmp_rtp, tmp_rtcp;
1268   guint count;
1269   gint rtpport, rtcpport;
1270   GList *rejected_addresses = NULL;
1271   GstRTSPAddress *addr = NULL;
1272   GInetAddress *inetaddr = NULL;
1273   gchar *addr_str;
1274   GSocketAddress *rtp_sockaddr = NULL;
1275   GSocketAddress *rtcp_sockaddr = NULL;
1276   GstRTSPAddressPool *pool;
1277   GstRTSPLowerTrans transport;
1278   const gchar *multicast_iface = priv->multicast_iface;
1279
1280   pool = priv->pool;
1281   count = 0;
1282   transport = ct->lower_transport;
1283
1284   /* Start with random port */
1285   tmp_rtp = 0;
1286
1287   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1288       G_SOCKET_PROTOCOL_UDP, NULL);
1289   if (!rtcp_socket)
1290     goto no_udp_protocol;
1291   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1292
1293   if (*server_addr_out)
1294     gst_rtsp_address_free (*server_addr_out);
1295
1296   /* try to allocate 2 UDP ports, the RTP port should be an even
1297    * number and the RTCP port should be the next (uneven) port */
1298 again:
1299
1300   if (rtp_socket == NULL) {
1301     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1302         G_SOCKET_PROTOCOL_UDP, NULL);
1303     if (!rtp_socket)
1304       goto no_udp_protocol;
1305     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1306   }
1307
1308   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1309               gst_rtsp_address_pool_has_unicast_addresses (pool))
1310           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1311     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1312
1313     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1314       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1315     else
1316       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1317
1318     if (addr)
1319       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1320
1321     if (family == G_SOCKET_FAMILY_IPV6)
1322       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1323     else
1324       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1325
1326     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1327         && use_client_settings)
1328       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1329           ct->port.min, 2, ct->ttl, &addr);
1330     else
1331       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1332
1333     if (addr == NULL)
1334       goto no_ports;
1335
1336     tmp_rtp = addr->port;
1337
1338     g_clear_object (&inetaddr);
1339     inetaddr = g_inet_address_new_from_string (addr->address);
1340
1341     /* On Windows it's not possible to bind to a multicast address
1342      * but the OS will make sure to filter out all packets that
1343      * arrive not for the multicast address the socket joined.
1344      *
1345      * On Linux and others it is necessary to bind to a multicast
1346      * address to let the OS filter out all packets that are received
1347      * on the same port but for different addresses than the multicast
1348      * address
1349      */
1350 #ifdef G_OS_WIN32
1351     if (g_inet_address_get_is_multicast (inetaddr)) {
1352       g_object_unref (inetaddr);
1353       inetaddr = g_inet_address_new_any (family);
1354     }
1355 #endif
1356   } else {
1357     if (tmp_rtp != 0) {
1358       tmp_rtp += 2;
1359       if (++count > 20)
1360         goto no_ports;
1361     }
1362
1363     if (inetaddr == NULL)
1364       inetaddr = g_inet_address_new_any (family);
1365   }
1366
1367   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1368   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1369     g_object_unref (rtp_sockaddr);
1370     goto again;
1371   }
1372   g_object_unref (rtp_sockaddr);
1373
1374   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1375   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1376     g_clear_object (&rtp_sockaddr);
1377     goto socket_error;
1378   }
1379
1380   tmp_rtp =
1381       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1382   g_object_unref (rtp_sockaddr);
1383
1384   /* check if port is even */
1385   if ((tmp_rtp & 1) != 0) {
1386     /* port not even, close and allocate another */
1387     tmp_rtp++;
1388     g_clear_object (&rtp_socket);
1389     goto again;
1390   }
1391
1392   /* set port */
1393   tmp_rtcp = tmp_rtp + 1;
1394
1395   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1396   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1397     g_object_unref (rtcp_sockaddr);
1398     g_clear_object (&rtp_socket);
1399     goto again;
1400   }
1401   g_object_unref (rtcp_sockaddr);
1402
1403   if (addr == NULL)
1404     addr_str = g_inet_address_to_string (inetaddr);
1405   else
1406     addr_str = addr->address;
1407   g_clear_object (&inetaddr);
1408
1409   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1410           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1411           transport)) {
1412     if (addr == NULL)
1413       g_free (addr_str);
1414     goto no_udp_protocol;
1415   }
1416
1417   if (addr == NULL)
1418     g_free (addr_str);
1419
1420   play_udpsources_one_family (stream, udpsrc_out, family);
1421
1422   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1423   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1424
1425   /* this should not happen... */
1426   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1427     goto port_error;
1428
1429   /* set RTP and RTCP sockets */
1430   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1431
1432   server_port_out->min = rtpport;
1433   server_port_out->max = rtcpport;
1434
1435   *server_addr_out = addr;
1436   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1437
1438   g_object_unref (rtp_socket);
1439   g_object_unref (rtcp_socket);
1440
1441   return TRUE;
1442
1443   /* ERRORS */
1444 no_udp_protocol:
1445   {
1446     goto cleanup;
1447   }
1448 no_ports:
1449   {
1450     goto cleanup;
1451   }
1452 port_error:
1453   {
1454     goto cleanup;
1455   }
1456 socket_error:
1457   {
1458     goto cleanup;
1459   }
1460 cleanup:
1461   {
1462     if (inetaddr)
1463       g_object_unref (inetaddr);
1464     g_list_free_full (rejected_addresses,
1465         (GDestroyNotify) gst_rtsp_address_free);
1466     if (addr)
1467       gst_rtsp_address_free (addr);
1468     if (rtp_socket)
1469       g_object_unref (rtp_socket);
1470     if (rtcp_socket)
1471       g_object_unref (rtcp_socket);
1472     return FALSE;
1473   }
1474 }
1475
1476 /**
1477  * gst_rtsp_stream_allocate_udp_sockets:
1478  * @stream: a #GstRTSPStream
1479  * @family: protocol family
1480  * @transport_method: transport method
1481  *
1482  * Allocates RTP and RTCP ports.
1483  *
1484  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1485  */
1486 gboolean
1487 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1488     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1489 {
1490   GstRTSPStreamPrivate *priv;
1491   gboolean result = FALSE;
1492   GstRTSPLowerTrans transport = ct->lower_transport;
1493
1494   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1495   priv = stream->priv;
1496   g_return_val_if_fail (priv->is_joined, FALSE);
1497
1498   g_mutex_lock (&priv->lock);
1499
1500   if (family == G_SOCKET_FAMILY_IPV4) {
1501     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1502       if (priv->have_ipv4_mcast)
1503         goto done;
1504       priv->have_ipv4_mcast =
1505           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1506           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1507           use_client_settings);
1508     } else {
1509       priv->have_ipv4 =
1510           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1511           &priv->server_port_v4, ct, &priv->server_addr_v4,
1512           use_client_settings);
1513     }
1514   } else {
1515     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1516       if (priv->have_ipv6_mcast)
1517         goto done;
1518       priv->have_ipv6_mcast =
1519           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1520           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1521           use_client_settings);
1522     } else {
1523       if (priv->have_ipv6)
1524         goto done;
1525       priv->have_ipv6 =
1526           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1527           &priv->server_port_v6, ct, &priv->server_addr_v6,
1528           use_client_settings);
1529     }
1530   }
1531
1532 done:
1533   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1534       priv->have_ipv6_mcast;
1535
1536   g_mutex_unlock (&priv->lock);
1537
1538   return result;
1539 }
1540
1541 /**
1542  * gst_rtsp_stream_set_client_side:
1543  * @stream: a #GstRTSPStream
1544  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1545  * an RTSP connection.
1546  *
1547  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1548  * streams to an RTSP server via RECORD. This has the practical effect
1549  * of changing which UDP port numbers are used when setting up the local
1550  * side of the stream sending to be either the 'server' or 'client' pair
1551  * of a configured UDP transport.
1552  */
1553 void
1554 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1555 {
1556   GstRTSPStreamPrivate *priv;
1557
1558   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1559   priv = stream->priv;
1560   g_mutex_lock (&priv->lock);
1561   priv->client_side = client_side;
1562   g_mutex_unlock (&priv->lock);
1563 }
1564
1565 /**
1566  * gst_rtsp_stream_is_client_side:
1567  * @stream: a #GstRTSPStream
1568  *
1569  * See gst_rtsp_stream_set_client_side()
1570  *
1571  * Returns: TRUE if this #GstRTSPStream is client-side.
1572  */
1573 gboolean
1574 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1575 {
1576   GstRTSPStreamPrivate *priv;
1577   gboolean ret;
1578
1579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1580
1581   priv = stream->priv;
1582   g_mutex_lock (&priv->lock);
1583   ret = priv->client_side;
1584   g_mutex_unlock (&priv->lock);
1585
1586   return ret;
1587 }
1588
1589 /**
1590  * gst_rtsp_stream_get_server_port:
1591  * @stream: a #GstRTSPStream
1592  * @server_port: (out): result server port
1593  * @family: the port family to get
1594  *
1595  * Fill @server_port with the port pair used by the server. This function can
1596  * only be called when @stream has been joined.
1597  */
1598 void
1599 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1600     GstRTSPRange * server_port, GSocketFamily family)
1601 {
1602   GstRTSPStreamPrivate *priv;
1603
1604   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1605   priv = stream->priv;
1606   g_return_if_fail (priv->is_joined);
1607
1608   g_mutex_lock (&priv->lock);
1609   if (family == G_SOCKET_FAMILY_IPV4) {
1610     if (server_port)
1611       *server_port = priv->server_port_v4;
1612   } else {
1613     if (server_port)
1614       *server_port = priv->server_port_v6;
1615   }
1616   g_mutex_unlock (&priv->lock);
1617 }
1618
1619 /**
1620  * gst_rtsp_stream_get_rtpsession:
1621  * @stream: a #GstRTSPStream
1622  *
1623  * Get the RTP session of this stream.
1624  *
1625  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1626  */
1627 GObject *
1628 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1629 {
1630   GstRTSPStreamPrivate *priv;
1631   GObject *session;
1632
1633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1634
1635   priv = stream->priv;
1636
1637   g_mutex_lock (&priv->lock);
1638   if ((session = priv->session))
1639     g_object_ref (session);
1640   g_mutex_unlock (&priv->lock);
1641
1642   return session;
1643 }
1644
1645 /**
1646  * gst_rtsp_stream_get_ssrc:
1647  * @stream: a #GstRTSPStream
1648  * @ssrc: (out): result ssrc
1649  *
1650  * Get the SSRC used by the RTP session of this stream. This function can only
1651  * be called when @stream has been joined.
1652  */
1653 void
1654 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1655 {
1656   GstRTSPStreamPrivate *priv;
1657
1658   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1659   priv = stream->priv;
1660   g_return_if_fail (priv->is_joined);
1661
1662   g_mutex_lock (&priv->lock);
1663   if (ssrc && priv->session)
1664     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1665   g_mutex_unlock (&priv->lock);
1666 }
1667
1668 /**
1669  * gst_rtsp_stream_set_retransmission_time:
1670  * @stream: a #GstRTSPStream
1671  * @time: a #GstClockTime
1672  *
1673  * Set the amount of time to store retransmission packets.
1674  */
1675 void
1676 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1677     GstClockTime time)
1678 {
1679   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1680
1681   g_mutex_lock (&stream->priv->lock);
1682   stream->priv->rtx_time = time;
1683   if (stream->priv->rtxsend)
1684     g_object_set (stream->priv->rtxsend, "max-size-time",
1685         GST_TIME_AS_MSECONDS (time), NULL);
1686   g_mutex_unlock (&stream->priv->lock);
1687 }
1688
1689 /**
1690  * gst_rtsp_stream_get_retransmission_time:
1691  * @stream: a #GstRTSPStream
1692  *
1693  * Get the amount of time to store retransmission data.
1694  *
1695  * Returns: the amount of time to store retransmission data.
1696  */
1697 GstClockTime
1698 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1699 {
1700   GstClockTime ret;
1701
1702   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1703
1704   g_mutex_lock (&stream->priv->lock);
1705   ret = stream->priv->rtx_time;
1706   g_mutex_unlock (&stream->priv->lock);
1707
1708   return ret;
1709 }
1710
1711 /**
1712  * gst_rtsp_stream_set_retransmission_pt:
1713  * @stream: a #GstRTSPStream
1714  * @rtx_pt: a #guint
1715  *
1716  * Set the payload type (pt) for retransmission of this stream.
1717  */
1718 void
1719 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1720 {
1721   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1722
1723   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1724
1725   g_mutex_lock (&stream->priv->lock);
1726   stream->priv->rtx_pt = rtx_pt;
1727   if (stream->priv->rtxsend) {
1728     guint pt = gst_rtsp_stream_get_pt (stream);
1729     gchar *pt_s = g_strdup_printf ("%d", pt);
1730     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1731         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1732     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1733     g_free (pt_s);
1734     gst_structure_free (rtx_pt_map);
1735   }
1736   g_mutex_unlock (&stream->priv->lock);
1737 }
1738
1739 /**
1740  * gst_rtsp_stream_get_retransmission_pt:
1741  * @stream: a #GstRTSPStream
1742  *
1743  * Get the payload-type used for retransmission of this stream
1744  *
1745  * Returns: The retransmission PT.
1746  */
1747 guint
1748 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1749 {
1750   guint rtx_pt;
1751
1752   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1753
1754   g_mutex_lock (&stream->priv->lock);
1755   rtx_pt = stream->priv->rtx_pt;
1756   g_mutex_unlock (&stream->priv->lock);
1757
1758   return rtx_pt;
1759 }
1760
1761 /**
1762  * gst_rtsp_stream_set_buffer_size:
1763  * @stream: a #GstRTSPStream
1764  * @size: the buffer size
1765  *
1766  * Set the size of the UDP transmission buffer (in bytes)
1767  * Needs to be set before the stream is joined to a bin.
1768  *
1769  * Since: 1.6
1770  */
1771 void
1772 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1773 {
1774   g_mutex_lock (&stream->priv->lock);
1775   stream->priv->buffer_size = size;
1776   g_mutex_unlock (&stream->priv->lock);
1777 }
1778
1779 /**
1780  * gst_rtsp_stream_get_buffer_size:
1781  * @stream: a #GstRTSPStream
1782  *
1783  * Get the size of the UDP transmission buffer (in bytes)
1784  *
1785  * Returns: the size of the UDP TX buffer
1786  *
1787  * Since: 1.6
1788  */
1789 guint
1790 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1791 {
1792   guint buffer_size;
1793
1794   g_mutex_lock (&stream->priv->lock);
1795   buffer_size = stream->priv->buffer_size;
1796   g_mutex_unlock (&stream->priv->lock);
1797
1798   return buffer_size;
1799 }
1800
1801 /* executed from streaming thread */
1802 static void
1803 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1804 {
1805   GstRTSPStreamPrivate *priv = stream->priv;
1806   GstCaps *newcaps, *oldcaps;
1807
1808   newcaps = gst_pad_get_current_caps (pad);
1809
1810   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1811       newcaps);
1812
1813   g_mutex_lock (&priv->lock);
1814   oldcaps = priv->caps;
1815   priv->caps = newcaps;
1816   g_mutex_unlock (&priv->lock);
1817
1818   if (oldcaps)
1819     gst_caps_unref (oldcaps);
1820 }
1821
1822 static void
1823 dump_structure (const GstStructure * s)
1824 {
1825   gchar *sstr;
1826
1827   sstr = gst_structure_to_string (s);
1828   GST_INFO ("structure: %s", sstr);
1829   g_free (sstr);
1830 }
1831
1832 static GstRTSPStreamTransport *
1833 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1834 {
1835   GstRTSPStreamPrivate *priv = stream->priv;
1836   GList *walk;
1837   GstRTSPStreamTransport *result = NULL;
1838   const gchar *tmp;
1839   gchar *dest;
1840   guint port;
1841
1842   if (rtcp_from == NULL)
1843     return NULL;
1844
1845   tmp = g_strrstr (rtcp_from, ":");
1846   if (tmp == NULL)
1847     return NULL;
1848
1849   port = atoi (tmp + 1);
1850   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1851
1852   g_mutex_lock (&priv->lock);
1853   GST_INFO ("finding %s:%d in %d transports", dest, port,
1854       g_list_length (priv->transports));
1855
1856   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1857     GstRTSPStreamTransport *trans = walk->data;
1858     const GstRTSPTransport *tr;
1859     gint min, max;
1860
1861     tr = gst_rtsp_stream_transport_get_transport (trans);
1862
1863     if (priv->client_side) {
1864       /* In client side mode the 'destination' is the RTSP server, so send
1865        * to those ports */
1866       min = tr->server_port.min;
1867       max = tr->server_port.max;
1868     } else {
1869       min = tr->client_port.min;
1870       max = tr->client_port.max;
1871     }
1872
1873     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1874       result = trans;
1875       break;
1876     }
1877   }
1878   if (result)
1879     g_object_ref (result);
1880   g_mutex_unlock (&priv->lock);
1881
1882   g_free (dest);
1883
1884   return result;
1885 }
1886
1887 static GstRTSPStreamTransport *
1888 check_transport (GObject * source, GstRTSPStream * stream)
1889 {
1890   GstStructure *stats;
1891   GstRTSPStreamTransport *trans;
1892
1893   /* see if we have a stream to match with the origin of the RTCP packet */
1894   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1895   if (trans == NULL) {
1896     g_object_get (source, "stats", &stats, NULL);
1897     if (stats) {
1898       const gchar *rtcp_from;
1899
1900       dump_structure (stats);
1901
1902       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1903       if ((trans = find_transport (stream, rtcp_from))) {
1904         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1905             source);
1906         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1907             g_object_unref);
1908       }
1909       gst_structure_free (stats);
1910     }
1911   }
1912   return trans;
1913 }
1914
1915
1916 static void
1917 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1918 {
1919   GstRTSPStreamTransport *trans;
1920
1921   GST_INFO ("%p: new source %p", stream, source);
1922
1923   trans = check_transport (source, stream);
1924
1925   if (trans)
1926     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1927 }
1928
1929 static void
1930 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1931 {
1932   GST_INFO ("%p: new SDES %p", stream, source);
1933 }
1934
1935 static void
1936 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1937 {
1938   GstRTSPStreamTransport *trans;
1939
1940   trans = check_transport (source, stream);
1941
1942   if (trans) {
1943     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1944     gst_rtsp_stream_transport_keep_alive (trans);
1945   }
1946 #ifdef DUMP_STATS
1947   {
1948     GstStructure *stats;
1949     g_object_get (source, "stats", &stats, NULL);
1950     if (stats) {
1951       dump_structure (stats);
1952       gst_structure_free (stats);
1953     }
1954   }
1955 #endif
1956 }
1957
1958 static void
1959 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1960 {
1961   GST_INFO ("%p: source %p bye", stream, source);
1962 }
1963
1964 static void
1965 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1966 {
1967   GstRTSPStreamTransport *trans;
1968
1969   GST_INFO ("%p: source %p bye timeout", stream, source);
1970
1971   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1972     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1973     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1974   }
1975 }
1976
1977 static void
1978 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1979 {
1980   GstRTSPStreamTransport *trans;
1981
1982   GST_INFO ("%p: source %p timeout", stream, source);
1983
1984   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1985     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1986     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1987   }
1988 }
1989
1990 static void
1991 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1992 {
1993   GST_INFO ("%p: new sender source %p", stream, source);
1994 #ifndef DUMP_STATS
1995   {
1996     GstStructure *stats;
1997     g_object_get (source, "stats", &stats, NULL);
1998     if (stats) {
1999       dump_structure (stats);
2000       gst_structure_free (stats);
2001     }
2002   }
2003 #endif
2004 }
2005
2006 static void
2007 on_sender_ssrc_active (GObject * session, GObject * source,
2008     GstRTSPStream * stream)
2009 {
2010 #ifndef DUMP_STATS
2011   {
2012     GstStructure *stats;
2013     g_object_get (source, "stats", &stats, NULL);
2014     if (stats) {
2015       dump_structure (stats);
2016       gst_structure_free (stats);
2017     }
2018   }
2019 #endif
2020 }
2021
2022 static void
2023 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2024 {
2025   if (is_rtp) {
2026     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2027     g_list_free (priv->tr_cache_rtp);
2028     priv->tr_cache_rtp = NULL;
2029   } else {
2030     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2031     g_list_free (priv->tr_cache_rtcp);
2032     priv->tr_cache_rtcp = NULL;
2033   }
2034 }
2035
2036 static GstFlowReturn
2037 handle_new_sample (GstAppSink * sink, gpointer user_data)
2038 {
2039   GstRTSPStreamPrivate *priv;
2040   GList *walk;
2041   GstSample *sample;
2042   GstBuffer *buffer;
2043   GstRTSPStream *stream;
2044   gboolean is_rtp;
2045
2046   sample = gst_app_sink_pull_sample (sink);
2047   if (!sample)
2048     return GST_FLOW_OK;
2049
2050   stream = (GstRTSPStream *) user_data;
2051   priv = stream->priv;
2052   buffer = gst_sample_get_buffer (sample);
2053
2054   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2055
2056   g_mutex_lock (&priv->lock);
2057   if (is_rtp) {
2058     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2059       clear_tr_cache (priv, is_rtp);
2060       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2061         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2062         priv->tr_cache_rtp =
2063             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2064       }
2065       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2066     }
2067   } else {
2068     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2069       clear_tr_cache (priv, is_rtp);
2070       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2071         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2072         priv->tr_cache_rtcp =
2073             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2074       }
2075       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2076     }
2077   }
2078   g_mutex_unlock (&priv->lock);
2079
2080   if (is_rtp) {
2081     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2082       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2083       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2084     }
2085   } else {
2086     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2087       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2088       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2089     }
2090   }
2091   gst_sample_unref (sample);
2092
2093   return GST_FLOW_OK;
2094 }
2095
2096 static GstAppSinkCallbacks sink_cb = {
2097   NULL,                         /* not interested in EOS */
2098   NULL,                         /* not interested in preroll samples */
2099   handle_new_sample,
2100 };
2101
2102 static GstElement *
2103 get_rtp_encoder (GstRTSPStream * stream, guint session)
2104 {
2105   GstRTSPStreamPrivate *priv = stream->priv;
2106
2107   if (priv->srtpenc == NULL) {
2108     gchar *name;
2109
2110     name = g_strdup_printf ("srtpenc_%u", session);
2111     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2112     g_free (name);
2113
2114     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2115   }
2116   return gst_object_ref (priv->srtpenc);
2117 }
2118
2119 static GstElement *
2120 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2121 {
2122   GstRTSPStreamPrivate *priv = stream->priv;
2123   GstElement *oldenc, *enc;
2124   GstPad *pad;
2125   gchar *name;
2126
2127   if (priv->idx != session)
2128     return NULL;
2129
2130   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2131
2132   oldenc = priv->srtpenc;
2133   enc = get_rtp_encoder (stream, session);
2134   name = g_strdup_printf ("rtp_sink_%d", session);
2135   pad = gst_element_get_request_pad (enc, name);
2136   g_free (name);
2137   gst_object_unref (pad);
2138
2139   if (oldenc == NULL)
2140     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2141         enc);
2142
2143   return enc;
2144 }
2145
2146 static GstElement *
2147 request_rtcp_encoder (GstElement * rtpbin, guint session,
2148     GstRTSPStream * stream)
2149 {
2150   GstRTSPStreamPrivate *priv = stream->priv;
2151   GstElement *oldenc, *enc;
2152   GstPad *pad;
2153   gchar *name;
2154
2155   if (priv->idx != session)
2156     return NULL;
2157
2158   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2159
2160   oldenc = priv->srtpenc;
2161   enc = get_rtp_encoder (stream, session);
2162   name = g_strdup_printf ("rtcp_sink_%d", session);
2163   pad = gst_element_get_request_pad (enc, name);
2164   g_free (name);
2165   gst_object_unref (pad);
2166
2167   if (oldenc == NULL)
2168     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2169         enc);
2170
2171   return enc;
2172 }
2173
2174 static GstCaps *
2175 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2176 {
2177   GstRTSPStreamPrivate *priv = stream->priv;
2178   GstCaps *caps;
2179
2180   GST_DEBUG ("request key %08x", ssrc);
2181
2182   g_mutex_lock (&priv->lock);
2183   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2184     gst_caps_ref (caps);
2185   g_mutex_unlock (&priv->lock);
2186
2187   return caps;
2188 }
2189
2190 static GstElement *
2191 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2192     GstRTSPStream * stream)
2193 {
2194   GstRTSPStreamPrivate *priv = stream->priv;
2195
2196   if (priv->idx != session)
2197     return NULL;
2198
2199   if (priv->srtpdec == NULL) {
2200     gchar *name;
2201
2202     name = g_strdup_printf ("srtpdec_%u", session);
2203     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2204     g_free (name);
2205
2206     g_signal_connect (priv->srtpdec, "request-key",
2207         (GCallback) request_key, stream);
2208   }
2209   return gst_object_ref (priv->srtpdec);
2210 }
2211
2212 /**
2213  * gst_rtsp_stream_request_aux_sender:
2214  * @stream: a #GstRTSPStream
2215  * @sessid: the session id
2216  *
2217  * Creating a rtxsend bin
2218  *
2219  * Returns: (transfer full): a #GstElement.
2220  *
2221  * Since: 1.6
2222  */
2223 GstElement *
2224 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2225 {
2226   GstElement *bin;
2227   GstPad *pad;
2228   GstStructure *pt_map;
2229   gchar *name;
2230   guint pt, rtx_pt;
2231   gchar *pt_s;
2232
2233   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2234
2235   pt = gst_rtsp_stream_get_pt (stream);
2236   pt_s = g_strdup_printf ("%u", pt);
2237   rtx_pt = stream->priv->rtx_pt;
2238
2239   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2240
2241   bin = gst_bin_new (NULL);
2242   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2243   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2244       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2245   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2246       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2247   g_free (pt_s);
2248   gst_structure_free (pt_map);
2249   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2250
2251   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2252   name = g_strdup_printf ("src_%u", sessid);
2253   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2254   g_free (name);
2255   gst_object_unref (pad);
2256
2257   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2258   name = g_strdup_printf ("sink_%u", sessid);
2259   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2260   g_free (name);
2261   gst_object_unref (pad);
2262
2263   return bin;
2264 }
2265
2266 /**
2267  * gst_rtsp_stream_set_pt_map:
2268  * @stream: a #GstRTSPStream
2269  * @pt: the pt
2270  * @caps: a #GstCaps
2271  *
2272  * Configure a pt map between @pt and @caps.
2273  */
2274 void
2275 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2276 {
2277   GstRTSPStreamPrivate *priv = stream->priv;
2278
2279   g_mutex_lock (&priv->lock);
2280   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2281   g_mutex_unlock (&priv->lock);
2282 }
2283
2284 static GstCaps *
2285 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2286     GstRTSPStream * stream)
2287 {
2288   GstRTSPStreamPrivate *priv = stream->priv;
2289   GstCaps *caps = NULL;
2290
2291   g_mutex_lock (&priv->lock);
2292
2293   if (priv->idx == session) {
2294     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2295     if (caps) {
2296       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2297       gst_caps_ref (caps);
2298     } else {
2299       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2300     }
2301   }
2302
2303   g_mutex_unlock (&priv->lock);
2304
2305   return caps;
2306 }
2307
2308 static void
2309 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2310 {
2311   GstRTSPStreamPrivate *priv = stream->priv;
2312   gchar *name;
2313   GstPadLinkReturn ret;
2314   guint sessid;
2315
2316   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2317       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2318
2319   name = gst_pad_get_name (pad);
2320   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2321     g_free (name);
2322     return;
2323   }
2324   g_free (name);
2325
2326   if (priv->idx != sessid)
2327     return;
2328
2329   if (gst_pad_is_linked (priv->sinkpad)) {
2330     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2331         GST_DEBUG_PAD_NAME (priv->sinkpad));
2332     return;
2333   }
2334
2335   /* link the RTP pad to the session manager, it should not really fail unless
2336    * this is not really an RTP pad */
2337   ret = gst_pad_link (pad, priv->sinkpad);
2338   if (ret != GST_PAD_LINK_OK)
2339     goto link_failed;
2340   priv->recv_rtp_src = gst_object_ref (pad);
2341
2342   return;
2343
2344 /* ERRORS */
2345 link_failed:
2346   {
2347     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2348         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2349   }
2350 }
2351
2352 static void
2353 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2354     GstRTSPStream * stream)
2355 {
2356   /* TODO: What to do here other than this? */
2357   GST_DEBUG ("Stream %p: Got EOS", stream);
2358   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2359 }
2360
2361 /* must be called with lock */
2362 static gboolean
2363 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2364 {
2365   GstRTSPStreamPrivate *priv;
2366   GstPad *pad, *sinkpad = NULL;
2367   gboolean is_tcp = FALSE, is_udp = FALSE;
2368   gint i;
2369
2370   priv = stream->priv;
2371
2372   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2373   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2374       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2375
2376   if (is_udp && !create_and_configure_udpsinks (stream))
2377     goto no_udp_protocol;
2378
2379   for (i = 0; i < 2; i++) {
2380     GstPad *teepad, *queuepad;
2381     /* For the sender we create this bit of pipeline for both
2382      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2383      * we need to add a queue before appsink and udpsink to make
2384      * the pipeline not block. For the TCP case, we want to pump
2385      * client as fast as possible anyway. This pipeline is used
2386      * when both TCP and UDP are present.
2387      *
2388      * .--------.      .-----.    .---------.    .---------.
2389      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2390      * |       send->sink   src->sink      src->sink       |
2391      * '--------'      |     |    '---------'    '---------'
2392      *                 |     |    .---------.    .---------.
2393      *                 |     |    |  queue  |    | appsink |
2394      *                 |    src->sink      src->sink       |
2395      *                 '-----'    '---------'    '---------'
2396      *
2397      * When only UDP or only TCP is allowed, we skip the tee and queue
2398      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2399      * the session.
2400      */
2401     /* Only link the RTP send src if we're going to send RTP, link
2402      * the RTCP send src always */
2403     if (priv->srcpad || i == 1) {
2404       if (is_udp) {
2405         /* add udpsink */
2406         gst_bin_add (bin, priv->udpsink[i]);
2407         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2408       }
2409
2410       if (is_tcp) {
2411         /* make appsink */
2412         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2413         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2414         gst_bin_add (bin, priv->appsink[i]);
2415         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2416             &sink_cb, stream, NULL);
2417       }
2418
2419       if (is_udp && is_tcp) {
2420         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2421
2422         /* make tee for RTP/RTCP */
2423         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2424         gst_bin_add (bin, priv->tee[i]);
2425
2426         /* and link to rtpbin send pad */
2427         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2428         gst_pad_link (priv->send_src[i], pad);
2429         gst_object_unref (pad);
2430
2431         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2432         g_object_set (priv->udpqueue[i], "max-size-buffers",
2433             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2434             NULL);
2435         gst_bin_add (bin, priv->udpqueue[i]);
2436         /* link tee to udpqueue */
2437         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2438         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2439         gst_pad_link (teepad, pad);
2440         gst_object_unref (pad);
2441         gst_object_unref (teepad);
2442
2443         /* link udpqueue to udpsink */
2444         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2445         gst_pad_link (queuepad, sinkpad);
2446         gst_object_unref (queuepad);
2447         gst_object_unref (sinkpad);
2448
2449         /* make appqueue */
2450         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2451         g_object_set (priv->appqueue[i], "max-size-buffers",
2452             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2453             NULL);
2454         gst_bin_add (bin, priv->appqueue[i]);
2455         /* and link tee to appqueue */
2456         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2457         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2458         gst_pad_link (teepad, pad);
2459         gst_object_unref (pad);
2460         gst_object_unref (teepad);
2461
2462         /* and link appqueue to appsink */
2463         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2464         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2465         gst_pad_link (queuepad, pad);
2466         gst_object_unref (pad);
2467         gst_object_unref (queuepad);
2468       } else if (is_tcp) {
2469         /* only appsink needed, link it to the session */
2470         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2471         gst_pad_link (priv->send_src[i], pad);
2472         gst_object_unref (pad);
2473
2474         /* when its only TCP, we need to set sync and preroll to FALSE
2475          * for the sink to avoid deadlock. And this is only needed for
2476          * sink used for RTCP data, not the RTP data. */
2477         if (i == 1)
2478           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2479       } else {
2480         /* else only udpsink needed, link it to the session */
2481         gst_pad_link (priv->send_src[i], sinkpad);
2482         gst_object_unref (sinkpad);
2483       }
2484     }
2485
2486     /* check if we need to set to a special state */
2487     if (state != GST_STATE_NULL) {
2488       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2489         gst_element_set_state (priv->udpsink[i], state);
2490       if (priv->appsink[i] && (priv->srcpad || i == 1))
2491         gst_element_set_state (priv->appsink[i], state);
2492       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2493         gst_element_set_state (priv->appqueue[i], state);
2494       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2495         gst_element_set_state (priv->udpqueue[i], state);
2496       if (priv->tee[i] && (priv->srcpad || i == 1))
2497         gst_element_set_state (priv->tee[i], state);
2498     }
2499   }
2500
2501   return TRUE;
2502
2503   /* ERRORS */
2504 no_udp_protocol:
2505   {
2506     return FALSE;
2507   }
2508 }
2509
2510 /* must be called with lock */
2511 static void
2512 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2513 {
2514   GstRTSPStreamPrivate *priv;
2515   GstPad *pad, *selpad;
2516   gboolean is_tcp;
2517   gint i;
2518
2519   priv = stream->priv;
2520
2521   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2522
2523   for (i = 0; i < 2; i++) {
2524     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2525      * RTCP sink always */
2526     if (priv->sinkpad || i == 1) {
2527       /* For the receiver we create this bit of pipeline for both
2528        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2529        * and it is all funneled into the rtpbin receive pad.
2530        *
2531        * .--------.     .--------.    .--------.
2532        * | udpsrc |     | funnel |    | rtpbin |
2533        * |       src->sink      src->sink      |
2534        * '--------'     |        |    '--------'
2535        * .--------.     |        |
2536        * | appsrc |     |        |
2537        * |       src->sink       |
2538        * '--------'     '--------'
2539        */
2540       /* make funnel for the RTP/RTCP receivers */
2541       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2542       gst_bin_add (bin, priv->funnel[i]);
2543
2544       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2545       gst_pad_link (pad, priv->recv_sink[i]);
2546       gst_object_unref (pad);
2547
2548       if (priv->udpsrc_v4[i]) {
2549         if (priv->srcpad) {
2550           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2551            * values. This is only relevant for PLAY pipelines */
2552           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2553           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2554         }
2555         /* add udpsrc */
2556         gst_bin_add (bin, priv->udpsrc_v4[i]);
2557
2558         /* and link to the funnel v4 */
2559         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2560         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2561         gst_pad_link (pad, selpad);
2562         gst_object_unref (pad);
2563         gst_object_unref (selpad);
2564       }
2565
2566       if (priv->udpsrc_v6[i]) {
2567         if (priv->srcpad) {
2568           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2569           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2570         }
2571         gst_bin_add (bin, priv->udpsrc_v6[i]);
2572
2573         /* and link to the funnel v6 */
2574         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2575         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2576         gst_pad_link (pad, selpad);
2577         gst_object_unref (pad);
2578         gst_object_unref (selpad);
2579       }
2580
2581       if (is_tcp) {
2582         /* make and add appsrc */
2583         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2584         priv->appsrc_base_time[i] = -1;
2585         if (priv->srcpad) {
2586           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2587           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2588         }
2589         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2590             TRUE, NULL);
2591         gst_bin_add (bin, priv->appsrc[i]);
2592         /* and link to the funnel */
2593         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2594         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2595         gst_pad_link (pad, selpad);
2596         gst_object_unref (pad);
2597         gst_object_unref (selpad);
2598       }
2599     }
2600
2601     /* check if we need to set to a special state */
2602     if (state != GST_STATE_NULL) {
2603       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2604         gst_element_set_state (priv->funnel[i], state);
2605     }
2606   }
2607 }
2608
2609 /**
2610  * gst_rtsp_stream_join_bin:
2611  * @stream: a #GstRTSPStream
2612  * @bin: (transfer none): a #GstBin to join
2613  * @rtpbin: (transfer none): a rtpbin element in @bin
2614  * @state: the target state of the new elements
2615  *
2616  * Join the #GstBin @bin that contains the element @rtpbin.
2617  *
2618  * @stream will link to @rtpbin, which must be inside @bin. The elements
2619  * added to @bin will be set to the state given in @state.
2620  *
2621  * Returns: %TRUE on success.
2622  */
2623 gboolean
2624 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2625     GstElement * rtpbin, GstState state)
2626 {
2627   GstRTSPStreamPrivate *priv;
2628   guint idx;
2629   gchar *name;
2630   GstPadLinkReturn ret;
2631
2632   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2633   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2634   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2635
2636   priv = stream->priv;
2637
2638   g_mutex_lock (&priv->lock);
2639   if (priv->is_joined)
2640     goto was_joined;
2641
2642   /* create a session with the same index as the stream */
2643   idx = priv->idx;
2644
2645   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2646
2647   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2648       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2649     /* For SRTP */
2650     g_signal_connect (rtpbin, "request-rtp-encoder",
2651         (GCallback) request_rtp_encoder, stream);
2652     g_signal_connect (rtpbin, "request-rtcp-encoder",
2653         (GCallback) request_rtcp_encoder, stream);
2654     g_signal_connect (rtpbin, "request-rtp-decoder",
2655         (GCallback) request_rtp_rtcp_decoder, stream);
2656     g_signal_connect (rtpbin, "request-rtcp-decoder",
2657         (GCallback) request_rtp_rtcp_decoder, stream);
2658   }
2659
2660   if (priv->sinkpad) {
2661     g_signal_connect (rtpbin, "request-pt-map",
2662         (GCallback) request_pt_map, stream);
2663   }
2664
2665   /* get pads from the RTP session element for sending and receiving
2666    * RTP/RTCP*/
2667   if (priv->srcpad) {
2668     /* get a pad for sending RTP */
2669     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2670     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2671     g_free (name);
2672
2673     /* link the RTP pad to the session manager, it should not really fail unless
2674      * this is not really an RTP pad */
2675     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2676     if (ret != GST_PAD_LINK_OK)
2677       goto link_failed;
2678
2679     name = g_strdup_printf ("send_rtp_src_%u", idx);
2680     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2681     g_free (name);
2682   } else {
2683     /* Need to connect our sinkpad from here */
2684     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2685     /* EOS */
2686     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2687
2688     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2689     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2690     g_free (name);
2691   }
2692
2693   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2694   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2695   g_free (name);
2696   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2697   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2698   g_free (name);
2699
2700   /* get the session */
2701   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2702
2703   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2704       stream);
2705   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2706       stream);
2707   g_signal_connect (priv->session, "on-ssrc-active",
2708       (GCallback) on_ssrc_active, stream);
2709   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2710       stream);
2711   g_signal_connect (priv->session, "on-bye-timeout",
2712       (GCallback) on_bye_timeout, stream);
2713   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2714       stream);
2715
2716   /* signal for sender ssrc */
2717   g_signal_connect (priv->session, "on-new-sender-ssrc",
2718       (GCallback) on_new_sender_ssrc, stream);
2719   g_signal_connect (priv->session, "on-sender-ssrc-active",
2720       (GCallback) on_sender_ssrc_active, stream);
2721
2722   if (!create_sender_part (stream, bin, state))
2723     goto no_udp_protocol;
2724
2725   create_receiver_part (stream, bin, state);
2726
2727   if (priv->srcpad) {
2728     /* be notified of caps changes */
2729     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2730         (GCallback) caps_notify, stream);
2731   }
2732
2733   priv->is_joined = TRUE;
2734   g_mutex_unlock (&priv->lock);
2735
2736   return TRUE;
2737
2738   /* ERRORS */
2739 was_joined:
2740   {
2741     g_mutex_unlock (&priv->lock);
2742     return TRUE;
2743   }
2744 link_failed:
2745   {
2746     GST_WARNING ("failed to link stream %u", idx);
2747     gst_object_unref (priv->send_rtp_sink);
2748     priv->send_rtp_sink = NULL;
2749     g_mutex_unlock (&priv->lock);
2750     return FALSE;
2751   }
2752 no_udp_protocol:
2753   {
2754     GST_WARNING ("failed to allocate ports %u", idx);
2755     gst_object_unref (priv->send_rtp_sink);
2756     priv->send_rtp_sink = NULL;
2757     gst_object_unref (priv->send_src[0]);
2758     priv->send_src[0] = NULL;
2759     gst_object_unref (priv->send_src[1]);
2760     priv->send_src[1] = NULL;
2761     gst_object_unref (priv->recv_sink[0]);
2762     priv->recv_sink[0] = NULL;
2763     gst_object_unref (priv->recv_sink[1]);
2764     priv->recv_sink[1] = NULL;
2765     if (priv->udpsink[0])
2766       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2767     if (priv->udpsink[1])
2768       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2769     if (priv->udpsrc_v4[0]) {
2770       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2771       gst_object_unref (priv->udpsrc_v4[0]);
2772       priv->udpsrc_v4[0] = NULL;
2773     }
2774     if (priv->udpsrc_v4[1]) {
2775       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2776       gst_object_unref (priv->udpsrc_v4[1]);
2777       priv->udpsrc_v4[1] = NULL;
2778     }
2779     if (priv->udpsrc_mcast_v4[0]) {
2780       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2781       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2782       priv->udpsrc_mcast_v4[0] = NULL;
2783     }
2784     if (priv->udpsrc_mcast_v4[1]) {
2785       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2786       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2787       priv->udpsrc_mcast_v4[1] = NULL;
2788     }
2789     if (priv->udpsrc_v6[0]) {
2790       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2791       gst_object_unref (priv->udpsrc_v6[0]);
2792       priv->udpsrc_v6[0] = NULL;
2793     }
2794     if (priv->udpsrc_v6[1]) {
2795       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2796       gst_object_unref (priv->udpsrc_v6[1]);
2797       priv->udpsrc_v6[1] = NULL;
2798     }
2799     if (priv->udpsrc_mcast_v6[0]) {
2800       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2801       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2802       priv->udpsrc_mcast_v6[0] = NULL;
2803     }
2804     if (priv->udpsrc_mcast_v6[1]) {
2805       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2806       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2807       priv->udpsrc_mcast_v6[1] = NULL;
2808     }
2809     g_mutex_unlock (&priv->lock);
2810     return FALSE;
2811   }
2812 }
2813
2814 /**
2815  * gst_rtsp_stream_leave_bin:
2816  * @stream: a #GstRTSPStream
2817  * @bin: (transfer none): a #GstBin
2818  * @rtpbin: (transfer none): a rtpbin #GstElement
2819  *
2820  * Remove the elements of @stream from @bin.
2821  *
2822  * Return: %TRUE on success.
2823  */
2824 gboolean
2825 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2826     GstElement * rtpbin)
2827 {
2828   GstRTSPStreamPrivate *priv;
2829   gint i;
2830   gboolean is_tcp, is_udp;
2831
2832   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2833   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2834   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2835
2836   priv = stream->priv;
2837
2838   g_mutex_lock (&priv->lock);
2839   if (!priv->is_joined)
2840     goto was_not_joined;
2841
2842   /* all transports must be removed by now */
2843   if (priv->transports != NULL)
2844     goto transports_not_removed;
2845
2846   clear_tr_cache (priv, TRUE);
2847   clear_tr_cache (priv, FALSE);
2848
2849   GST_INFO ("stream %p leaving bin", stream);
2850
2851   if (priv->srcpad) {
2852     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2853
2854     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2855     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2856     gst_object_unref (priv->send_rtp_sink);
2857     priv->send_rtp_sink = NULL;
2858   } else if (priv->recv_rtp_src) {
2859     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2860     gst_object_unref (priv->recv_rtp_src);
2861     priv->recv_rtp_src = NULL;
2862   }
2863
2864   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2865
2866   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2867       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2868
2869
2870   for (i = 0; i < 2; i++) {
2871     if (priv->udpsink[i])
2872       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2873     if (priv->appsink[i])
2874       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2875     if (priv->appqueue[i])
2876       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2877     if (priv->udpqueue[i])
2878       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2879     if (priv->tee[i])
2880       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2881     if (priv->funnel[i])
2882       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2883     if (priv->appsrc[i])
2884       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2885
2886     if (priv->udpsrc_v4[i]) {
2887       if (priv->sinkpad || i == 1) {
2888         /* and set udpsrc to NULL now before removing */
2889         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2890         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2891         /* removing them should also nicely release the request
2892          * pads when they finalize */
2893         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2894       } else {
2895         /* we need to set the state to NULL before unref */
2896         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2897         gst_object_unref (priv->udpsrc_v4[i]);
2898       }
2899     }
2900
2901     if (priv->udpsrc_mcast_v4[i]) {
2902       if (priv->sinkpad || i == 1) {
2903         /* and set udpsrc to NULL now before removing */
2904         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2905         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2906         /* removing them should also nicely release the request
2907          * pads when they finalize */
2908         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2909       } else {
2910         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2911         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2912       }
2913     }
2914
2915     if (priv->udpsrc_v6[i]) {
2916       if (priv->sinkpad || i == 1) {
2917         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2918         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2919         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2920       } else {
2921         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2922         gst_object_unref (priv->udpsrc_v6[i]);
2923       }
2924     }
2925     if (priv->udpsrc_mcast_v6[i]) {
2926       if (priv->sinkpad || i == 1) {
2927         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
2928         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2929         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
2930       } else {
2931         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
2932         gst_object_unref (priv->udpsrc_mcast_v6[i]);
2933       }
2934     }
2935
2936     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2937       gst_bin_remove (bin, priv->udpsink[i]);
2938     if (priv->appsrc[i]) {
2939       if (priv->sinkpad || i == 1) {
2940         gst_element_set_locked_state (priv->appsrc[i], FALSE);
2941         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2942         gst_bin_remove (bin, priv->appsrc[i]);
2943       } else {
2944         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2945         gst_object_unref (priv->appsrc[i]);
2946       }
2947     }
2948     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2949       gst_bin_remove (bin, priv->appsink[i]);
2950     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2951       gst_bin_remove (bin, priv->appqueue[i]);
2952     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2953       gst_bin_remove (bin, priv->udpqueue[i]);
2954     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2955       gst_bin_remove (bin, priv->tee[i]);
2956     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2957       gst_bin_remove (bin, priv->funnel[i]);
2958
2959     if (priv->sinkpad || i == 1) {
2960       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2961       gst_object_unref (priv->recv_sink[i]);
2962       priv->recv_sink[i] = NULL;
2963     }
2964
2965     priv->udpsrc_v4[i] = NULL;
2966     priv->udpsrc_v6[i] = NULL;
2967     priv->udpsrc_mcast_v4[i] = NULL;
2968     priv->udpsrc_mcast_v6[i] = NULL;
2969     priv->udpsink[i] = NULL;
2970     priv->appsrc[i] = NULL;
2971     priv->appsink[i] = NULL;
2972     priv->appqueue[i] = NULL;
2973     priv->udpqueue[i] = NULL;
2974     priv->tee[i] = NULL;
2975     priv->funnel[i] = NULL;
2976   }
2977
2978   if (priv->srcpad) {
2979     gst_object_unref (priv->send_src[0]);
2980     priv->send_src[0] = NULL;
2981   }
2982
2983   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2984   gst_object_unref (priv->send_src[1]);
2985   priv->send_src[1] = NULL;
2986
2987   g_object_unref (priv->session);
2988   priv->session = NULL;
2989   if (priv->caps)
2990     gst_caps_unref (priv->caps);
2991   priv->caps = NULL;
2992
2993   if (priv->srtpenc)
2994     gst_object_unref (priv->srtpenc);
2995   if (priv->srtpdec)
2996     gst_object_unref (priv->srtpdec);
2997
2998   priv->is_joined = FALSE;
2999   g_mutex_unlock (&priv->lock);
3000
3001   return TRUE;
3002
3003 was_not_joined:
3004   {
3005     g_mutex_unlock (&priv->lock);
3006     return TRUE;
3007   }
3008 transports_not_removed:
3009   {
3010     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3011     g_mutex_unlock (&priv->lock);
3012     return FALSE;
3013   }
3014 }
3015
3016 /**
3017  * gst_rtsp_stream_get_rtpinfo:
3018  * @stream: a #GstRTSPStream
3019  * @rtptime: (allow-none): result RTP timestamp
3020  * @seq: (allow-none): result RTP seqnum
3021  * @clock_rate: (allow-none): the clock rate
3022  * @running_time: (allow-none): result running-time
3023  *
3024  * Retrieve the current rtptime, seq and running-time. This is used to
3025  * construct a RTPInfo reply header.
3026  *
3027  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3028  */
3029 gboolean
3030 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3031     guint * rtptime, guint * seq, guint * clock_rate,
3032     GstClockTime * running_time)
3033 {
3034   GstRTSPStreamPrivate *priv;
3035   GstStructure *stats;
3036   GObjectClass *payobjclass;
3037
3038   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3039
3040   priv = stream->priv;
3041
3042   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3043
3044   g_mutex_lock (&priv->lock);
3045
3046   /* First try to extract the information from the last buffer on the sinks.
3047    * This will have a more accurate sequence number and timestamp, as between
3048    * the payloader and the sink there can be some queues
3049    */
3050   if (priv->udpsink[0] || priv->appsink[0]) {
3051     GstSample *last_sample;
3052
3053     if (priv->udpsink[0])
3054       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3055     else
3056       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3057
3058     if (last_sample) {
3059       GstCaps *caps;
3060       GstBuffer *buffer;
3061       GstSegment *segment;
3062       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3063
3064       caps = gst_sample_get_caps (last_sample);
3065       buffer = gst_sample_get_buffer (last_sample);
3066       segment = gst_sample_get_segment (last_sample);
3067
3068       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3069         if (seq) {
3070           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3071         }
3072
3073         if (rtptime) {
3074           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3075         }
3076
3077         gst_rtp_buffer_unmap (&rtp_buffer);
3078
3079         if (running_time) {
3080           *running_time =
3081               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3082               GST_BUFFER_TIMESTAMP (buffer));
3083         }
3084
3085         if (clock_rate) {
3086           GstStructure *s = gst_caps_get_structure (caps, 0);
3087
3088           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3089
3090           if (*clock_rate == 0 && running_time)
3091             *running_time = GST_CLOCK_TIME_NONE;
3092         }
3093         gst_sample_unref (last_sample);
3094
3095         goto done;
3096       } else {
3097         gst_sample_unref (last_sample);
3098       }
3099     }
3100   }
3101
3102   if (g_object_class_find_property (payobjclass, "stats")) {
3103     g_object_get (priv->payloader, "stats", &stats, NULL);
3104     if (stats == NULL)
3105       goto no_stats;
3106
3107     if (seq)
3108       gst_structure_get_uint (stats, "seqnum", seq);
3109
3110     if (rtptime)
3111       gst_structure_get_uint (stats, "timestamp", rtptime);
3112
3113     if (running_time)
3114       gst_structure_get_clock_time (stats, "running-time", running_time);
3115
3116     if (clock_rate) {
3117       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3118       if (*clock_rate == 0 && running_time)
3119         *running_time = GST_CLOCK_TIME_NONE;
3120     }
3121     gst_structure_free (stats);
3122   } else {
3123     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3124         !g_object_class_find_property (payobjclass, "timestamp"))
3125       goto no_stats;
3126
3127     if (seq)
3128       g_object_get (priv->payloader, "seqnum", seq, NULL);
3129
3130     if (rtptime)
3131       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3132
3133     if (running_time)
3134       *running_time = GST_CLOCK_TIME_NONE;
3135   }
3136
3137 done:
3138   g_mutex_unlock (&priv->lock);
3139
3140   return TRUE;
3141
3142   /* ERRORS */
3143 no_stats:
3144   {
3145     GST_WARNING ("Could not get payloader stats");
3146     g_mutex_unlock (&priv->lock);
3147     return FALSE;
3148   }
3149 }
3150
3151 /**
3152  * gst_rtsp_stream_get_caps:
3153  * @stream: a #GstRTSPStream
3154  *
3155  * Retrieve the current caps of @stream.
3156  *
3157  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3158  * after usage.
3159  */
3160 GstCaps *
3161 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3162 {
3163   GstRTSPStreamPrivate *priv;
3164   GstCaps *result;
3165
3166   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3167
3168   priv = stream->priv;
3169
3170   g_mutex_lock (&priv->lock);
3171   if ((result = priv->caps))
3172     gst_caps_ref (result);
3173   g_mutex_unlock (&priv->lock);
3174
3175   return result;
3176 }
3177
3178 /**
3179  * gst_rtsp_stream_recv_rtp:
3180  * @stream: a #GstRTSPStream
3181  * @buffer: (transfer full): a #GstBuffer
3182  *
3183  * Handle an RTP buffer for the stream. This method is usually called when a
3184  * message has been received from a client using the TCP transport.
3185  *
3186  * This function takes ownership of @buffer.
3187  *
3188  * Returns: a GstFlowReturn.
3189  */
3190 GstFlowReturn
3191 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3192 {
3193   GstRTSPStreamPrivate *priv;
3194   GstFlowReturn ret;
3195   GstElement *element;
3196
3197   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3198   priv = stream->priv;
3199   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3200   g_return_val_if_fail (priv->is_joined, FALSE);
3201
3202   g_mutex_lock (&priv->lock);
3203   if (priv->appsrc[0])
3204     element = gst_object_ref (priv->appsrc[0]);
3205   else
3206     element = NULL;
3207   g_mutex_unlock (&priv->lock);
3208
3209   if (element) {
3210     if (priv->appsrc_base_time[0] == -1) {
3211       /* Take current running_time. This timestamp will be put on
3212        * the first buffer of each stream because we are a live source and so we
3213        * timestamp with the running_time. When we are dealing with TCP, we also
3214        * only timestamp the first buffer (using the DISCONT flag) because a server
3215        * typically bursts data, for which we don't want to compensate by speeding
3216        * up the media. The other timestamps will be interpollated from this one
3217        * using the RTP timestamps. */
3218       GST_OBJECT_LOCK (element);
3219       if (GST_ELEMENT_CLOCK (element)) {
3220         GstClockTime now;
3221         GstClockTime base_time;
3222
3223         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3224         base_time = GST_ELEMENT_CAST (element)->base_time;
3225
3226         priv->appsrc_base_time[0] = now - base_time;
3227         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3228         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3229             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3230             GST_TIME_ARGS (base_time));
3231       }
3232       GST_OBJECT_UNLOCK (element);
3233     }
3234
3235     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3236     gst_object_unref (element);
3237   } else {
3238     ret = GST_FLOW_OK;
3239   }
3240   return ret;
3241 }
3242
3243 /**
3244  * gst_rtsp_stream_recv_rtcp:
3245  * @stream: a #GstRTSPStream
3246  * @buffer: (transfer full): a #GstBuffer
3247  *
3248  * Handle an RTCP buffer for the stream. This method is usually called when a
3249  * message has been received from a client using the TCP transport.
3250  *
3251  * This function takes ownership of @buffer.
3252  *
3253  * Returns: a GstFlowReturn.
3254  */
3255 GstFlowReturn
3256 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3257 {
3258   GstRTSPStreamPrivate *priv;
3259   GstFlowReturn ret;
3260   GstElement *element;
3261
3262   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3263   priv = stream->priv;
3264   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3265
3266   if (!priv->is_joined) {
3267     gst_buffer_unref (buffer);
3268     return GST_FLOW_NOT_LINKED;
3269   }
3270   g_mutex_lock (&priv->lock);
3271   if (priv->appsrc[1])
3272     element = gst_object_ref (priv->appsrc[1]);
3273   else
3274     element = NULL;
3275   g_mutex_unlock (&priv->lock);
3276
3277   if (element) {
3278     if (priv->appsrc_base_time[1] == -1) {
3279       /* Take current running_time. This timestamp will be put on
3280        * the first buffer of each stream because we are a live source and so we
3281        * timestamp with the running_time. When we are dealing with TCP, we also
3282        * only timestamp the first buffer (using the DISCONT flag) because a server
3283        * typically bursts data, for which we don't want to compensate by speeding
3284        * up the media. The other timestamps will be interpollated from this one
3285        * using the RTP timestamps. */
3286       GST_OBJECT_LOCK (element);
3287       if (GST_ELEMENT_CLOCK (element)) {
3288         GstClockTime now;
3289         GstClockTime base_time;
3290
3291         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3292         base_time = GST_ELEMENT_CAST (element)->base_time;
3293
3294         priv->appsrc_base_time[1] = now - base_time;
3295         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3296         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3297             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3298             GST_TIME_ARGS (base_time));
3299       }
3300       GST_OBJECT_UNLOCK (element);
3301     }
3302
3303     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3304     gst_object_unref (element);
3305   } else {
3306     ret = GST_FLOW_OK;
3307     gst_buffer_unref (buffer);
3308   }
3309   return ret;
3310 }
3311
3312 /* must be called with lock */
3313 static gboolean
3314 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3315     gboolean add)
3316 {
3317   GstRTSPStreamPrivate *priv = stream->priv;
3318   const GstRTSPTransport *tr;
3319
3320   tr = gst_rtsp_stream_transport_get_transport (trans);
3321
3322   switch (tr->lower_transport) {
3323     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3324     case GST_RTSP_LOWER_TRANS_UDP:
3325     {
3326       gchar *dest;
3327       gint min, max;
3328       guint ttl = 0;
3329
3330       dest = tr->destination;
3331       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3332         min = tr->port.min;
3333         max = tr->port.max;
3334         ttl = tr->ttl;
3335       } else if (priv->client_side) {
3336         /* In client side mode the 'destination' is the RTSP server, so send
3337          * to those ports */
3338         min = tr->server_port.min;
3339         max = tr->server_port.max;
3340       } else {
3341         min = tr->client_port.min;
3342         max = tr->client_port.max;
3343       }
3344
3345       if (add) {
3346         if (ttl > 0) {
3347           GST_INFO ("setting ttl-mc %d", ttl);
3348           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3349           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3350         }
3351         GST_INFO ("adding %s:%d-%d", dest, min, max);
3352         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3353         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3354         priv->transports = g_list_prepend (priv->transports, trans);
3355       } else {
3356         GST_INFO ("removing %s:%d-%d", dest, min, max);
3357         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3358         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3359         priv->transports = g_list_remove (priv->transports, trans);
3360       }
3361       priv->transports_cookie++;
3362       break;
3363     }
3364     case GST_RTSP_LOWER_TRANS_TCP:
3365       if (add) {
3366         GST_INFO ("adding TCP %s", tr->destination);
3367         priv->transports = g_list_prepend (priv->transports, trans);
3368       } else {
3369         GST_INFO ("removing TCP %s", tr->destination);
3370         priv->transports = g_list_remove (priv->transports, trans);
3371       }
3372       priv->transports_cookie++;
3373       break;
3374     default:
3375       goto unknown_transport;
3376   }
3377   return TRUE;
3378
3379   /* ERRORS */
3380 unknown_transport:
3381   {
3382     GST_INFO ("Unknown transport %d", tr->lower_transport);
3383     return FALSE;
3384   }
3385 }
3386
3387
3388 /**
3389  * gst_rtsp_stream_add_transport:
3390  * @stream: a #GstRTSPStream
3391  * @trans: (transfer none): a #GstRTSPStreamTransport
3392  *
3393  * Add the transport in @trans to @stream. The media of @stream will
3394  * then also be send to the values configured in @trans.
3395  *
3396  * @stream must be joined to a bin.
3397  *
3398  * @trans must contain a valid #GstRTSPTransport.
3399  *
3400  * Returns: %TRUE if @trans was added
3401  */
3402 gboolean
3403 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3404     GstRTSPStreamTransport * trans)
3405 {
3406   GstRTSPStreamPrivate *priv;
3407   gboolean res;
3408
3409   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3410   priv = stream->priv;
3411   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3412   g_return_val_if_fail (priv->is_joined, FALSE);
3413
3414   g_mutex_lock (&priv->lock);
3415   res = update_transport (stream, trans, TRUE);
3416   g_mutex_unlock (&priv->lock);
3417
3418   return res;
3419 }
3420
3421 /**
3422  * gst_rtsp_stream_remove_transport:
3423  * @stream: a #GstRTSPStream
3424  * @trans: (transfer none): a #GstRTSPStreamTransport
3425  *
3426  * Remove the transport in @trans from @stream. The media of @stream will
3427  * not be sent to the values configured in @trans.
3428  *
3429  * @stream must be joined to a bin.
3430  *
3431  * @trans must contain a valid #GstRTSPTransport.
3432  *
3433  * Returns: %TRUE if @trans was removed
3434  */
3435 gboolean
3436 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3437     GstRTSPStreamTransport * trans)
3438 {
3439   GstRTSPStreamPrivate *priv;
3440   gboolean res;
3441
3442   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3443   priv = stream->priv;
3444   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3445   g_return_val_if_fail (priv->is_joined, FALSE);
3446
3447   g_mutex_lock (&priv->lock);
3448   res = update_transport (stream, trans, FALSE);
3449   g_mutex_unlock (&priv->lock);
3450
3451   return res;
3452 }
3453
3454 /**
3455  * gst_rtsp_stream_update_crypto:
3456  * @stream: a #GstRTSPStream
3457  * @ssrc: the SSRC
3458  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3459  *
3460  * Update the new crypto information for @ssrc in @stream. If information
3461  * for @ssrc did not exist, it will be added. If information
3462  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3463  * be removed from @stream.
3464  *
3465  * Returns: %TRUE if @crypto could be updated
3466  */
3467 gboolean
3468 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3469     guint ssrc, GstCaps * crypto)
3470 {
3471   GstRTSPStreamPrivate *priv;
3472
3473   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3474   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3475
3476   priv = stream->priv;
3477
3478   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3479
3480   g_mutex_lock (&priv->lock);
3481   if (crypto)
3482     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3483         gst_caps_ref (crypto));
3484   else
3485     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3486   g_mutex_unlock (&priv->lock);
3487
3488   return TRUE;
3489 }
3490
3491 /**
3492  * gst_rtsp_stream_get_rtp_socket:
3493  * @stream: a #GstRTSPStream
3494  * @family: the socket family
3495  *
3496  * Get the RTP socket from @stream for a @family.
3497  *
3498  * @stream must be joined to a bin.
3499  *
3500  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3501  * socket could be allocated for @family. Unref after usage
3502  */
3503 GSocket *
3504 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3505 {
3506   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3507   GSocket *socket;
3508   const gchar *name;
3509
3510   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3511   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3512       family == G_SOCKET_FAMILY_IPV6, NULL);
3513   g_return_val_if_fail (priv->udpsink[0], NULL);
3514
3515   if (family == G_SOCKET_FAMILY_IPV6)
3516     name = "socket-v6";
3517   else
3518     name = "socket";
3519
3520   g_object_get (priv->udpsink[0], name, &socket, NULL);
3521
3522   return socket;
3523 }
3524
3525 /**
3526  * gst_rtsp_stream_get_rtcp_socket:
3527  * @stream: a #GstRTSPStream
3528  * @family: the socket family
3529  *
3530  * Get the RTCP socket from @stream for a @family.
3531  *
3532  * @stream must be joined to a bin.
3533  *
3534  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3535  * socket could be allocated for @family. Unref after usage
3536  */
3537 GSocket *
3538 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3539 {
3540   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3541   GSocket *socket;
3542   const gchar *name;
3543
3544   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3545   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3546       family == G_SOCKET_FAMILY_IPV6, NULL);
3547   g_return_val_if_fail (priv->udpsink[1], NULL);
3548
3549   if (family == G_SOCKET_FAMILY_IPV6)
3550     name = "socket-v6";
3551   else
3552     name = "socket";
3553
3554   g_object_get (priv->udpsink[1], name, &socket, NULL);
3555
3556   return socket;
3557 }
3558
3559 /**
3560  * gst_rtsp_stream_set_seqnum:
3561  * @stream: a #GstRTSPStream
3562  * @seqnum: a new sequence number
3563  *
3564  * Configure the sequence number in the payloader of @stream to @seqnum.
3565  */
3566 void
3567 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3568 {
3569   GstRTSPStreamPrivate *priv;
3570
3571   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3572
3573   priv = stream->priv;
3574
3575   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3576 }
3577
3578 /**
3579  * gst_rtsp_stream_get_seqnum:
3580  * @stream: a #GstRTSPStream
3581  *
3582  * Get the configured sequence number in the payloader of @stream.
3583  *
3584  * Returns: the sequence number of the payloader.
3585  */
3586 guint16
3587 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3588 {
3589   GstRTSPStreamPrivate *priv;
3590   guint seqnum;
3591
3592   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3593
3594   priv = stream->priv;
3595
3596   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3597
3598   return seqnum;
3599 }
3600
3601 /**
3602  * gst_rtsp_stream_transport_filter:
3603  * @stream: a #GstRTSPStream
3604  * @func: (scope call) (allow-none): a callback
3605  * @user_data: (closure): user data passed to @func
3606  *
3607  * Call @func for each transport managed by @stream. The result value of @func
3608  * determines what happens to the transport. @func will be called with @stream
3609  * locked so no further actions on @stream can be performed from @func.
3610  *
3611  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3612  * @stream.
3613  *
3614  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3615  *
3616  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3617  * will also be added with an additional ref to the result #GList of this
3618  * function..
3619  *
3620  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3621  *
3622  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3623  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3624  * element in the #GList should be unreffed before the list is freed.
3625  */
3626 GList *
3627 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3628     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3629 {
3630   GstRTSPStreamPrivate *priv;
3631   GList *result, *walk, *next;
3632   GHashTable *visited = NULL;
3633   guint cookie;
3634
3635   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3636
3637   priv = stream->priv;
3638
3639   result = NULL;
3640   if (func)
3641     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3642
3643   g_mutex_lock (&priv->lock);
3644 restart:
3645   cookie = priv->transports_cookie;
3646   for (walk = priv->transports; walk; walk = next) {
3647     GstRTSPStreamTransport *trans = walk->data;
3648     GstRTSPFilterResult res;
3649     gboolean changed;
3650
3651     next = g_list_next (walk);
3652
3653     if (func) {
3654       /* only visit each transport once */
3655       if (g_hash_table_contains (visited, trans))
3656         continue;
3657
3658       g_hash_table_add (visited, g_object_ref (trans));
3659       g_mutex_unlock (&priv->lock);
3660
3661       res = func (stream, trans, user_data);
3662
3663       g_mutex_lock (&priv->lock);
3664     } else
3665       res = GST_RTSP_FILTER_REF;
3666
3667     changed = (cookie != priv->transports_cookie);
3668
3669     switch (res) {
3670       case GST_RTSP_FILTER_REMOVE:
3671         update_transport (stream, trans, FALSE);
3672         break;
3673       case GST_RTSP_FILTER_REF:
3674         result = g_list_prepend (result, g_object_ref (trans));
3675         break;
3676       case GST_RTSP_FILTER_KEEP:
3677       default:
3678         break;
3679     }
3680     if (changed)
3681       goto restart;
3682   }
3683   g_mutex_unlock (&priv->lock);
3684
3685   if (func)
3686     g_hash_table_unref (visited);
3687
3688   return result;
3689 }
3690
3691 static GstPadProbeReturn
3692 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3693 {
3694   GstRTSPStreamPrivate *priv;
3695   GstRTSPStream *stream;
3696
3697   stream = user_data;
3698   priv = stream->priv;
3699
3700   GST_DEBUG_OBJECT (pad, "now blocking");
3701
3702   g_mutex_lock (&priv->lock);
3703   priv->blocking = TRUE;
3704   g_mutex_unlock (&priv->lock);
3705
3706   gst_element_post_message (priv->payloader,
3707       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3708           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3709
3710   return GST_PAD_PROBE_OK;
3711 }
3712
3713 /**
3714  * gst_rtsp_stream_set_blocked:
3715  * @stream: a #GstRTSPStream
3716  * @blocked: boolean indicating we should block or unblock
3717  *
3718  * Blocks or unblocks the dataflow on @stream.
3719  *
3720  * Returns: %TRUE on success
3721  */
3722 gboolean
3723 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3724 {
3725   GstRTSPStreamPrivate *priv;
3726
3727   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3728
3729   priv = stream->priv;
3730
3731   g_mutex_lock (&priv->lock);
3732   if (blocked) {
3733     priv->blocking = FALSE;
3734     if (priv->blocked_id == 0) {
3735       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3736           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3737           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3738           g_object_ref (stream), g_object_unref);
3739     }
3740   } else {
3741     if (priv->blocked_id != 0) {
3742       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3743       priv->blocked_id = 0;
3744       priv->blocking = FALSE;
3745     }
3746   }
3747   g_mutex_unlock (&priv->lock);
3748
3749   return TRUE;
3750 }
3751
3752 /**
3753  * gst_rtsp_stream_is_blocking:
3754  * @stream: a #GstRTSPStream
3755  *
3756  * Check if @stream is blocking on a #GstBuffer.
3757  *
3758  * Returns: %TRUE if @stream is blocking
3759  */
3760 gboolean
3761 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3762 {
3763   GstRTSPStreamPrivate *priv;
3764   gboolean result;
3765
3766   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3767
3768   priv = stream->priv;
3769
3770   g_mutex_lock (&priv->lock);
3771   result = priv->blocking;
3772   g_mutex_unlock (&priv->lock);
3773
3774   return result;
3775 }
3776
3777 /**
3778  * gst_rtsp_stream_query_position:
3779  * @stream: a #GstRTSPStream
3780  *
3781  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3782  * the RTP parts of the pipeline and not the RTCP parts.
3783  *
3784  * Returns: %TRUE if the position could be queried
3785  */
3786 gboolean
3787 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3788 {
3789   GstRTSPStreamPrivate *priv;
3790   GstElement *sink;
3791   gboolean ret;
3792
3793   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3794
3795   priv = stream->priv;
3796
3797   g_mutex_lock (&priv->lock);
3798   /* depending on the transport type, it should query corresponding sink */
3799   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3800       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3801     sink = priv->udpsink[0];
3802   else
3803     sink = priv->appsink[0];
3804
3805   if (sink)
3806     gst_object_ref (sink);
3807   g_mutex_unlock (&priv->lock);
3808
3809   if (!sink)
3810     return FALSE;
3811
3812   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3813   gst_object_unref (sink);
3814
3815   return ret;
3816 }
3817
3818 /**
3819  * gst_rtsp_stream_query_stop:
3820  * @stream: a #GstRTSPStream
3821  *
3822  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3823  * the RTP parts of the pipeline and not the RTCP parts.
3824  *
3825  * Returns: %TRUE if the stop could be queried
3826  */
3827 gboolean
3828 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3829 {
3830   GstRTSPStreamPrivate *priv;
3831   GstElement *sink;
3832   GstQuery *query;
3833   gboolean ret;
3834
3835   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3836
3837   priv = stream->priv;
3838
3839   g_mutex_lock (&priv->lock);
3840   /* depending on the transport type, it should query corresponding sink */
3841   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3842       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3843     sink = priv->udpsink[0];
3844   else
3845     sink = priv->appsink[0];
3846
3847   if (sink)
3848     gst_object_ref (sink);
3849   g_mutex_unlock (&priv->lock);
3850
3851   if (!sink)
3852     return FALSE;
3853
3854   query = gst_query_new_segment (GST_FORMAT_TIME);
3855   if ((ret = gst_element_query (sink, query))) {
3856     GstFormat format;
3857
3858     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3859     if (format != GST_FORMAT_TIME)
3860       *stop = -1;
3861   }
3862   gst_query_unref (query);
3863   gst_object_unref (sink);
3864
3865   return ret;
3866
3867 }