stream: rename addr_v4/6 to mcast_addr_v4/6 for clarity
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *mcast_addr_v4;
139   GstRTSPAddress *mcast_addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   gchar *multicast_iface;
144
145   /* the caps of the stream */
146   gulong caps_sig;
147   GstCaps *caps;
148
149   /* transports we stream to */
150   guint n_active;
151   GList *transports;
152   guint transports_cookie;
153   GList *tr_cache_rtp;
154   GList *tr_cache_rtcp;
155   guint tr_cache_cookie_rtp;
156   guint tr_cache_cookie_rtcp;
157
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167
168   GstRTSPPublishClockMode publish_clock_mode;
169 };
170
171 #define DEFAULT_CONTROL         NULL
172 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
173 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
174                                         GST_RTSP_LOWER_TRANS_TCP
175
176 enum
177 {
178   PROP_0,
179   PROP_CONTROL,
180   PROP_PROFILES,
181   PROP_PROTOCOLS,
182   PROP_LAST
183 };
184
185 enum
186 {
187   SIGNAL_NEW_RTP_ENCODER,
188   SIGNAL_NEW_RTCP_ENCODER,
189   SIGNAL_LAST
190 };
191
192 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
193 #define GST_CAT_DEFAULT rtsp_stream_debug
194
195 static GQuark ssrc_stream_map_key;
196
197 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
198     GValue * value, GParamSpec * pspec);
199 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
200     const GValue * value, GParamSpec * pspec);
201
202 static void gst_rtsp_stream_finalize (GObject * obj);
203
204 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
205
206 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
207
208 static void
209 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
210 {
211   GObjectClass *gobject_class;
212
213   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
214
215   gobject_class = G_OBJECT_CLASS (klass);
216
217   gobject_class->get_property = gst_rtsp_stream_get_property;
218   gobject_class->set_property = gst_rtsp_stream_set_property;
219   gobject_class->finalize = gst_rtsp_stream_finalize;
220
221   g_object_class_install_property (gobject_class, PROP_CONTROL,
222       g_param_spec_string ("control", "Control",
223           "The control string for this stream", DEFAULT_CONTROL,
224           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   g_object_class_install_property (gobject_class, PROP_PROFILES,
227       g_param_spec_flags ("profiles", "Profiles",
228           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
229           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230
231   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
232       g_param_spec_flags ("protocols", "Protocols",
233           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
234           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235
236   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
237       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
238       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
239       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
240
241   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
242       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
243       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
244       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
245
246   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
247
248   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
249 }
250
251 static void
252 gst_rtsp_stream_init (GstRTSPStream * stream)
253 {
254   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
255
256   GST_DEBUG ("new stream %p", stream);
257
258   stream->priv = priv;
259
260   priv->dscp_qos = -1;
261   priv->control = g_strdup (DEFAULT_CONTROL);
262   priv->profiles = DEFAULT_PROFILES;
263   priv->protocols = DEFAULT_PROTOCOLS;
264   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
265
266   g_mutex_init (&priv->lock);
267
268   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
269       NULL, (GDestroyNotify) gst_caps_unref);
270   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
271       (GDestroyNotify) gst_caps_unref);
272 }
273
274 static void
275 gst_rtsp_stream_finalize (GObject * obj)
276 {
277   GstRTSPStream *stream;
278   GstRTSPStreamPrivate *priv;
279
280   stream = GST_RTSP_STREAM (obj);
281   priv = stream->priv;
282
283   GST_DEBUG ("finalize stream %p", stream);
284
285   /* we really need to be unjoined now */
286   g_return_if_fail (priv->joined_bin == NULL);
287
288   if (priv->mcast_addr_v4)
289     gst_rtsp_address_free (priv->mcast_addr_v4);
290   if (priv->mcast_addr_v6)
291     gst_rtsp_address_free (priv->mcast_addr_v6);
292   if (priv->server_addr_v4)
293     gst_rtsp_address_free (priv->server_addr_v4);
294   if (priv->server_addr_v6)
295     gst_rtsp_address_free (priv->server_addr_v6);
296   if (priv->pool)
297     g_object_unref (priv->pool);
298   if (priv->rtxsend)
299     g_object_unref (priv->rtxsend);
300
301   g_free (priv->multicast_iface);
302
303   gst_object_unref (priv->payloader);
304   if (priv->srcpad)
305     gst_object_unref (priv->srcpad);
306   if (priv->sinkpad)
307     gst_object_unref (priv->sinkpad);
308   g_free (priv->control);
309   g_mutex_clear (&priv->lock);
310
311   g_hash_table_unref (priv->keys);
312   g_hash_table_destroy (priv->ptmap);
313
314   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
315 }
316
317 static void
318 gst_rtsp_stream_get_property (GObject * object, guint propid,
319     GValue * value, GParamSpec * pspec)
320 {
321   GstRTSPStream *stream = GST_RTSP_STREAM (object);
322
323   switch (propid) {
324     case PROP_CONTROL:
325       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
326       break;
327     case PROP_PROFILES:
328       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
329       break;
330     case PROP_PROTOCOLS:
331       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
332       break;
333     default:
334       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
335   }
336 }
337
338 static void
339 gst_rtsp_stream_set_property (GObject * object, guint propid,
340     const GValue * value, GParamSpec * pspec)
341 {
342   GstRTSPStream *stream = GST_RTSP_STREAM (object);
343
344   switch (propid) {
345     case PROP_CONTROL:
346       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
347       break;
348     case PROP_PROFILES:
349       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
350       break;
351     case PROP_PROTOCOLS:
352       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
353       break;
354     default:
355       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
356   }
357 }
358
359 /**
360  * gst_rtsp_stream_new:
361  * @idx: an index
362  * @pad: a #GstPad
363  * @payloader: a #GstElement
364  *
365  * Create a new media stream with index @idx that handles RTP data on
366  * @pad and has a payloader element @payloader if @pad is a source pad
367  * or a depayloader element @payloader if @pad is a sink pad.
368  *
369  * Returns: (transfer full): a new #GstRTSPStream
370  */
371 GstRTSPStream *
372 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
373 {
374   GstRTSPStreamPrivate *priv;
375   GstRTSPStream *stream;
376
377   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
378   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
379
380   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
381   priv = stream->priv;
382   priv->idx = idx;
383   priv->payloader = gst_object_ref (payloader);
384   if (GST_PAD_IS_SRC (pad))
385     priv->srcpad = gst_object_ref (pad);
386   else
387     priv->sinkpad = gst_object_ref (pad);
388
389   return stream;
390 }
391
392 /**
393  * gst_rtsp_stream_get_index:
394  * @stream: a #GstRTSPStream
395  *
396  * Get the stream index.
397  *
398  * Return: the stream index.
399  */
400 guint
401 gst_rtsp_stream_get_index (GstRTSPStream * stream)
402 {
403   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
404
405   return stream->priv->idx;
406 }
407
408 /**
409  * gst_rtsp_stream_get_pt:
410  * @stream: a #GstRTSPStream
411  *
412  * Get the stream payload type.
413  *
414  * Return: the stream payload type.
415  */
416 guint
417 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
418 {
419   GstRTSPStreamPrivate *priv;
420   guint pt;
421
422   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
423
424   priv = stream->priv;
425
426   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
427
428   return pt;
429 }
430
431 /**
432  * gst_rtsp_stream_get_srcpad:
433  * @stream: a #GstRTSPStream
434  *
435  * Get the srcpad associated with @stream.
436  *
437  * Returns: (transfer full): the srcpad. Unref after usage.
438  */
439 GstPad *
440 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
441 {
442   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
443
444   if (!stream->priv->srcpad)
445     return NULL;
446
447   return gst_object_ref (stream->priv->srcpad);
448 }
449
450 /**
451  * gst_rtsp_stream_get_sinkpad:
452  * @stream: a #GstRTSPStream
453  *
454  * Get the sinkpad associated with @stream.
455  *
456  * Returns: (transfer full): the sinkpad. Unref after usage.
457  */
458 GstPad *
459 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
460 {
461   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
462
463   if (!stream->priv->sinkpad)
464     return NULL;
465
466   return gst_object_ref (stream->priv->sinkpad);
467 }
468
469 /**
470  * gst_rtsp_stream_get_control:
471  * @stream: a #GstRTSPStream
472  *
473  * Get the control string to identify this stream.
474  *
475  * Returns: (transfer full): the control string. g_free() after usage.
476  */
477 gchar *
478 gst_rtsp_stream_get_control (GstRTSPStream * stream)
479 {
480   GstRTSPStreamPrivate *priv;
481   gchar *result;
482
483   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
484
485   priv = stream->priv;
486
487   g_mutex_lock (&priv->lock);
488   if ((result = g_strdup (priv->control)) == NULL)
489     result = g_strdup_printf ("stream=%u", priv->idx);
490   g_mutex_unlock (&priv->lock);
491
492   return result;
493 }
494
495 /**
496  * gst_rtsp_stream_set_control:
497  * @stream: a #GstRTSPStream
498  * @control: a control string
499  *
500  * Set the control string in @stream.
501  */
502 void
503 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
504 {
505   GstRTSPStreamPrivate *priv;
506
507   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
508
509   priv = stream->priv;
510
511   g_mutex_lock (&priv->lock);
512   g_free (priv->control);
513   priv->control = g_strdup (control);
514   g_mutex_unlock (&priv->lock);
515 }
516
517 /**
518  * gst_rtsp_stream_has_control:
519  * @stream: a #GstRTSPStream
520  * @control: a control string
521  *
522  * Check if @stream has the control string @control.
523  *
524  * Returns: %TRUE is @stream has @control as the control string
525  */
526 gboolean
527 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
528 {
529   GstRTSPStreamPrivate *priv;
530   gboolean res;
531
532   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
533
534   priv = stream->priv;
535
536   g_mutex_lock (&priv->lock);
537   if (priv->control)
538     res = (g_strcmp0 (priv->control, control) == 0);
539   else {
540     guint streamid;
541
542     if (sscanf (control, "stream=%u", &streamid) > 0)
543       res = (streamid == priv->idx);
544     else
545       res = FALSE;
546   }
547   g_mutex_unlock (&priv->lock);
548
549   return res;
550 }
551
552 /**
553  * gst_rtsp_stream_set_mtu:
554  * @stream: a #GstRTSPStream
555  * @mtu: a new MTU
556  *
557  * Configure the mtu in the payloader of @stream to @mtu.
558  */
559 void
560 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
561 {
562   GstRTSPStreamPrivate *priv;
563
564   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
565
566   priv = stream->priv;
567
568   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
569
570   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
571 }
572
573 /**
574  * gst_rtsp_stream_get_mtu:
575  * @stream: a #GstRTSPStream
576  *
577  * Get the configured MTU in the payloader of @stream.
578  *
579  * Returns: the MTU of the payloader.
580  */
581 guint
582 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
583 {
584   GstRTSPStreamPrivate *priv;
585   guint mtu;
586
587   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
588
589   priv = stream->priv;
590
591   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
592
593   return mtu;
594 }
595
596 /* Update the dscp qos property on the udp sinks */
597 static void
598 update_dscp_qos (GstRTSPStream * stream)
599 {
600   GstRTSPStreamPrivate *priv;
601
602   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
603
604   priv = stream->priv;
605
606   if (priv->udpsink[0]) {
607     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
608         NULL);
609   }
610
611   if (priv->udpsink[1]) {
612     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
613         NULL);
614   }
615 }
616
617 /**
618  * gst_rtsp_stream_set_dscp_qos:
619  * @stream: a #GstRTSPStream
620  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
621  *
622  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
623  */
624 void
625 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
626 {
627   GstRTSPStreamPrivate *priv;
628
629   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
630
631   priv = stream->priv;
632
633   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
634
635   if (dscp_qos < -1 || dscp_qos > 63) {
636     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
637     return;
638   }
639
640   priv->dscp_qos = dscp_qos;
641
642   update_dscp_qos (stream);
643 }
644
645 /**
646  * gst_rtsp_stream_get_dscp_qos:
647  * @stream: a #GstRTSPStream
648  *
649  * Get the configured DSCP QoS in of the outgoing sockets.
650  *
651  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
652  */
653 gint
654 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
655 {
656   GstRTSPStreamPrivate *priv;
657
658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
659
660   priv = stream->priv;
661
662   return priv->dscp_qos;
663 }
664
665 /**
666  * gst_rtsp_stream_is_transport_supported:
667  * @stream: a #GstRTSPStream
668  * @transport: (transfer none): a #GstRTSPTransport
669  *
670  * Check if @transport can be handled by stream
671  *
672  * Returns: %TRUE if @transport can be handled by @stream.
673  */
674 gboolean
675 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
676     GstRTSPTransport * transport)
677 {
678   GstRTSPStreamPrivate *priv;
679
680   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
681
682   priv = stream->priv;
683
684   g_mutex_lock (&priv->lock);
685   if (transport->trans != GST_RTSP_TRANS_RTP)
686     goto unsupported_transmode;
687
688   if (!(transport->profile & priv->profiles))
689     goto unsupported_profile;
690
691   if (!(transport->lower_transport & priv->protocols))
692     goto unsupported_ltrans;
693
694   g_mutex_unlock (&priv->lock);
695
696   return TRUE;
697
698   /* ERRORS */
699 unsupported_transmode:
700   {
701     GST_DEBUG ("unsupported transport mode %d", transport->trans);
702     g_mutex_unlock (&priv->lock);
703     return FALSE;
704   }
705 unsupported_profile:
706   {
707     GST_DEBUG ("unsupported profile %d", transport->profile);
708     g_mutex_unlock (&priv->lock);
709     return FALSE;
710   }
711 unsupported_ltrans:
712   {
713     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
714     g_mutex_unlock (&priv->lock);
715     return FALSE;
716   }
717 }
718
719 /**
720  * gst_rtsp_stream_set_profiles:
721  * @stream: a #GstRTSPStream
722  * @profiles: the new profiles
723  *
724  * Configure the allowed profiles for @stream.
725  */
726 void
727 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
728 {
729   GstRTSPStreamPrivate *priv;
730
731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
732
733   priv = stream->priv;
734
735   g_mutex_lock (&priv->lock);
736   priv->profiles = profiles;
737   g_mutex_unlock (&priv->lock);
738 }
739
740 /**
741  * gst_rtsp_stream_get_profiles:
742  * @stream: a #GstRTSPStream
743  *
744  * Get the allowed profiles of @stream.
745  *
746  * Returns: a #GstRTSPProfile
747  */
748 GstRTSPProfile
749 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
750 {
751   GstRTSPStreamPrivate *priv;
752   GstRTSPProfile res;
753
754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
755
756   priv = stream->priv;
757
758   g_mutex_lock (&priv->lock);
759   res = priv->profiles;
760   g_mutex_unlock (&priv->lock);
761
762   return res;
763 }
764
765 /**
766  * gst_rtsp_stream_set_protocols:
767  * @stream: a #GstRTSPStream
768  * @protocols: the new flags
769  *
770  * Configure the allowed lower transport for @stream.
771  */
772 void
773 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
774     GstRTSPLowerTrans protocols)
775 {
776   GstRTSPStreamPrivate *priv;
777
778   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
779
780   priv = stream->priv;
781
782   g_mutex_lock (&priv->lock);
783   priv->protocols = protocols;
784   g_mutex_unlock (&priv->lock);
785 }
786
787 /**
788  * gst_rtsp_stream_get_protocols:
789  * @stream: a #GstRTSPStream
790  *
791  * Get the allowed protocols of @stream.
792  *
793  * Returns: a #GstRTSPLowerTrans
794  */
795 GstRTSPLowerTrans
796 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
797 {
798   GstRTSPStreamPrivate *priv;
799   GstRTSPLowerTrans res;
800
801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
802       GST_RTSP_LOWER_TRANS_UNKNOWN);
803
804   priv = stream->priv;
805
806   g_mutex_lock (&priv->lock);
807   res = priv->protocols;
808   g_mutex_unlock (&priv->lock);
809
810   return res;
811 }
812
813 /**
814  * gst_rtsp_stream_set_address_pool:
815  * @stream: a #GstRTSPStream
816  * @pool: (transfer none): a #GstRTSPAddressPool
817  *
818  * configure @pool to be used as the address pool of @stream.
819  */
820 void
821 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
822     GstRTSPAddressPool * pool)
823 {
824   GstRTSPStreamPrivate *priv;
825   GstRTSPAddressPool *old;
826
827   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
828
829   priv = stream->priv;
830
831   GST_LOG_OBJECT (stream, "set address pool %p", pool);
832
833   g_mutex_lock (&priv->lock);
834   if ((old = priv->pool) != pool)
835     priv->pool = pool ? g_object_ref (pool) : NULL;
836   else
837     old = NULL;
838   g_mutex_unlock (&priv->lock);
839
840   if (old)
841     g_object_unref (old);
842 }
843
844 /**
845  * gst_rtsp_stream_get_address_pool:
846  * @stream: a #GstRTSPStream
847  *
848  * Get the #GstRTSPAddressPool used as the address pool of @stream.
849  *
850  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
851  * usage.
852  */
853 GstRTSPAddressPool *
854 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
855 {
856   GstRTSPStreamPrivate *priv;
857   GstRTSPAddressPool *result;
858
859   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
860
861   priv = stream->priv;
862
863   g_mutex_lock (&priv->lock);
864   if ((result = priv->pool))
865     g_object_ref (result);
866   g_mutex_unlock (&priv->lock);
867
868   return result;
869 }
870
871 /**
872  * gst_rtsp_stream_set_multicast_iface:
873  * @stream: a #GstRTSPStream
874  * @multicast_iface: (transfer none): a multicast interface
875  *
876  * configure @multicast_iface to be used for @stream.
877  */
878 void
879 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
880     const gchar * multicast_iface)
881 {
882   GstRTSPStreamPrivate *priv;
883   gchar *old;
884
885   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
886
887   priv = stream->priv;
888
889   GST_LOG_OBJECT (stream, "set multicast iface %s",
890       GST_STR_NULL (multicast_iface));
891
892   g_mutex_lock (&priv->lock);
893   if ((old = priv->multicast_iface) != multicast_iface)
894     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
895   else
896     old = NULL;
897   g_mutex_unlock (&priv->lock);
898
899   if (old)
900     g_free (old);
901 }
902
903 /**
904  * gst_rtsp_stream_get_multicast_iface:
905  * @stream: a #GstRTSPStream
906  *
907  * Get the multicast interface used for @stream.
908  *
909  * Returns: (transfer full): the multicast interface for @stream. g_free() after
910  * usage.
911  */
912 gchar *
913 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
914 {
915   GstRTSPStreamPrivate *priv;
916   gchar *result;
917
918   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
919
920   priv = stream->priv;
921
922   g_mutex_lock (&priv->lock);
923   if ((result = priv->multicast_iface))
924     result = g_strdup (result);
925   g_mutex_unlock (&priv->lock);
926
927   return result;
928 }
929
930 /**
931  * gst_rtsp_stream_get_multicast_address:
932  * @stream: a #GstRTSPStream
933  * @family: the #GSocketFamily
934  *
935  * Get the multicast address of @stream for @family.
936  *
937  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
938  * or %NULL when no address could be allocated. gst_rtsp_address_free()
939  * after usage.
940  */
941 GstRTSPAddress *
942 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
943     GSocketFamily family)
944 {
945   GstRTSPStreamPrivate *priv;
946   GstRTSPAddress *result;
947   GstRTSPAddress **addrp;
948   GstRTSPAddressFlags flags;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951
952   priv = stream->priv;
953
954   if (family == G_SOCKET_FAMILY_IPV6) {
955     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
956     addrp = &priv->mcast_addr_v6;
957   } else {
958     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
959     addrp = &priv->mcast_addr_v4;
960   }
961
962   g_mutex_lock (&priv->lock);
963   if (*addrp == NULL) {
964     if (priv->pool == NULL)
965       goto no_pool;
966
967     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
968
969     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
970     if (*addrp == NULL)
971       goto no_address;
972   }
973   result = gst_rtsp_address_copy (*addrp);
974   g_mutex_unlock (&priv->lock);
975
976   return result;
977
978   /* ERRORS */
979 no_pool:
980   {
981     GST_ERROR_OBJECT (stream, "no address pool specified");
982     g_mutex_unlock (&priv->lock);
983     return NULL;
984   }
985 no_address:
986   {
987     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
988     g_mutex_unlock (&priv->lock);
989     return NULL;
990   }
991 }
992
993 /**
994  * gst_rtsp_stream_reserve_address:
995  * @stream: a #GstRTSPStream
996  * @address: an address
997  * @port: a port
998  * @n_ports: n_ports
999  * @ttl: a TTL
1000  *
1001  * Reserve @address and @port as the address and port of @stream.
1002  *
1003  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1004  * the address could be reserved. gst_rtsp_address_free() after usage.
1005  */
1006 GstRTSPAddress *
1007 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1008     const gchar * address, guint port, guint n_ports, guint ttl)
1009 {
1010   GstRTSPStreamPrivate *priv;
1011   GstRTSPAddress *result;
1012   GInetAddress *addr;
1013   GSocketFamily family;
1014   GstRTSPAddress **addrp;
1015
1016   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1017   g_return_val_if_fail (address != NULL, NULL);
1018   g_return_val_if_fail (port > 0, NULL);
1019   g_return_val_if_fail (n_ports > 0, NULL);
1020   g_return_val_if_fail (ttl > 0, NULL);
1021
1022   priv = stream->priv;
1023
1024   addr = g_inet_address_new_from_string (address);
1025   if (!addr) {
1026     GST_ERROR ("failed to get inet addr from %s", address);
1027     family = G_SOCKET_FAMILY_IPV4;
1028   } else {
1029     family = g_inet_address_get_family (addr);
1030     g_object_unref (addr);
1031   }
1032
1033   if (family == G_SOCKET_FAMILY_IPV6)
1034     addrp = &priv->mcast_addr_v6;
1035   else
1036     addrp = &priv->mcast_addr_v4;
1037
1038   g_mutex_lock (&priv->lock);
1039   if (*addrp == NULL) {
1040     GstRTSPAddressPoolResult res;
1041
1042     if (priv->pool == NULL)
1043       goto no_pool;
1044
1045     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1046         port, n_ports, ttl, addrp);
1047     if (res != GST_RTSP_ADDRESS_POOL_OK)
1048       goto no_address;
1049   } else {
1050     if (strcmp ((*addrp)->address, address) ||
1051         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1052         (*addrp)->ttl != ttl)
1053       goto different_address;
1054   }
1055   result = gst_rtsp_address_copy (*addrp);
1056   g_mutex_unlock (&priv->lock);
1057
1058   return result;
1059
1060   /* ERRORS */
1061 no_pool:
1062   {
1063     GST_ERROR_OBJECT (stream, "no address pool specified");
1064     g_mutex_unlock (&priv->lock);
1065     return NULL;
1066   }
1067 no_address:
1068   {
1069     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1070         address);
1071     g_mutex_unlock (&priv->lock);
1072     return NULL;
1073   }
1074 different_address:
1075   {
1076     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1077         " reserved", address);
1078     g_mutex_unlock (&priv->lock);
1079     return NULL;
1080   }
1081 }
1082
1083 /* must be called with lock */
1084 static void
1085 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1086     GSocket * rtcp_socket, GSocketFamily family)
1087 {
1088   GstRTSPStreamPrivate *priv = stream->priv;
1089   const gchar *multisink_socket;
1090
1091   if (family == G_SOCKET_FAMILY_IPV6)
1092     multisink_socket = "socket-v6";
1093   else
1094     multisink_socket = "socket";
1095
1096   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1097       NULL);
1098   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1099       NULL);
1100 }
1101
1102 /* must be called with lock */
1103 static gboolean
1104 create_and_configure_udpsinks (GstRTSPStream * stream)
1105 {
1106   GstRTSPStreamPrivate *priv = stream->priv;
1107   GstElement *udpsink0, *udpsink1;
1108
1109   udpsink0 = NULL;
1110   udpsink1 = NULL;
1111
1112   if (priv->udpsink[0])
1113     udpsink0 = priv->udpsink[0];
1114   else
1115     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1116
1117   if (!udpsink0)
1118     goto no_udp_protocol;
1119
1120   if (priv->udpsink[1])
1121     udpsink1 = priv->udpsink[1];
1122   else
1123     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1124
1125   if (!udpsink1)
1126     goto no_udp_protocol;
1127
1128   /* configure sinks */
1129
1130   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1131   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1132
1133   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1134   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1135
1136   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1137
1138   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1139   /* Needs to be async for RECORD streams, otherwise we will never go to
1140    * PLAYING because the sinks will wait for data while the udpsrc can't
1141    * provide data with timestamps in PAUSED. */
1142   if (priv->sinkpad)
1143     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1144   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1145
1146   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1147   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1148
1149   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1150   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1151
1152   /* update the dscp qos field in the sinks */
1153   update_dscp_qos (stream);
1154
1155   priv->udpsink[0] = udpsink0;
1156   priv->udpsink[1] = udpsink1;
1157
1158   return TRUE;
1159
1160   /* ERRORS */
1161 no_udp_protocol:
1162   {
1163     return FALSE;
1164   }
1165 }
1166
1167 /* must be called with lock */
1168 static void
1169 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1170     GSocketFamily family)
1171 {
1172   GstRTSPStreamPrivate *priv;
1173   GstPad *pad, *selpad;
1174   guint i;
1175
1176   priv = stream->priv;
1177
1178   for (i = 0; i < 2; i++) {
1179     if (!priv->sinkpad && i == 0) {
1180       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
1181        * RTCP sink always */
1182       continue;
1183     }
1184
1185     if (priv->srcpad) {
1186       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1187        * values. This is only relevant for PLAY pipelines */
1188       gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1189       gst_element_set_locked_state (udpsrc_out[i], TRUE);
1190     }
1191
1192     /* add udpsrc */
1193     gst_bin_add (priv->joined_bin, udpsrc_out[i]);
1194
1195     /* and link to the funnel */
1196     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1197     pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1198     gst_pad_link (pad, selpad);
1199     gst_object_unref (pad);
1200     gst_object_unref (selpad);
1201
1202     /* otherwise sync state with parent in case it's running already
1203      * at this point */
1204     if (!priv->srcpad) {
1205       gst_element_sync_state_with_parent (udpsrc_out[i]);
1206     }
1207   }
1208 }
1209
1210 /* must be called with lock */
1211 static gboolean
1212 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1213     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1214     const gchar * address, gint rtpport, gint rtcpport,
1215     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1216 {
1217   GstStateChangeReturn ret;
1218
1219   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1220   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1221
1222   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1223     goto error;
1224
1225   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1226     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1227     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1228     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1229     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1231         NULL);
1232     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1233         NULL);
1234     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1235     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1236   }
1237
1238   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1239   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1240
1241   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1242   if (ret == GST_STATE_CHANGE_FAILURE)
1243     goto error;
1244   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1245   if (ret == GST_STATE_CHANGE_FAILURE)
1246     goto error;
1247
1248   return TRUE;
1249
1250   /* ERRORS */
1251 error:
1252   {
1253     if (udpsrc_out[0]) {
1254       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1255       g_clear_object (&udpsrc_out[0]);
1256     }
1257     if (udpsrc_out[1]) {
1258       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1259       g_clear_object (&udpsrc_out[1]);
1260     }
1261     return FALSE;
1262   }
1263 }
1264
1265 static gboolean
1266 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1267     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1268     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1269     gboolean use_client_settings)
1270 {
1271   GstRTSPStreamPrivate *priv = stream->priv;
1272   GSocket *rtp_socket = NULL;
1273   GSocket *rtcp_socket;
1274   gint tmp_rtp, tmp_rtcp;
1275   guint count;
1276   gint rtpport, rtcpport;
1277   GList *rejected_addresses = NULL;
1278   GstRTSPAddress *addr = NULL;
1279   GInetAddress *inetaddr = NULL;
1280   gchar *addr_str;
1281   GSocketAddress *rtp_sockaddr = NULL;
1282   GSocketAddress *rtcp_sockaddr = NULL;
1283   GstRTSPAddressPool *pool;
1284   GstRTSPLowerTrans transport;
1285   const gchar *multicast_iface = priv->multicast_iface;
1286
1287   pool = priv->pool;
1288   count = 0;
1289   transport = ct->lower_transport;
1290
1291   /* Start with random port */
1292   tmp_rtp = 0;
1293
1294   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1295       G_SOCKET_PROTOCOL_UDP, NULL);
1296   if (!rtcp_socket)
1297     goto no_udp_protocol;
1298   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1299
1300   if (*server_addr_out)
1301     gst_rtsp_address_free (*server_addr_out);
1302
1303   /* try to allocate 2 UDP ports, the RTP port should be an even
1304    * number and the RTCP port should be the next (uneven) port */
1305 again:
1306
1307   if (rtp_socket == NULL) {
1308     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1309         G_SOCKET_PROTOCOL_UDP, NULL);
1310     if (!rtp_socket)
1311       goto no_udp_protocol;
1312     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1313   }
1314
1315   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1316               gst_rtsp_address_pool_has_unicast_addresses (pool))
1317           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1318     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1319
1320     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1321       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1322     else
1323       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1324
1325     if (addr)
1326       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1327
1328     if (family == G_SOCKET_FAMILY_IPV6)
1329       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1330     else
1331       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1332
1333     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1334         && use_client_settings)
1335       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1336           ct->port.min, 2, ct->ttl, &addr);
1337     else
1338       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1339
1340     if (addr == NULL)
1341       goto no_ports;
1342
1343     tmp_rtp = addr->port;
1344
1345     g_clear_object (&inetaddr);
1346     inetaddr = g_inet_address_new_from_string (addr->address);
1347
1348     /* If we're supposed to bind to a multicast address, instead bind
1349      * to ANY and let udpsrc later join the relevant multicast group
1350      */
1351     if (g_inet_address_get_is_multicast (inetaddr)) {
1352       g_object_unref (inetaddr);
1353       inetaddr = g_inet_address_new_any (family);
1354     }
1355   } else {
1356     if (tmp_rtp != 0) {
1357       tmp_rtp += 2;
1358       if (++count > 20)
1359         goto no_ports;
1360     }
1361
1362     if (inetaddr == NULL)
1363       inetaddr = g_inet_address_new_any (family);
1364   }
1365
1366   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1367   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1368     g_object_unref (rtp_sockaddr);
1369     goto again;
1370   }
1371   g_object_unref (rtp_sockaddr);
1372
1373   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1374   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1375     g_clear_object (&rtp_sockaddr);
1376     goto socket_error;
1377   }
1378
1379   tmp_rtp =
1380       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1381   g_object_unref (rtp_sockaddr);
1382
1383   /* check if port is even */
1384   if ((tmp_rtp & 1) != 0) {
1385     /* port not even, close and allocate another */
1386     tmp_rtp++;
1387     g_clear_object (&rtp_socket);
1388     goto again;
1389   }
1390
1391   /* set port */
1392   tmp_rtcp = tmp_rtp + 1;
1393
1394   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1395   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1396     g_object_unref (rtcp_sockaddr);
1397     g_clear_object (&rtp_socket);
1398     goto again;
1399   }
1400   g_object_unref (rtcp_sockaddr);
1401
1402   if (addr == NULL)
1403     addr_str = g_inet_address_to_string (inetaddr);
1404   else
1405     addr_str = addr->address;
1406   g_clear_object (&inetaddr);
1407
1408   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1409           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1410           transport)) {
1411     if (addr == NULL)
1412       g_free (addr_str);
1413     goto no_udp_protocol;
1414   }
1415
1416   if (addr == NULL)
1417     g_free (addr_str);
1418
1419   play_udpsources_one_family (stream, udpsrc_out, family);
1420
1421   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1422   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1423
1424   /* this should not happen... */
1425   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1426     goto port_error;
1427
1428   /* set RTP and RTCP sockets */
1429   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1430
1431   server_port_out->min = rtpport;
1432   server_port_out->max = rtcpport;
1433
1434   *server_addr_out = addr;
1435   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1436
1437   g_object_unref (rtp_socket);
1438   g_object_unref (rtcp_socket);
1439
1440   return TRUE;
1441
1442   /* ERRORS */
1443 no_udp_protocol:
1444   {
1445     goto cleanup;
1446   }
1447 no_ports:
1448   {
1449     goto cleanup;
1450   }
1451 port_error:
1452   {
1453     goto cleanup;
1454   }
1455 socket_error:
1456   {
1457     goto cleanup;
1458   }
1459 cleanup:
1460   {
1461     if (inetaddr)
1462       g_object_unref (inetaddr);
1463     g_list_free_full (rejected_addresses,
1464         (GDestroyNotify) gst_rtsp_address_free);
1465     if (addr)
1466       gst_rtsp_address_free (addr);
1467     if (rtp_socket)
1468       g_object_unref (rtp_socket);
1469     if (rtcp_socket)
1470       g_object_unref (rtcp_socket);
1471     return FALSE;
1472   }
1473 }
1474
1475 /**
1476  * gst_rtsp_stream_allocate_udp_sockets:
1477  * @stream: a #GstRTSPStream
1478  * @family: protocol family
1479  * @transport_method: transport method
1480  *
1481  * Allocates RTP and RTCP ports.
1482  *
1483  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1484  */
1485 gboolean
1486 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1487     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1488 {
1489   GstRTSPStreamPrivate *priv;
1490   gboolean result = FALSE;
1491   GstRTSPLowerTrans transport = ct->lower_transport;
1492
1493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1494   priv = stream->priv;
1495   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
1496
1497   g_mutex_lock (&priv->lock);
1498
1499   if (family == G_SOCKET_FAMILY_IPV4) {
1500     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1501       if (priv->have_ipv4_mcast)
1502         goto done;
1503       priv->have_ipv4_mcast =
1504           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1505           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct,
1506           &priv->mcast_addr_v4, use_client_settings);
1507     } else {
1508       priv->have_ipv4 =
1509           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1510           &priv->server_port_v4, ct, &priv->server_addr_v4,
1511           use_client_settings);
1512     }
1513   } else {
1514     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1515       if (priv->have_ipv6_mcast)
1516         goto done;
1517       priv->have_ipv6_mcast =
1518           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1519           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct,
1520           &priv->mcast_addr_v6, use_client_settings);
1521     } else {
1522       if (priv->have_ipv6)
1523         goto done;
1524       priv->have_ipv6 =
1525           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1526           &priv->server_port_v6, ct, &priv->server_addr_v6,
1527           use_client_settings);
1528     }
1529   }
1530
1531 done:
1532   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1533       priv->have_ipv6_mcast;
1534
1535   g_mutex_unlock (&priv->lock);
1536
1537   return result;
1538 }
1539
1540 /**
1541  * gst_rtsp_stream_set_client_side:
1542  * @stream: a #GstRTSPStream
1543  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1544  * an RTSP connection.
1545  *
1546  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1547  * streams to an RTSP server via RECORD. This has the practical effect
1548  * of changing which UDP port numbers are used when setting up the local
1549  * side of the stream sending to be either the 'server' or 'client' pair
1550  * of a configured UDP transport.
1551  */
1552 void
1553 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1554 {
1555   GstRTSPStreamPrivate *priv;
1556
1557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1558   priv = stream->priv;
1559   g_mutex_lock (&priv->lock);
1560   priv->client_side = client_side;
1561   g_mutex_unlock (&priv->lock);
1562 }
1563
1564 /**
1565  * gst_rtsp_stream_is_client_side:
1566  * @stream: a #GstRTSPStream
1567  *
1568  * See gst_rtsp_stream_set_client_side()
1569  *
1570  * Returns: TRUE if this #GstRTSPStream is client-side.
1571  */
1572 gboolean
1573 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1574 {
1575   GstRTSPStreamPrivate *priv;
1576   gboolean ret;
1577
1578   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1579
1580   priv = stream->priv;
1581   g_mutex_lock (&priv->lock);
1582   ret = priv->client_side;
1583   g_mutex_unlock (&priv->lock);
1584
1585   return ret;
1586 }
1587
1588 /**
1589  * gst_rtsp_stream_get_server_port:
1590  * @stream: a #GstRTSPStream
1591  * @server_port: (out): result server port
1592  * @family: the port family to get
1593  *
1594  * Fill @server_port with the port pair used by the server. This function can
1595  * only be called when @stream has been joined.
1596  */
1597 void
1598 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1599     GstRTSPRange * server_port, GSocketFamily family)
1600 {
1601   GstRTSPStreamPrivate *priv;
1602
1603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1604   priv = stream->priv;
1605   g_return_if_fail (priv->joined_bin != NULL);
1606
1607   g_mutex_lock (&priv->lock);
1608   if (family == G_SOCKET_FAMILY_IPV4) {
1609     if (server_port)
1610       *server_port = priv->server_port_v4;
1611   } else {
1612     if (server_port)
1613       *server_port = priv->server_port_v6;
1614   }
1615   g_mutex_unlock (&priv->lock);
1616 }
1617
1618 /**
1619  * gst_rtsp_stream_get_rtpsession:
1620  * @stream: a #GstRTSPStream
1621  *
1622  * Get the RTP session of this stream.
1623  *
1624  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1625  */
1626 GObject *
1627 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1628 {
1629   GstRTSPStreamPrivate *priv;
1630   GObject *session;
1631
1632   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1633
1634   priv = stream->priv;
1635
1636   g_mutex_lock (&priv->lock);
1637   if ((session = priv->session))
1638     g_object_ref (session);
1639   g_mutex_unlock (&priv->lock);
1640
1641   return session;
1642 }
1643
1644 /**
1645  * gst_rtsp_stream_get_encoder:
1646  * @stream: a #GstRTSPStream
1647  *
1648  * Get the SRTP encoder for this stream.
1649  *
1650  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1651  */
1652 GstElement *
1653 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1654 {
1655   GstRTSPStreamPrivate *priv;
1656   GstElement *encoder;
1657
1658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1659
1660   priv = stream->priv;
1661
1662   g_mutex_lock (&priv->lock);
1663   if ((encoder = priv->srtpenc))
1664     g_object_ref (encoder);
1665   g_mutex_unlock (&priv->lock);
1666
1667   return encoder;
1668 }
1669
1670 /**
1671  * gst_rtsp_stream_get_ssrc:
1672  * @stream: a #GstRTSPStream
1673  * @ssrc: (out): result ssrc
1674  *
1675  * Get the SSRC used by the RTP session of this stream. This function can only
1676  * be called when @stream has been joined.
1677  */
1678 void
1679 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1680 {
1681   GstRTSPStreamPrivate *priv;
1682
1683   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1684   priv = stream->priv;
1685   g_return_if_fail (priv->joined_bin != NULL);
1686
1687   g_mutex_lock (&priv->lock);
1688   if (ssrc && priv->session)
1689     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1690   g_mutex_unlock (&priv->lock);
1691 }
1692
1693 /**
1694  * gst_rtsp_stream_set_retransmission_time:
1695  * @stream: a #GstRTSPStream
1696  * @time: a #GstClockTime
1697  *
1698  * Set the amount of time to store retransmission packets.
1699  */
1700 void
1701 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1702     GstClockTime time)
1703 {
1704   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1705
1706   g_mutex_lock (&stream->priv->lock);
1707   stream->priv->rtx_time = time;
1708   if (stream->priv->rtxsend)
1709     g_object_set (stream->priv->rtxsend, "max-size-time",
1710         GST_TIME_AS_MSECONDS (time), NULL);
1711   g_mutex_unlock (&stream->priv->lock);
1712 }
1713
1714 /**
1715  * gst_rtsp_stream_get_retransmission_time:
1716  * @stream: a #GstRTSPStream
1717  *
1718  * Get the amount of time to store retransmission data.
1719  *
1720  * Returns: the amount of time to store retransmission data.
1721  */
1722 GstClockTime
1723 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1724 {
1725   GstClockTime ret;
1726
1727   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1728
1729   g_mutex_lock (&stream->priv->lock);
1730   ret = stream->priv->rtx_time;
1731   g_mutex_unlock (&stream->priv->lock);
1732
1733   return ret;
1734 }
1735
1736 /**
1737  * gst_rtsp_stream_set_retransmission_pt:
1738  * @stream: a #GstRTSPStream
1739  * @rtx_pt: a #guint
1740  *
1741  * Set the payload type (pt) for retransmission of this stream.
1742  */
1743 void
1744 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1745 {
1746   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1747
1748   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1749
1750   g_mutex_lock (&stream->priv->lock);
1751   stream->priv->rtx_pt = rtx_pt;
1752   if (stream->priv->rtxsend) {
1753     guint pt = gst_rtsp_stream_get_pt (stream);
1754     gchar *pt_s = g_strdup_printf ("%d", pt);
1755     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1756         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1757     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1758     g_free (pt_s);
1759     gst_structure_free (rtx_pt_map);
1760   }
1761   g_mutex_unlock (&stream->priv->lock);
1762 }
1763
1764 /**
1765  * gst_rtsp_stream_get_retransmission_pt:
1766  * @stream: a #GstRTSPStream
1767  *
1768  * Get the payload-type used for retransmission of this stream
1769  *
1770  * Returns: The retransmission PT.
1771  */
1772 guint
1773 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1774 {
1775   guint rtx_pt;
1776
1777   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1778
1779   g_mutex_lock (&stream->priv->lock);
1780   rtx_pt = stream->priv->rtx_pt;
1781   g_mutex_unlock (&stream->priv->lock);
1782
1783   return rtx_pt;
1784 }
1785
1786 /**
1787  * gst_rtsp_stream_set_buffer_size:
1788  * @stream: a #GstRTSPStream
1789  * @size: the buffer size
1790  *
1791  * Set the size of the UDP transmission buffer (in bytes)
1792  * Needs to be set before the stream is joined to a bin.
1793  *
1794  * Since: 1.6
1795  */
1796 void
1797 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1798 {
1799   g_mutex_lock (&stream->priv->lock);
1800   stream->priv->buffer_size = size;
1801   g_mutex_unlock (&stream->priv->lock);
1802 }
1803
1804 /**
1805  * gst_rtsp_stream_get_buffer_size:
1806  * @stream: a #GstRTSPStream
1807  *
1808  * Get the size of the UDP transmission buffer (in bytes)
1809  *
1810  * Returns: the size of the UDP TX buffer
1811  *
1812  * Since: 1.6
1813  */
1814 guint
1815 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1816 {
1817   guint buffer_size;
1818
1819   g_mutex_lock (&stream->priv->lock);
1820   buffer_size = stream->priv->buffer_size;
1821   g_mutex_unlock (&stream->priv->lock);
1822
1823   return buffer_size;
1824 }
1825
1826 /* executed from streaming thread */
1827 static void
1828 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1829 {
1830   GstRTSPStreamPrivate *priv = stream->priv;
1831   GstCaps *newcaps, *oldcaps;
1832
1833   newcaps = gst_pad_get_current_caps (pad);
1834
1835   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1836       newcaps);
1837
1838   g_mutex_lock (&priv->lock);
1839   oldcaps = priv->caps;
1840   priv->caps = newcaps;
1841   g_mutex_unlock (&priv->lock);
1842
1843   if (oldcaps)
1844     gst_caps_unref (oldcaps);
1845 }
1846
1847 static void
1848 dump_structure (const GstStructure * s)
1849 {
1850   gchar *sstr;
1851
1852   sstr = gst_structure_to_string (s);
1853   GST_INFO ("structure: %s", sstr);
1854   g_free (sstr);
1855 }
1856
1857 static GstRTSPStreamTransport *
1858 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1859 {
1860   GstRTSPStreamPrivate *priv = stream->priv;
1861   GList *walk;
1862   GstRTSPStreamTransport *result = NULL;
1863   const gchar *tmp;
1864   gchar *dest;
1865   guint port;
1866
1867   if (rtcp_from == NULL)
1868     return NULL;
1869
1870   tmp = g_strrstr (rtcp_from, ":");
1871   if (tmp == NULL)
1872     return NULL;
1873
1874   port = atoi (tmp + 1);
1875   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1876
1877   g_mutex_lock (&priv->lock);
1878   GST_INFO ("finding %s:%d in %d transports", dest, port,
1879       g_list_length (priv->transports));
1880
1881   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1882     GstRTSPStreamTransport *trans = walk->data;
1883     const GstRTSPTransport *tr;
1884     gint min, max;
1885
1886     tr = gst_rtsp_stream_transport_get_transport (trans);
1887
1888     if (priv->client_side) {
1889       /* In client side mode the 'destination' is the RTSP server, so send
1890        * to those ports */
1891       min = tr->server_port.min;
1892       max = tr->server_port.max;
1893     } else {
1894       min = tr->client_port.min;
1895       max = tr->client_port.max;
1896     }
1897
1898     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1899       result = trans;
1900       break;
1901     }
1902   }
1903   if (result)
1904     g_object_ref (result);
1905   g_mutex_unlock (&priv->lock);
1906
1907   g_free (dest);
1908
1909   return result;
1910 }
1911
1912 static GstRTSPStreamTransport *
1913 check_transport (GObject * source, GstRTSPStream * stream)
1914 {
1915   GstStructure *stats;
1916   GstRTSPStreamTransport *trans;
1917
1918   /* see if we have a stream to match with the origin of the RTCP packet */
1919   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1920   if (trans == NULL) {
1921     g_object_get (source, "stats", &stats, NULL);
1922     if (stats) {
1923       const gchar *rtcp_from;
1924
1925       dump_structure (stats);
1926
1927       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1928       if ((trans = find_transport (stream, rtcp_from))) {
1929         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1930             source);
1931         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1932             g_object_unref);
1933       }
1934       gst_structure_free (stats);
1935     }
1936   }
1937   return trans;
1938 }
1939
1940
1941 static void
1942 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1943 {
1944   GstRTSPStreamTransport *trans;
1945
1946   GST_INFO ("%p: new source %p", stream, source);
1947
1948   trans = check_transport (source, stream);
1949
1950   if (trans)
1951     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1952 }
1953
1954 static void
1955 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1956 {
1957   GST_INFO ("%p: new SDES %p", stream, source);
1958 }
1959
1960 static void
1961 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1962 {
1963   GstRTSPStreamTransport *trans;
1964
1965   trans = check_transport (source, stream);
1966
1967   if (trans) {
1968     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1969     gst_rtsp_stream_transport_keep_alive (trans);
1970   }
1971 #ifdef DUMP_STATS
1972   {
1973     GstStructure *stats;
1974     g_object_get (source, "stats", &stats, NULL);
1975     if (stats) {
1976       dump_structure (stats);
1977       gst_structure_free (stats);
1978     }
1979   }
1980 #endif
1981 }
1982
1983 static void
1984 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1985 {
1986   GST_INFO ("%p: source %p bye", stream, source);
1987 }
1988
1989 static void
1990 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1991 {
1992   GstRTSPStreamTransport *trans;
1993
1994   GST_INFO ("%p: source %p bye timeout", stream, source);
1995
1996   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1997     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1998     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1999   }
2000 }
2001
2002 static void
2003 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2004 {
2005   GstRTSPStreamTransport *trans;
2006
2007   GST_INFO ("%p: source %p timeout", stream, source);
2008
2009   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2010     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2011     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2012   }
2013 }
2014
2015 static void
2016 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2017 {
2018   GST_INFO ("%p: new sender source %p", stream, source);
2019 #ifndef DUMP_STATS
2020   {
2021     GstStructure *stats;
2022     g_object_get (source, "stats", &stats, NULL);
2023     if (stats) {
2024       dump_structure (stats);
2025       gst_structure_free (stats);
2026     }
2027   }
2028 #endif
2029 }
2030
2031 static void
2032 on_sender_ssrc_active (GObject * session, GObject * source,
2033     GstRTSPStream * stream)
2034 {
2035 #ifndef DUMP_STATS
2036   {
2037     GstStructure *stats;
2038     g_object_get (source, "stats", &stats, NULL);
2039     if (stats) {
2040       dump_structure (stats);
2041       gst_structure_free (stats);
2042     }
2043   }
2044 #endif
2045 }
2046
2047 static void
2048 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2049 {
2050   if (is_rtp) {
2051     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2052     g_list_free (priv->tr_cache_rtp);
2053     priv->tr_cache_rtp = NULL;
2054   } else {
2055     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2056     g_list_free (priv->tr_cache_rtcp);
2057     priv->tr_cache_rtcp = NULL;
2058   }
2059 }
2060
2061 static GstFlowReturn
2062 handle_new_sample (GstAppSink * sink, gpointer user_data)
2063 {
2064   GstRTSPStreamPrivate *priv;
2065   GList *walk;
2066   GstSample *sample;
2067   GstBuffer *buffer;
2068   GstRTSPStream *stream;
2069   gboolean is_rtp;
2070
2071   sample = gst_app_sink_pull_sample (sink);
2072   if (!sample)
2073     return GST_FLOW_OK;
2074
2075   stream = (GstRTSPStream *) user_data;
2076   priv = stream->priv;
2077   buffer = gst_sample_get_buffer (sample);
2078
2079   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2080
2081   g_mutex_lock (&priv->lock);
2082   if (is_rtp) {
2083     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2084       clear_tr_cache (priv, is_rtp);
2085       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2086         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2087         priv->tr_cache_rtp =
2088             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2089       }
2090       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2091     }
2092   } else {
2093     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2094       clear_tr_cache (priv, is_rtp);
2095       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2096         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2097         priv->tr_cache_rtcp =
2098             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2099       }
2100       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2101     }
2102   }
2103   g_mutex_unlock (&priv->lock);
2104
2105   if (is_rtp) {
2106     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2107       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2108       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2109     }
2110   } else {
2111     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2112       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2113       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2114     }
2115   }
2116   gst_sample_unref (sample);
2117
2118   return GST_FLOW_OK;
2119 }
2120
2121 static GstAppSinkCallbacks sink_cb = {
2122   NULL,                         /* not interested in EOS */
2123   NULL,                         /* not interested in preroll samples */
2124   handle_new_sample,
2125 };
2126
2127 static GstElement *
2128 get_rtp_encoder (GstRTSPStream * stream, guint session)
2129 {
2130   GstRTSPStreamPrivate *priv = stream->priv;
2131
2132   if (priv->srtpenc == NULL) {
2133     gchar *name;
2134
2135     name = g_strdup_printf ("srtpenc_%u", session);
2136     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2137     g_free (name);
2138
2139     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2140   }
2141   return gst_object_ref (priv->srtpenc);
2142 }
2143
2144 static GstElement *
2145 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2146 {
2147   GstRTSPStreamPrivate *priv = stream->priv;
2148   GstElement *oldenc, *enc;
2149   GstPad *pad;
2150   gchar *name;
2151
2152   if (priv->idx != session)
2153     return NULL;
2154
2155   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2156
2157   oldenc = priv->srtpenc;
2158   enc = get_rtp_encoder (stream, session);
2159   name = g_strdup_printf ("rtp_sink_%d", session);
2160   pad = gst_element_get_request_pad (enc, name);
2161   g_free (name);
2162   gst_object_unref (pad);
2163
2164   if (oldenc == NULL)
2165     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2166         enc);
2167
2168   return enc;
2169 }
2170
2171 static GstElement *
2172 request_rtcp_encoder (GstElement * rtpbin, guint session,
2173     GstRTSPStream * stream)
2174 {
2175   GstRTSPStreamPrivate *priv = stream->priv;
2176   GstElement *oldenc, *enc;
2177   GstPad *pad;
2178   gchar *name;
2179
2180   if (priv->idx != session)
2181     return NULL;
2182
2183   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2184
2185   oldenc = priv->srtpenc;
2186   enc = get_rtp_encoder (stream, session);
2187   name = g_strdup_printf ("rtcp_sink_%d", session);
2188   pad = gst_element_get_request_pad (enc, name);
2189   g_free (name);
2190   gst_object_unref (pad);
2191
2192   if (oldenc == NULL)
2193     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2194         enc);
2195
2196   return enc;
2197 }
2198
2199 static GstCaps *
2200 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2201 {
2202   GstRTSPStreamPrivate *priv = stream->priv;
2203   GstCaps *caps;
2204
2205   GST_DEBUG ("request key %08x", ssrc);
2206
2207   g_mutex_lock (&priv->lock);
2208   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2209     gst_caps_ref (caps);
2210   g_mutex_unlock (&priv->lock);
2211
2212   return caps;
2213 }
2214
2215 static GstElement *
2216 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2217     GstRTSPStream * stream)
2218 {
2219   GstRTSPStreamPrivate *priv = stream->priv;
2220
2221   if (priv->idx != session)
2222     return NULL;
2223
2224   if (priv->srtpdec == NULL) {
2225     gchar *name;
2226
2227     name = g_strdup_printf ("srtpdec_%u", session);
2228     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2229     g_free (name);
2230
2231     g_signal_connect (priv->srtpdec, "request-key",
2232         (GCallback) request_key, stream);
2233   }
2234   return gst_object_ref (priv->srtpdec);
2235 }
2236
2237 /**
2238  * gst_rtsp_stream_request_aux_sender:
2239  * @stream: a #GstRTSPStream
2240  * @sessid: the session id
2241  *
2242  * Creating a rtxsend bin
2243  *
2244  * Returns: (transfer full): a #GstElement.
2245  *
2246  * Since: 1.6
2247  */
2248 GstElement *
2249 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2250 {
2251   GstElement *bin;
2252   GstPad *pad;
2253   GstStructure *pt_map;
2254   gchar *name;
2255   guint pt, rtx_pt;
2256   gchar *pt_s;
2257
2258   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2259
2260   pt = gst_rtsp_stream_get_pt (stream);
2261   pt_s = g_strdup_printf ("%u", pt);
2262   rtx_pt = stream->priv->rtx_pt;
2263
2264   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2265
2266   bin = gst_bin_new (NULL);
2267   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2268   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2269       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2270   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2271       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2272   g_free (pt_s);
2273   gst_structure_free (pt_map);
2274   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2275
2276   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2277   name = g_strdup_printf ("src_%u", sessid);
2278   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2279   g_free (name);
2280   gst_object_unref (pad);
2281
2282   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2283   name = g_strdup_printf ("sink_%u", sessid);
2284   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2285   g_free (name);
2286   gst_object_unref (pad);
2287
2288   return bin;
2289 }
2290
2291 /**
2292  * gst_rtsp_stream_set_pt_map:
2293  * @stream: a #GstRTSPStream
2294  * @pt: the pt
2295  * @caps: a #GstCaps
2296  *
2297  * Configure a pt map between @pt and @caps.
2298  */
2299 void
2300 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2301 {
2302   GstRTSPStreamPrivate *priv = stream->priv;
2303
2304   g_mutex_lock (&priv->lock);
2305   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2306   g_mutex_unlock (&priv->lock);
2307 }
2308
2309 /**
2310  * gst_rtsp_stream_set_publish_clock_mode:
2311  * @stream: a #GstRTSPStream
2312  * @mode: the clock publish mode
2313  *
2314  * Sets if and how the stream clock should be published according to RFC7273.
2315  *
2316  * Since: 1.8
2317  */
2318 void
2319 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2320     GstRTSPPublishClockMode mode)
2321 {
2322   GstRTSPStreamPrivate *priv;
2323
2324   priv = stream->priv;
2325   g_mutex_lock (&priv->lock);
2326   priv->publish_clock_mode = mode;
2327   g_mutex_unlock (&priv->lock);
2328 }
2329
2330 /**
2331  * gst_rtsp_stream_get_publish_clock_mode:
2332  * @factory: a #GstRTSPStream
2333  *
2334  * Gets if and how the stream clock should be published according to RFC7273.
2335  *
2336  * Returns: The GstRTSPPublishClockMode
2337  *
2338  * Since: 1.8
2339  */
2340 GstRTSPPublishClockMode
2341 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2342 {
2343   GstRTSPStreamPrivate *priv;
2344   GstRTSPPublishClockMode ret;
2345
2346   priv = stream->priv;
2347   g_mutex_lock (&priv->lock);
2348   ret = priv->publish_clock_mode;
2349   g_mutex_unlock (&priv->lock);
2350
2351   return ret;
2352 }
2353
2354 static GstCaps *
2355 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2356     GstRTSPStream * stream)
2357 {
2358   GstRTSPStreamPrivate *priv = stream->priv;
2359   GstCaps *caps = NULL;
2360
2361   g_mutex_lock (&priv->lock);
2362
2363   if (priv->idx == session) {
2364     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2365     if (caps) {
2366       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2367       gst_caps_ref (caps);
2368     } else {
2369       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2370     }
2371   }
2372
2373   g_mutex_unlock (&priv->lock);
2374
2375   return caps;
2376 }
2377
2378 static void
2379 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2380 {
2381   GstRTSPStreamPrivate *priv = stream->priv;
2382   gchar *name;
2383   GstPadLinkReturn ret;
2384   guint sessid;
2385
2386   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2387       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2388
2389   name = gst_pad_get_name (pad);
2390   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2391     g_free (name);
2392     return;
2393   }
2394   g_free (name);
2395
2396   if (priv->idx != sessid)
2397     return;
2398
2399   if (gst_pad_is_linked (priv->sinkpad)) {
2400     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2401         GST_DEBUG_PAD_NAME (priv->sinkpad));
2402     return;
2403   }
2404
2405   /* link the RTP pad to the session manager, it should not really fail unless
2406    * this is not really an RTP pad */
2407   ret = gst_pad_link (pad, priv->sinkpad);
2408   if (ret != GST_PAD_LINK_OK)
2409     goto link_failed;
2410   priv->recv_rtp_src = gst_object_ref (pad);
2411
2412   return;
2413
2414 /* ERRORS */
2415 link_failed:
2416   {
2417     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2418         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2419   }
2420 }
2421
2422 static void
2423 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2424     GstRTSPStream * stream)
2425 {
2426   /* TODO: What to do here other than this? */
2427   GST_DEBUG ("Stream %p: Got EOS", stream);
2428   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2429 }
2430
2431 /* must be called with lock */
2432 static gboolean
2433 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2434 {
2435   GstRTSPStreamPrivate *priv;
2436   GstPad *pad, *sinkpad = NULL;
2437   gboolean is_tcp = FALSE, is_udp = FALSE;
2438   gint i;
2439
2440   priv = stream->priv;
2441
2442   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2443   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2444       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2445
2446   if (is_udp && !create_and_configure_udpsinks (stream))
2447     goto no_udp_protocol;
2448
2449   for (i = 0; i < 2; i++) {
2450     GstPad *teepad, *queuepad;
2451     /* For the sender we create this bit of pipeline for both
2452      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2453      * we need to add a queue before appsink and udpsink to make
2454      * the pipeline not block. For the TCP case, we want to pump
2455      * client as fast as possible anyway. This pipeline is used
2456      * when both TCP and UDP are present.
2457      *
2458      * .--------.      .-----.    .---------.    .---------.
2459      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2460      * |       send->sink   src->sink      src->sink       |
2461      * '--------'      |     |    '---------'    '---------'
2462      *                 |     |    .---------.    .---------.
2463      *                 |     |    |  queue  |    | appsink |
2464      *                 |    src->sink      src->sink       |
2465      *                 '-----'    '---------'    '---------'
2466      *
2467      * When only UDP or only TCP is allowed, we skip the tee and queue
2468      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2469      * the session.
2470      */
2471
2472     /* Only link the RTP send src if we're going to send RTP, link
2473      * the RTCP send src always */
2474     if (!priv->srcpad && i == 0)
2475       continue;
2476
2477     if (is_udp) {
2478       /* add udpsink */
2479       gst_bin_add (bin, priv->udpsink[i]);
2480       sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2481     }
2482
2483     if (is_tcp) {
2484       /* make appsink */
2485       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2486       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2487       gst_bin_add (bin, priv->appsink[i]);
2488       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2489           &sink_cb, stream, NULL);
2490     }
2491
2492     if (is_udp && is_tcp) {
2493       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2494
2495       /* make tee for RTP/RTCP */
2496       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2497       gst_bin_add (bin, priv->tee[i]);
2498
2499       /* and link to rtpbin send pad */
2500       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2501       gst_pad_link (priv->send_src[i], pad);
2502       gst_object_unref (pad);
2503
2504       priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2505       g_object_set (priv->udpqueue[i], "max-size-buffers",
2506           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2507       gst_bin_add (bin, priv->udpqueue[i]);
2508       /* link tee to udpqueue */
2509       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2510       pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2511       gst_pad_link (teepad, pad);
2512       gst_object_unref (pad);
2513       gst_object_unref (teepad);
2514
2515       /* link udpqueue to udpsink */
2516       queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2517       gst_pad_link (queuepad, sinkpad);
2518       gst_object_unref (queuepad);
2519       gst_object_unref (sinkpad);
2520
2521       /* make appqueue */
2522       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2523       g_object_set (priv->appqueue[i], "max-size-buffers",
2524           1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0), NULL);
2525       gst_bin_add (bin, priv->appqueue[i]);
2526       /* and link tee to appqueue */
2527       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2528       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2529       gst_pad_link (teepad, pad);
2530       gst_object_unref (pad);
2531       gst_object_unref (teepad);
2532
2533       /* and link appqueue to appsink */
2534       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2535       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2536       gst_pad_link (queuepad, pad);
2537       gst_object_unref (pad);
2538       gst_object_unref (queuepad);
2539     } else if (is_tcp) {
2540       /* only appsink needed, link it to the session */
2541       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2542       gst_pad_link (priv->send_src[i], pad);
2543       gst_object_unref (pad);
2544
2545       /* when its only TCP, we need to set sync and preroll to FALSE
2546        * for the sink to avoid deadlock. And this is only needed for
2547        * sink used for RTCP data, not the RTP data. */
2548       if (i == 1)
2549         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2550     } else {
2551       /* else only udpsink needed, link it to the session */
2552       gst_pad_link (priv->send_src[i], sinkpad);
2553       gst_object_unref (sinkpad);
2554     }
2555
2556     /* check if we need to set to a special state */
2557     if (state != GST_STATE_NULL) {
2558       if (priv->udpsink[i])
2559         gst_element_set_state (priv->udpsink[i], state);
2560       if (priv->appsink[i])
2561         gst_element_set_state (priv->appsink[i], state);
2562       if (priv->appqueue[i])
2563         gst_element_set_state (priv->appqueue[i], state);
2564       if (priv->udpqueue[i])
2565         gst_element_set_state (priv->udpqueue[i], state);
2566       if (priv->tee[i])
2567         gst_element_set_state (priv->tee[i], state);
2568     }
2569   }
2570
2571   return TRUE;
2572
2573   /* ERRORS */
2574 no_udp_protocol:
2575   {
2576     return FALSE;
2577   }
2578 }
2579
2580 /* must be called with lock */
2581 static void
2582 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2583 {
2584   GstRTSPStreamPrivate *priv;
2585   GstPad *pad, *selpad;
2586   gboolean is_tcp;
2587   gint i;
2588
2589   priv = stream->priv;
2590
2591   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2592
2593   for (i = 0; i < 2; i++) {
2594     /* For the receiver we create this bit of pipeline for both
2595      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2596      * and it is all funneled into the rtpbin receive pad.
2597      *
2598      * .--------.     .--------.    .--------.
2599      * | udpsrc |     | funnel |    | rtpbin |
2600      * |       src->sink      src->sink      |
2601      * '--------'     |        |    '--------'
2602      * .--------.     |        |
2603      * | appsrc |     |        |
2604      * |       src->sink       |
2605      * '--------'     '--------'
2606      */
2607
2608     if (!priv->sinkpad && i == 0) {
2609       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2610        * RTCP sink always */
2611       continue;
2612     }
2613
2614     /* make funnel for the RTP/RTCP receivers */
2615     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2616     gst_bin_add (bin, priv->funnel[i]);
2617
2618     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2619     gst_pad_link (pad, priv->recv_sink[i]);
2620     gst_object_unref (pad);
2621
2622     if (priv->udpsrc_v4[i]) {
2623       if (priv->srcpad) {
2624         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2625          * values. This is only relevant for PLAY pipelines */
2626         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2627         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2628       }
2629       /* add udpsrc */
2630       gst_bin_add (bin, priv->udpsrc_v4[i]);
2631
2632       /* and link to the funnel v4 */
2633       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2634       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2635       gst_pad_link (pad, selpad);
2636       gst_object_unref (pad);
2637       gst_object_unref (selpad);
2638     }
2639
2640     if (priv->udpsrc_v6[i]) {
2641       if (priv->srcpad) {
2642         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2643         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2644       }
2645       gst_bin_add (bin, priv->udpsrc_v6[i]);
2646
2647       /* and link to the funnel v6 */
2648       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2649       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2650       gst_pad_link (pad, selpad);
2651       gst_object_unref (pad);
2652       gst_object_unref (selpad);
2653     }
2654
2655     if (is_tcp) {
2656       /* make and add appsrc */
2657       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2658       priv->appsrc_base_time[i] = -1;
2659       if (priv->srcpad) {
2660         gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2661         gst_element_set_locked_state (priv->appsrc[i], TRUE);
2662       }
2663       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2664           TRUE, NULL);
2665       gst_bin_add (bin, priv->appsrc[i]);
2666       /* and link to the funnel */
2667       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2668       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2669       gst_pad_link (pad, selpad);
2670       gst_object_unref (pad);
2671       gst_object_unref (selpad);
2672     }
2673
2674     /* check if we need to set to a special state */
2675     if (state != GST_STATE_NULL) {
2676       gst_element_set_state (priv->funnel[i], state);
2677     }
2678   }
2679 }
2680
2681 /**
2682  * gst_rtsp_stream_join_bin:
2683  * @stream: a #GstRTSPStream
2684  * @bin: (transfer none): a #GstBin to join
2685  * @rtpbin: (transfer none): a rtpbin element in @bin
2686  * @state: the target state of the new elements
2687  *
2688  * Join the #GstBin @bin that contains the element @rtpbin.
2689  *
2690  * @stream will link to @rtpbin, which must be inside @bin. The elements
2691  * added to @bin will be set to the state given in @state.
2692  *
2693  * Returns: %TRUE on success.
2694  */
2695 gboolean
2696 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2697     GstElement * rtpbin, GstState state)
2698 {
2699   GstRTSPStreamPrivate *priv;
2700   guint idx;
2701   gchar *name;
2702   GstPadLinkReturn ret;
2703
2704   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2705   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2706   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2707
2708   priv = stream->priv;
2709
2710   g_mutex_lock (&priv->lock);
2711   if (priv->joined_bin != NULL)
2712     goto was_joined;
2713
2714   /* create a session with the same index as the stream */
2715   idx = priv->idx;
2716
2717   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2718
2719   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2720       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2721     /* For SRTP */
2722     g_signal_connect (rtpbin, "request-rtp-encoder",
2723         (GCallback) request_rtp_encoder, stream);
2724     g_signal_connect (rtpbin, "request-rtcp-encoder",
2725         (GCallback) request_rtcp_encoder, stream);
2726     g_signal_connect (rtpbin, "request-rtp-decoder",
2727         (GCallback) request_rtp_rtcp_decoder, stream);
2728     g_signal_connect (rtpbin, "request-rtcp-decoder",
2729         (GCallback) request_rtp_rtcp_decoder, stream);
2730   }
2731
2732   if (priv->sinkpad) {
2733     g_signal_connect (rtpbin, "request-pt-map",
2734         (GCallback) request_pt_map, stream);
2735   }
2736
2737   /* get pads from the RTP session element for sending and receiving
2738    * RTP/RTCP*/
2739   if (priv->srcpad) {
2740     /* get a pad for sending RTP */
2741     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2742     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2743     g_free (name);
2744
2745     /* link the RTP pad to the session manager, it should not really fail unless
2746      * this is not really an RTP pad */
2747     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2748     if (ret != GST_PAD_LINK_OK)
2749       goto link_failed;
2750
2751     name = g_strdup_printf ("send_rtp_src_%u", idx);
2752     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2753     g_free (name);
2754   } else {
2755     /* Need to connect our sinkpad from here */
2756     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2757     /* EOS */
2758     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2759
2760     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2761     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2762     g_free (name);
2763   }
2764
2765   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2766   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2767   g_free (name);
2768   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2769   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2770   g_free (name);
2771
2772   /* get the session */
2773   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2774
2775   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2776       stream);
2777   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2778       stream);
2779   g_signal_connect (priv->session, "on-ssrc-active",
2780       (GCallback) on_ssrc_active, stream);
2781   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2782       stream);
2783   g_signal_connect (priv->session, "on-bye-timeout",
2784       (GCallback) on_bye_timeout, stream);
2785   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2786       stream);
2787
2788   /* signal for sender ssrc */
2789   g_signal_connect (priv->session, "on-new-sender-ssrc",
2790       (GCallback) on_new_sender_ssrc, stream);
2791   g_signal_connect (priv->session, "on-sender-ssrc-active",
2792       (GCallback) on_sender_ssrc_active, stream);
2793
2794   if (!create_sender_part (stream, bin, state))
2795     goto no_udp_protocol;
2796
2797   create_receiver_part (stream, bin, state);
2798
2799   if (priv->srcpad) {
2800     /* be notified of caps changes */
2801     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2802         (GCallback) caps_notify, stream);
2803   }
2804
2805   priv->joined_bin = gst_object_ref (bin);
2806   g_mutex_unlock (&priv->lock);
2807
2808   return TRUE;
2809
2810   /* ERRORS */
2811 was_joined:
2812   {
2813     g_mutex_unlock (&priv->lock);
2814     return TRUE;
2815   }
2816 link_failed:
2817   {
2818     GST_WARNING ("failed to link stream %u", idx);
2819     gst_object_unref (priv->send_rtp_sink);
2820     priv->send_rtp_sink = NULL;
2821     g_mutex_unlock (&priv->lock);
2822     return FALSE;
2823   }
2824 no_udp_protocol:
2825   {
2826     GST_WARNING ("failed to allocate ports %u", idx);
2827     gst_object_unref (priv->send_rtp_sink);
2828     priv->send_rtp_sink = NULL;
2829     gst_object_unref (priv->send_src[0]);
2830     priv->send_src[0] = NULL;
2831     gst_object_unref (priv->send_src[1]);
2832     priv->send_src[1] = NULL;
2833     gst_object_unref (priv->recv_sink[0]);
2834     priv->recv_sink[0] = NULL;
2835     gst_object_unref (priv->recv_sink[1]);
2836     priv->recv_sink[1] = NULL;
2837     if (priv->udpsink[0])
2838       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2839     if (priv->udpsink[1])
2840       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2841     if (priv->udpsrc_v4[0]) {
2842       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2843       gst_object_unref (priv->udpsrc_v4[0]);
2844       priv->udpsrc_v4[0] = NULL;
2845     }
2846     if (priv->udpsrc_v4[1]) {
2847       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2848       gst_object_unref (priv->udpsrc_v4[1]);
2849       priv->udpsrc_v4[1] = NULL;
2850     }
2851     if (priv->udpsrc_mcast_v4[0]) {
2852       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2853       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2854       priv->udpsrc_mcast_v4[0] = NULL;
2855     }
2856     if (priv->udpsrc_mcast_v4[1]) {
2857       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2858       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2859       priv->udpsrc_mcast_v4[1] = NULL;
2860     }
2861     if (priv->udpsrc_v6[0]) {
2862       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2863       gst_object_unref (priv->udpsrc_v6[0]);
2864       priv->udpsrc_v6[0] = NULL;
2865     }
2866     if (priv->udpsrc_v6[1]) {
2867       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2868       gst_object_unref (priv->udpsrc_v6[1]);
2869       priv->udpsrc_v6[1] = NULL;
2870     }
2871     if (priv->udpsrc_mcast_v6[0]) {
2872       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2873       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2874       priv->udpsrc_mcast_v6[0] = NULL;
2875     }
2876     if (priv->udpsrc_mcast_v6[1]) {
2877       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2878       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2879       priv->udpsrc_mcast_v6[1] = NULL;
2880     }
2881     g_mutex_unlock (&priv->lock);
2882     return FALSE;
2883   }
2884 }
2885
2886 static void
2887 clear_element (GstBin * bin, GstElement ** elementptr)
2888 {
2889   if (*elementptr) {
2890     gst_element_set_locked_state (*elementptr, FALSE);
2891     gst_element_set_state (*elementptr, GST_STATE_NULL);
2892     if (GST_ELEMENT_PARENT (*elementptr))
2893       gst_bin_remove (bin, *elementptr);
2894     else
2895       gst_object_unref (*elementptr);
2896     *elementptr = NULL;
2897   }
2898 }
2899
2900 /**
2901  * gst_rtsp_stream_leave_bin:
2902  * @stream: a #GstRTSPStream
2903  * @bin: (transfer none): a #GstBin
2904  * @rtpbin: (transfer none): a rtpbin #GstElement
2905  *
2906  * Remove the elements of @stream from @bin.
2907  *
2908  * Return: %TRUE on success.
2909  */
2910 gboolean
2911 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2912     GstElement * rtpbin)
2913 {
2914   GstRTSPStreamPrivate *priv;
2915   gint i;
2916
2917   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2918   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2919   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2920
2921   priv = stream->priv;
2922
2923   g_mutex_lock (&priv->lock);
2924   if (priv->joined_bin == NULL)
2925     goto was_not_joined;
2926   if (priv->joined_bin != bin)
2927     goto wrong_bin;
2928
2929   priv->joined_bin = NULL;
2930
2931   /* all transports must be removed by now */
2932   if (priv->transports != NULL)
2933     goto transports_not_removed;
2934
2935   clear_tr_cache (priv, TRUE);
2936   clear_tr_cache (priv, FALSE);
2937
2938   GST_INFO ("stream %p leaving bin", stream);
2939
2940   if (priv->srcpad) {
2941     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2942
2943     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2944     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2945     gst_object_unref (priv->send_rtp_sink);
2946     priv->send_rtp_sink = NULL;
2947   } else if (priv->recv_rtp_src) {
2948     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2949     gst_object_unref (priv->recv_rtp_src);
2950     priv->recv_rtp_src = NULL;
2951   }
2952
2953   for (i = 0; i < 2; i++) {
2954     clear_element (bin, &priv->udpsink[i]);
2955     clear_element (bin, &priv->appsink[i]);
2956     clear_element (bin, &priv->appqueue[i]);
2957     clear_element (bin, &priv->udpqueue[i]);
2958     clear_element (bin, &priv->tee[i]);
2959     clear_element (bin, &priv->funnel[i]);
2960     clear_element (bin, &priv->appsrc[i]);
2961     clear_element (bin, &priv->udpsrc_v4[i]);
2962     clear_element (bin, &priv->udpsrc_v6[i]);
2963     clear_element (bin, &priv->udpsrc_mcast_v4[i]);
2964     clear_element (bin, &priv->udpsrc_mcast_v6[i]);
2965
2966     if (priv->sinkpad || i == 1) {
2967       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2968       gst_object_unref (priv->recv_sink[i]);
2969       priv->recv_sink[i] = NULL;
2970     }
2971   }
2972
2973   if (priv->srcpad) {
2974     gst_object_unref (priv->send_src[0]);
2975     priv->send_src[0] = NULL;
2976   }
2977
2978   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2979   gst_object_unref (priv->send_src[1]);
2980   priv->send_src[1] = NULL;
2981
2982   g_object_unref (priv->session);
2983   priv->session = NULL;
2984   if (priv->caps)
2985     gst_caps_unref (priv->caps);
2986   priv->caps = NULL;
2987
2988   if (priv->srtpenc)
2989     gst_object_unref (priv->srtpenc);
2990   if (priv->srtpdec)
2991     gst_object_unref (priv->srtpdec);
2992
2993   g_clear_object (&priv->joined_bin);
2994   g_mutex_unlock (&priv->lock);
2995
2996   return TRUE;
2997
2998 was_not_joined:
2999   {
3000     g_mutex_unlock (&priv->lock);
3001     return TRUE;
3002   }
3003 transports_not_removed:
3004   {
3005     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3006     g_mutex_unlock (&priv->lock);
3007     return FALSE;
3008   }
3009 wrong_bin:
3010   {
3011     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3012     g_mutex_unlock (&priv->lock);
3013     return FALSE;
3014   }
3015 }
3016
3017 /**
3018  * gst_rtsp_stream_get_joined_bin:
3019  * @stream: a #GstRTSPStream
3020  *
3021  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3022  *
3023  * Return: (transfer full): the joined bin or NULL.
3024  */
3025 GstBin *
3026 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3027 {
3028   GstRTSPStreamPrivate *priv;
3029   GstBin *bin = NULL;
3030
3031   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3032
3033   priv = stream->priv;
3034
3035   g_mutex_lock (&priv->lock);
3036   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3037   g_mutex_unlock (&priv->lock);
3038
3039   return bin;
3040 }
3041
3042 /**
3043  * gst_rtsp_stream_get_rtpinfo:
3044  * @stream: a #GstRTSPStream
3045  * @rtptime: (allow-none): result RTP timestamp
3046  * @seq: (allow-none): result RTP seqnum
3047  * @clock_rate: (allow-none): the clock rate
3048  * @running_time: (allow-none): result running-time
3049  *
3050  * Retrieve the current rtptime, seq and running-time. This is used to
3051  * construct a RTPInfo reply header.
3052  *
3053  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3054  */
3055 gboolean
3056 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3057     guint * rtptime, guint * seq, guint * clock_rate,
3058     GstClockTime * running_time)
3059 {
3060   GstRTSPStreamPrivate *priv;
3061   GstStructure *stats;
3062   GObjectClass *payobjclass;
3063
3064   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3065
3066   priv = stream->priv;
3067
3068   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3069
3070   g_mutex_lock (&priv->lock);
3071
3072   /* First try to extract the information from the last buffer on the sinks.
3073    * This will have a more accurate sequence number and timestamp, as between
3074    * the payloader and the sink there can be some queues
3075    */
3076   if (priv->udpsink[0] || priv->appsink[0]) {
3077     GstSample *last_sample;
3078
3079     if (priv->udpsink[0])
3080       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3081     else
3082       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3083
3084     if (last_sample) {
3085       GstCaps *caps;
3086       GstBuffer *buffer;
3087       GstSegment *segment;
3088       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3089
3090       caps = gst_sample_get_caps (last_sample);
3091       buffer = gst_sample_get_buffer (last_sample);
3092       segment = gst_sample_get_segment (last_sample);
3093
3094       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3095         if (seq) {
3096           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3097         }
3098
3099         if (rtptime) {
3100           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3101         }
3102
3103         gst_rtp_buffer_unmap (&rtp_buffer);
3104
3105         if (running_time) {
3106           *running_time =
3107               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3108               GST_BUFFER_TIMESTAMP (buffer));
3109         }
3110
3111         if (clock_rate) {
3112           GstStructure *s = gst_caps_get_structure (caps, 0);
3113
3114           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3115
3116           if (*clock_rate == 0 && running_time)
3117             *running_time = GST_CLOCK_TIME_NONE;
3118         }
3119         gst_sample_unref (last_sample);
3120
3121         goto done;
3122       } else {
3123         gst_sample_unref (last_sample);
3124       }
3125     }
3126   }
3127
3128   if (g_object_class_find_property (payobjclass, "stats")) {
3129     g_object_get (priv->payloader, "stats", &stats, NULL);
3130     if (stats == NULL)
3131       goto no_stats;
3132
3133     if (seq)
3134       gst_structure_get_uint (stats, "seqnum", seq);
3135
3136     if (rtptime)
3137       gst_structure_get_uint (stats, "timestamp", rtptime);
3138
3139     if (running_time)
3140       gst_structure_get_clock_time (stats, "running-time", running_time);
3141
3142     if (clock_rate) {
3143       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3144       if (*clock_rate == 0 && running_time)
3145         *running_time = GST_CLOCK_TIME_NONE;
3146     }
3147     gst_structure_free (stats);
3148   } else {
3149     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3150         !g_object_class_find_property (payobjclass, "timestamp"))
3151       goto no_stats;
3152
3153     if (seq)
3154       g_object_get (priv->payloader, "seqnum", seq, NULL);
3155
3156     if (rtptime)
3157       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3158
3159     if (running_time)
3160       *running_time = GST_CLOCK_TIME_NONE;
3161   }
3162
3163 done:
3164   g_mutex_unlock (&priv->lock);
3165
3166   return TRUE;
3167
3168   /* ERRORS */
3169 no_stats:
3170   {
3171     GST_WARNING ("Could not get payloader stats");
3172     g_mutex_unlock (&priv->lock);
3173     return FALSE;
3174   }
3175 }
3176
3177 /**
3178  * gst_rtsp_stream_get_caps:
3179  * @stream: a #GstRTSPStream
3180  *
3181  * Retrieve the current caps of @stream.
3182  *
3183  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3184  * after usage.
3185  */
3186 GstCaps *
3187 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3188 {
3189   GstRTSPStreamPrivate *priv;
3190   GstCaps *result;
3191
3192   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3193
3194   priv = stream->priv;
3195
3196   g_mutex_lock (&priv->lock);
3197   if ((result = priv->caps))
3198     gst_caps_ref (result);
3199   g_mutex_unlock (&priv->lock);
3200
3201   return result;
3202 }
3203
3204 /**
3205  * gst_rtsp_stream_recv_rtp:
3206  * @stream: a #GstRTSPStream
3207  * @buffer: (transfer full): a #GstBuffer
3208  *
3209  * Handle an RTP buffer for the stream. This method is usually called when a
3210  * message has been received from a client using the TCP transport.
3211  *
3212  * This function takes ownership of @buffer.
3213  *
3214  * Returns: a GstFlowReturn.
3215  */
3216 GstFlowReturn
3217 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3218 {
3219   GstRTSPStreamPrivate *priv;
3220   GstFlowReturn ret;
3221   GstElement *element;
3222
3223   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3224   priv = stream->priv;
3225   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3226   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3227
3228   g_mutex_lock (&priv->lock);
3229   if (priv->appsrc[0])
3230     element = gst_object_ref (priv->appsrc[0]);
3231   else
3232     element = NULL;
3233   g_mutex_unlock (&priv->lock);
3234
3235   if (element) {
3236     if (priv->appsrc_base_time[0] == -1) {
3237       /* Take current running_time. This timestamp will be put on
3238        * the first buffer of each stream because we are a live source and so we
3239        * timestamp with the running_time. When we are dealing with TCP, we also
3240        * only timestamp the first buffer (using the DISCONT flag) because a server
3241        * typically bursts data, for which we don't want to compensate by speeding
3242        * up the media. The other timestamps will be interpollated from this one
3243        * using the RTP timestamps. */
3244       GST_OBJECT_LOCK (element);
3245       if (GST_ELEMENT_CLOCK (element)) {
3246         GstClockTime now;
3247         GstClockTime base_time;
3248
3249         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3250         base_time = GST_ELEMENT_CAST (element)->base_time;
3251
3252         priv->appsrc_base_time[0] = now - base_time;
3253         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3254         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3255             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3256             GST_TIME_ARGS (base_time));
3257       }
3258       GST_OBJECT_UNLOCK (element);
3259     }
3260
3261     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3262     gst_object_unref (element);
3263   } else {
3264     ret = GST_FLOW_OK;
3265   }
3266   return ret;
3267 }
3268
3269 /**
3270  * gst_rtsp_stream_recv_rtcp:
3271  * @stream: a #GstRTSPStream
3272  * @buffer: (transfer full): a #GstBuffer
3273  *
3274  * Handle an RTCP buffer for the stream. This method is usually called when a
3275  * message has been received from a client using the TCP transport.
3276  *
3277  * This function takes ownership of @buffer.
3278  *
3279  * Returns: a GstFlowReturn.
3280  */
3281 GstFlowReturn
3282 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3283 {
3284   GstRTSPStreamPrivate *priv;
3285   GstFlowReturn ret;
3286   GstElement *element;
3287
3288   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3289   priv = stream->priv;
3290   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3291
3292   if (priv->joined_bin == NULL) {
3293     gst_buffer_unref (buffer);
3294     return GST_FLOW_NOT_LINKED;
3295   }
3296   g_mutex_lock (&priv->lock);
3297   if (priv->appsrc[1])
3298     element = gst_object_ref (priv->appsrc[1]);
3299   else
3300     element = NULL;
3301   g_mutex_unlock (&priv->lock);
3302
3303   if (element) {
3304     if (priv->appsrc_base_time[1] == -1) {
3305       /* Take current running_time. This timestamp will be put on
3306        * the first buffer of each stream because we are a live source and so we
3307        * timestamp with the running_time. When we are dealing with TCP, we also
3308        * only timestamp the first buffer (using the DISCONT flag) because a server
3309        * typically bursts data, for which we don't want to compensate by speeding
3310        * up the media. The other timestamps will be interpollated from this one
3311        * using the RTP timestamps. */
3312       GST_OBJECT_LOCK (element);
3313       if (GST_ELEMENT_CLOCK (element)) {
3314         GstClockTime now;
3315         GstClockTime base_time;
3316
3317         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3318         base_time = GST_ELEMENT_CAST (element)->base_time;
3319
3320         priv->appsrc_base_time[1] = now - base_time;
3321         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3322         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3323             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3324             GST_TIME_ARGS (base_time));
3325       }
3326       GST_OBJECT_UNLOCK (element);
3327     }
3328
3329     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3330     gst_object_unref (element);
3331   } else {
3332     ret = GST_FLOW_OK;
3333     gst_buffer_unref (buffer);
3334   }
3335   return ret;
3336 }
3337
3338 /* must be called with lock */
3339 static gboolean
3340 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3341     gboolean add)
3342 {
3343   GstRTSPStreamPrivate *priv = stream->priv;
3344   const GstRTSPTransport *tr;
3345
3346   tr = gst_rtsp_stream_transport_get_transport (trans);
3347
3348   switch (tr->lower_transport) {
3349     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3350     case GST_RTSP_LOWER_TRANS_UDP:
3351     {
3352       gchar *dest;
3353       gint min, max;
3354       guint ttl = 0;
3355
3356       dest = tr->destination;
3357       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3358         min = tr->port.min;
3359         max = tr->port.max;
3360         ttl = tr->ttl;
3361       } else if (priv->client_side) {
3362         /* In client side mode the 'destination' is the RTSP server, so send
3363          * to those ports */
3364         min = tr->server_port.min;
3365         max = tr->server_port.max;
3366       } else {
3367         min = tr->client_port.min;
3368         max = tr->client_port.max;
3369       }
3370
3371       if (add) {
3372         if (ttl > 0) {
3373           GST_INFO ("setting ttl-mc %d", ttl);
3374           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3375           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3376         }
3377         GST_INFO ("adding %s:%d-%d", dest, min, max);
3378         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3379         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3380         priv->transports = g_list_prepend (priv->transports, trans);
3381       } else {
3382         GST_INFO ("removing %s:%d-%d", dest, min, max);
3383         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3384         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3385         priv->transports = g_list_remove (priv->transports, trans);
3386       }
3387       priv->transports_cookie++;
3388       break;
3389     }
3390     case GST_RTSP_LOWER_TRANS_TCP:
3391       if (add) {
3392         GST_INFO ("adding TCP %s", tr->destination);
3393         priv->transports = g_list_prepend (priv->transports, trans);
3394       } else {
3395         GST_INFO ("removing TCP %s", tr->destination);
3396         priv->transports = g_list_remove (priv->transports, trans);
3397       }
3398       priv->transports_cookie++;
3399       break;
3400     default:
3401       goto unknown_transport;
3402   }
3403   return TRUE;
3404
3405   /* ERRORS */
3406 unknown_transport:
3407   {
3408     GST_INFO ("Unknown transport %d", tr->lower_transport);
3409     return FALSE;
3410   }
3411 }
3412
3413
3414 /**
3415  * gst_rtsp_stream_add_transport:
3416  * @stream: a #GstRTSPStream
3417  * @trans: (transfer none): a #GstRTSPStreamTransport
3418  *
3419  * Add the transport in @trans to @stream. The media of @stream will
3420  * then also be send to the values configured in @trans.
3421  *
3422  * @stream must be joined to a bin.
3423  *
3424  * @trans must contain a valid #GstRTSPTransport.
3425  *
3426  * Returns: %TRUE if @trans was added
3427  */
3428 gboolean
3429 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3430     GstRTSPStreamTransport * trans)
3431 {
3432   GstRTSPStreamPrivate *priv;
3433   gboolean res;
3434
3435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3436   priv = stream->priv;
3437   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3438   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3439
3440   g_mutex_lock (&priv->lock);
3441   res = update_transport (stream, trans, TRUE);
3442   g_mutex_unlock (&priv->lock);
3443
3444   return res;
3445 }
3446
3447 /**
3448  * gst_rtsp_stream_remove_transport:
3449  * @stream: a #GstRTSPStream
3450  * @trans: (transfer none): a #GstRTSPStreamTransport
3451  *
3452  * Remove the transport in @trans from @stream. The media of @stream will
3453  * not be sent to the values configured in @trans.
3454  *
3455  * @stream must be joined to a bin.
3456  *
3457  * @trans must contain a valid #GstRTSPTransport.
3458  *
3459  * Returns: %TRUE if @trans was removed
3460  */
3461 gboolean
3462 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3463     GstRTSPStreamTransport * trans)
3464 {
3465   GstRTSPStreamPrivate *priv;
3466   gboolean res;
3467
3468   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3469   priv = stream->priv;
3470   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3471   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3472
3473   g_mutex_lock (&priv->lock);
3474   res = update_transport (stream, trans, FALSE);
3475   g_mutex_unlock (&priv->lock);
3476
3477   return res;
3478 }
3479
3480 /**
3481  * gst_rtsp_stream_update_crypto:
3482  * @stream: a #GstRTSPStream
3483  * @ssrc: the SSRC
3484  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3485  *
3486  * Update the new crypto information for @ssrc in @stream. If information
3487  * for @ssrc did not exist, it will be added. If information
3488  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3489  * be removed from @stream.
3490  *
3491  * Returns: %TRUE if @crypto could be updated
3492  */
3493 gboolean
3494 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3495     guint ssrc, GstCaps * crypto)
3496 {
3497   GstRTSPStreamPrivate *priv;
3498
3499   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3500   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3501
3502   priv = stream->priv;
3503
3504   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3505
3506   g_mutex_lock (&priv->lock);
3507   if (crypto)
3508     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3509         gst_caps_ref (crypto));
3510   else
3511     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3512   g_mutex_unlock (&priv->lock);
3513
3514   return TRUE;
3515 }
3516
3517 /**
3518  * gst_rtsp_stream_get_rtp_socket:
3519  * @stream: a #GstRTSPStream
3520  * @family: the socket family
3521  *
3522  * Get the RTP socket from @stream for a @family.
3523  *
3524  * @stream must be joined to a bin.
3525  *
3526  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3527  * socket could be allocated for @family. Unref after usage
3528  */
3529 GSocket *
3530 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3531 {
3532   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3533   GSocket *socket;
3534   const gchar *name;
3535
3536   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3537   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3538       family == G_SOCKET_FAMILY_IPV6, NULL);
3539   g_return_val_if_fail (priv->udpsink[0], NULL);
3540
3541   if (family == G_SOCKET_FAMILY_IPV6)
3542     name = "socket-v6";
3543   else
3544     name = "socket";
3545
3546   g_object_get (priv->udpsink[0], name, &socket, NULL);
3547
3548   return socket;
3549 }
3550
3551 /**
3552  * gst_rtsp_stream_get_rtcp_socket:
3553  * @stream: a #GstRTSPStream
3554  * @family: the socket family
3555  *
3556  * Get the RTCP socket from @stream for a @family.
3557  *
3558  * @stream must be joined to a bin.
3559  *
3560  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3561  * socket could be allocated for @family. Unref after usage
3562  */
3563 GSocket *
3564 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3565 {
3566   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3567   GSocket *socket;
3568   const gchar *name;
3569
3570   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3571   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3572       family == G_SOCKET_FAMILY_IPV6, NULL);
3573   g_return_val_if_fail (priv->udpsink[1], NULL);
3574
3575   if (family == G_SOCKET_FAMILY_IPV6)
3576     name = "socket-v6";
3577   else
3578     name = "socket";
3579
3580   g_object_get (priv->udpsink[1], name, &socket, NULL);
3581
3582   return socket;
3583 }
3584
3585 /**
3586  * gst_rtsp_stream_set_seqnum:
3587  * @stream: a #GstRTSPStream
3588  * @seqnum: a new sequence number
3589  *
3590  * Configure the sequence number in the payloader of @stream to @seqnum.
3591  */
3592 void
3593 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3594 {
3595   GstRTSPStreamPrivate *priv;
3596
3597   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3598
3599   priv = stream->priv;
3600
3601   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3602 }
3603
3604 /**
3605  * gst_rtsp_stream_get_seqnum:
3606  * @stream: a #GstRTSPStream
3607  *
3608  * Get the configured sequence number in the payloader of @stream.
3609  *
3610  * Returns: the sequence number of the payloader.
3611  */
3612 guint16
3613 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3614 {
3615   GstRTSPStreamPrivate *priv;
3616   guint seqnum;
3617
3618   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3619
3620   priv = stream->priv;
3621
3622   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3623
3624   return seqnum;
3625 }
3626
3627 /**
3628  * gst_rtsp_stream_transport_filter:
3629  * @stream: a #GstRTSPStream
3630  * @func: (scope call) (allow-none): a callback
3631  * @user_data: (closure): user data passed to @func
3632  *
3633  * Call @func for each transport managed by @stream. The result value of @func
3634  * determines what happens to the transport. @func will be called with @stream
3635  * locked so no further actions on @stream can be performed from @func.
3636  *
3637  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3638  * @stream.
3639  *
3640  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3641  *
3642  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3643  * will also be added with an additional ref to the result #GList of this
3644  * function..
3645  *
3646  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3647  *
3648  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3649  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3650  * element in the #GList should be unreffed before the list is freed.
3651  */
3652 GList *
3653 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3654     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3655 {
3656   GstRTSPStreamPrivate *priv;
3657   GList *result, *walk, *next;
3658   GHashTable *visited = NULL;
3659   guint cookie;
3660
3661   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3662
3663   priv = stream->priv;
3664
3665   result = NULL;
3666   if (func)
3667     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3668
3669   g_mutex_lock (&priv->lock);
3670 restart:
3671   cookie = priv->transports_cookie;
3672   for (walk = priv->transports; walk; walk = next) {
3673     GstRTSPStreamTransport *trans = walk->data;
3674     GstRTSPFilterResult res;
3675     gboolean changed;
3676
3677     next = g_list_next (walk);
3678
3679     if (func) {
3680       /* only visit each transport once */
3681       if (g_hash_table_contains (visited, trans))
3682         continue;
3683
3684       g_hash_table_add (visited, g_object_ref (trans));
3685       g_mutex_unlock (&priv->lock);
3686
3687       res = func (stream, trans, user_data);
3688
3689       g_mutex_lock (&priv->lock);
3690     } else
3691       res = GST_RTSP_FILTER_REF;
3692
3693     changed = (cookie != priv->transports_cookie);
3694
3695     switch (res) {
3696       case GST_RTSP_FILTER_REMOVE:
3697         update_transport (stream, trans, FALSE);
3698         break;
3699       case GST_RTSP_FILTER_REF:
3700         result = g_list_prepend (result, g_object_ref (trans));
3701         break;
3702       case GST_RTSP_FILTER_KEEP:
3703       default:
3704         break;
3705     }
3706     if (changed)
3707       goto restart;
3708   }
3709   g_mutex_unlock (&priv->lock);
3710
3711   if (func)
3712     g_hash_table_unref (visited);
3713
3714   return result;
3715 }
3716
3717 static GstPadProbeReturn
3718 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3719 {
3720   GstRTSPStreamPrivate *priv;
3721   GstRTSPStream *stream;
3722
3723   stream = user_data;
3724   priv = stream->priv;
3725
3726   GST_DEBUG_OBJECT (pad, "now blocking");
3727
3728   g_mutex_lock (&priv->lock);
3729   priv->blocking = TRUE;
3730   g_mutex_unlock (&priv->lock);
3731
3732   gst_element_post_message (priv->payloader,
3733       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3734           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3735
3736   return GST_PAD_PROBE_OK;
3737 }
3738
3739 /**
3740  * gst_rtsp_stream_set_blocked:
3741  * @stream: a #GstRTSPStream
3742  * @blocked: boolean indicating we should block or unblock
3743  *
3744  * Blocks or unblocks the dataflow on @stream.
3745  *
3746  * Returns: %TRUE on success
3747  */
3748 gboolean
3749 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3750 {
3751   GstRTSPStreamPrivate *priv;
3752
3753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3754
3755   priv = stream->priv;
3756
3757   g_mutex_lock (&priv->lock);
3758   if (blocked) {
3759     priv->blocking = FALSE;
3760     if (priv->blocked_id == 0) {
3761       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3762           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3763           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3764           g_object_ref (stream), g_object_unref);
3765     }
3766   } else {
3767     if (priv->blocked_id != 0) {
3768       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3769       priv->blocked_id = 0;
3770       priv->blocking = FALSE;
3771     }
3772   }
3773   g_mutex_unlock (&priv->lock);
3774
3775   return TRUE;
3776 }
3777
3778 /**
3779  * gst_rtsp_stream_is_blocking:
3780  * @stream: a #GstRTSPStream
3781  *
3782  * Check if @stream is blocking on a #GstBuffer.
3783  *
3784  * Returns: %TRUE if @stream is blocking
3785  */
3786 gboolean
3787 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3788 {
3789   GstRTSPStreamPrivate *priv;
3790   gboolean result;
3791
3792   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3793
3794   priv = stream->priv;
3795
3796   g_mutex_lock (&priv->lock);
3797   result = priv->blocking;
3798   g_mutex_unlock (&priv->lock);
3799
3800   return result;
3801 }
3802
3803 /**
3804  * gst_rtsp_stream_query_position:
3805  * @stream: a #GstRTSPStream
3806  *
3807  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3808  * the RTP parts of the pipeline and not the RTCP parts.
3809  *
3810  * Returns: %TRUE if the position could be queried
3811  */
3812 gboolean
3813 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3814 {
3815   GstRTSPStreamPrivate *priv;
3816   GstElement *sink;
3817   gboolean ret;
3818
3819   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3820
3821   priv = stream->priv;
3822
3823   g_mutex_lock (&priv->lock);
3824   /* depending on the transport type, it should query corresponding sink */
3825   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3826       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3827     sink = priv->udpsink[0];
3828   else
3829     sink = priv->appsink[0];
3830
3831   if (sink)
3832     gst_object_ref (sink);
3833   g_mutex_unlock (&priv->lock);
3834
3835   if (!sink)
3836     return FALSE;
3837
3838   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3839   gst_object_unref (sink);
3840
3841   return ret;
3842 }
3843
3844 /**
3845  * gst_rtsp_stream_query_stop:
3846  * @stream: a #GstRTSPStream
3847  *
3848  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3849  * the RTP parts of the pipeline and not the RTCP parts.
3850  *
3851  * Returns: %TRUE if the stop could be queried
3852  */
3853 gboolean
3854 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3855 {
3856   GstRTSPStreamPrivate *priv;
3857   GstElement *sink;
3858   GstQuery *query;
3859   gboolean ret;
3860
3861   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3862
3863   priv = stream->priv;
3864
3865   g_mutex_lock (&priv->lock);
3866   /* depending on the transport type, it should query corresponding sink */
3867   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3868       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3869     sink = priv->udpsink[0];
3870   else
3871     sink = priv->appsink[0];
3872
3873   if (sink)
3874     gst_object_ref (sink);
3875   g_mutex_unlock (&priv->lock);
3876
3877   if (!sink)
3878     return FALSE;
3879
3880   query = gst_query_new_segment (GST_FORMAT_TIME);
3881   if ((ret = gst_element_query (sink, query))) {
3882     GstFormat format;
3883
3884     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3885     if (format != GST_FORMAT_TIME)
3886       *stop = -1;
3887   }
3888   gst_query_unref (query);
3889   gst_object_unref (sink);
3890
3891   return ret;
3892
3893 }