stream: revert back to create udpsrc/udpsink on DESCRIBE for unicast
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPRange server_port_v4;
128   GstRTSPRange server_port_v6;
129   GstRTSPAddress *server_addr_v4;
130   GstRTSPAddress *server_addr_v6;
131
132   /* multicast addresses */
133   GstRTSPAddress *mcast_addr_v4;
134   GstRTSPAddress *mcast_addr_v6;
135
136   gchar *multicast_iface;
137
138   /* the caps of the stream */
139   gulong caps_sig;
140   GstCaps *caps;
141
142   /* transports we stream to */
143   guint n_active;
144   GList *transports;
145   guint transports_cookie;
146   GList *tr_cache_rtp;
147   GList *tr_cache_rtcp;
148   guint tr_cache_cookie_rtp;
149   guint tr_cache_cookie_rtcp;
150
151   gint dscp_qos;
152
153   /* stream blocking */
154   gulong blocked_id;
155   gboolean blocking;
156
157   /* pt->caps map for RECORD streams */
158   GHashTable *ptmap;
159
160   GstRTSPPublishClockMode publish_clock_mode;
161 };
162
163 #define DEFAULT_CONTROL         NULL
164 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
165 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
166                                         GST_RTSP_LOWER_TRANS_TCP
167
168 enum
169 {
170   PROP_0,
171   PROP_CONTROL,
172   PROP_PROFILES,
173   PROP_PROTOCOLS,
174   PROP_LAST
175 };
176
177 enum
178 {
179   SIGNAL_NEW_RTP_ENCODER,
180   SIGNAL_NEW_RTCP_ENCODER,
181   SIGNAL_LAST
182 };
183
184 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
185 #define GST_CAT_DEFAULT rtsp_stream_debug
186
187 static GQuark ssrc_stream_map_key;
188
189 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
190     GValue * value, GParamSpec * pspec);
191 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
192     const GValue * value, GParamSpec * pspec);
193
194 static void gst_rtsp_stream_finalize (GObject * obj);
195
196 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
197
198 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
199
200 static void
201 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
202 {
203   GObjectClass *gobject_class;
204
205   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
206
207   gobject_class = G_OBJECT_CLASS (klass);
208
209   gobject_class->get_property = gst_rtsp_stream_get_property;
210   gobject_class->set_property = gst_rtsp_stream_set_property;
211   gobject_class->finalize = gst_rtsp_stream_finalize;
212
213   g_object_class_install_property (gobject_class, PROP_CONTROL,
214       g_param_spec_string ("control", "Control",
215           "The control string for this stream", DEFAULT_CONTROL,
216           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217
218   g_object_class_install_property (gobject_class, PROP_PROFILES,
219       g_param_spec_flags ("profiles", "Profiles",
220           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
221           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222
223   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
224       g_param_spec_flags ("protocols", "Protocols",
225           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
226           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
229       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
230       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
231       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
232
233   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
234       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
235       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
236       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
237
238   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
239
240   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
241 }
242
243 static void
244 gst_rtsp_stream_init (GstRTSPStream * stream)
245 {
246   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
247
248   GST_DEBUG ("new stream %p", stream);
249
250   stream->priv = priv;
251
252   priv->dscp_qos = -1;
253   priv->control = g_strdup (DEFAULT_CONTROL);
254   priv->profiles = DEFAULT_PROFILES;
255   priv->protocols = DEFAULT_PROTOCOLS;
256   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
257
258   g_mutex_init (&priv->lock);
259
260   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
261       NULL, (GDestroyNotify) gst_caps_unref);
262   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
263       (GDestroyNotify) gst_caps_unref);
264 }
265
266 static void
267 gst_rtsp_stream_finalize (GObject * obj)
268 {
269   GstRTSPStream *stream;
270   GstRTSPStreamPrivate *priv;
271
272   stream = GST_RTSP_STREAM (obj);
273   priv = stream->priv;
274
275   GST_DEBUG ("finalize stream %p", stream);
276
277   /* we really need to be unjoined now */
278   g_return_if_fail (priv->joined_bin == NULL);
279
280   if (priv->mcast_addr_v4)
281     gst_rtsp_address_free (priv->mcast_addr_v4);
282   if (priv->mcast_addr_v6)
283     gst_rtsp_address_free (priv->mcast_addr_v6);
284   if (priv->server_addr_v4)
285     gst_rtsp_address_free (priv->server_addr_v4);
286   if (priv->server_addr_v6)
287     gst_rtsp_address_free (priv->server_addr_v6);
288   if (priv->pool)
289     g_object_unref (priv->pool);
290   if (priv->rtxsend)
291     g_object_unref (priv->rtxsend);
292
293   g_free (priv->multicast_iface);
294
295   gst_object_unref (priv->payloader);
296   if (priv->srcpad)
297     gst_object_unref (priv->srcpad);
298   if (priv->sinkpad)
299     gst_object_unref (priv->sinkpad);
300   g_free (priv->control);
301   g_mutex_clear (&priv->lock);
302
303   g_hash_table_unref (priv->keys);
304   g_hash_table_destroy (priv->ptmap);
305
306   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
307 }
308
309 static void
310 gst_rtsp_stream_get_property (GObject * object, guint propid,
311     GValue * value, GParamSpec * pspec)
312 {
313   GstRTSPStream *stream = GST_RTSP_STREAM (object);
314
315   switch (propid) {
316     case PROP_CONTROL:
317       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
318       break;
319     case PROP_PROFILES:
320       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
321       break;
322     case PROP_PROTOCOLS:
323       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
324       break;
325     default:
326       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
327   }
328 }
329
330 static void
331 gst_rtsp_stream_set_property (GObject * object, guint propid,
332     const GValue * value, GParamSpec * pspec)
333 {
334   GstRTSPStream *stream = GST_RTSP_STREAM (object);
335
336   switch (propid) {
337     case PROP_CONTROL:
338       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
339       break;
340     case PROP_PROFILES:
341       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
342       break;
343     case PROP_PROTOCOLS:
344       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
345       break;
346     default:
347       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
348   }
349 }
350
351 /**
352  * gst_rtsp_stream_new:
353  * @idx: an index
354  * @pad: a #GstPad
355  * @payloader: a #GstElement
356  *
357  * Create a new media stream with index @idx that handles RTP data on
358  * @pad and has a payloader element @payloader if @pad is a source pad
359  * or a depayloader element @payloader if @pad is a sink pad.
360  *
361  * Returns: (transfer full): a new #GstRTSPStream
362  */
363 GstRTSPStream *
364 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
365 {
366   GstRTSPStreamPrivate *priv;
367   GstRTSPStream *stream;
368
369   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
370   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
371
372   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
373   priv = stream->priv;
374   priv->idx = idx;
375   priv->payloader = gst_object_ref (payloader);
376   if (GST_PAD_IS_SRC (pad))
377     priv->srcpad = gst_object_ref (pad);
378   else
379     priv->sinkpad = gst_object_ref (pad);
380
381   return stream;
382 }
383
384 /**
385  * gst_rtsp_stream_get_index:
386  * @stream: a #GstRTSPStream
387  *
388  * Get the stream index.
389  *
390  * Return: the stream index.
391  */
392 guint
393 gst_rtsp_stream_get_index (GstRTSPStream * stream)
394 {
395   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
396
397   return stream->priv->idx;
398 }
399
400 /**
401  * gst_rtsp_stream_get_pt:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the stream payload type.
405  *
406  * Return: the stream payload type.
407  */
408 guint
409 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
410 {
411   GstRTSPStreamPrivate *priv;
412   guint pt;
413
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
415
416   priv = stream->priv;
417
418   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
419
420   return pt;
421 }
422
423 /**
424  * gst_rtsp_stream_get_srcpad:
425  * @stream: a #GstRTSPStream
426  *
427  * Get the srcpad associated with @stream.
428  *
429  * Returns: (transfer full): the srcpad. Unref after usage.
430  */
431 GstPad *
432 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
433 {
434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
435
436   if (!stream->priv->srcpad)
437     return NULL;
438
439   return gst_object_ref (stream->priv->srcpad);
440 }
441
442 /**
443  * gst_rtsp_stream_get_sinkpad:
444  * @stream: a #GstRTSPStream
445  *
446  * Get the sinkpad associated with @stream.
447  *
448  * Returns: (transfer full): the sinkpad. Unref after usage.
449  */
450 GstPad *
451 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
452 {
453   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
454
455   if (!stream->priv->sinkpad)
456     return NULL;
457
458   return gst_object_ref (stream->priv->sinkpad);
459 }
460
461 /**
462  * gst_rtsp_stream_get_control:
463  * @stream: a #GstRTSPStream
464  *
465  * Get the control string to identify this stream.
466  *
467  * Returns: (transfer full): the control string. g_free() after usage.
468  */
469 gchar *
470 gst_rtsp_stream_get_control (GstRTSPStream * stream)
471 {
472   GstRTSPStreamPrivate *priv;
473   gchar *result;
474
475   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
476
477   priv = stream->priv;
478
479   g_mutex_lock (&priv->lock);
480   if ((result = g_strdup (priv->control)) == NULL)
481     result = g_strdup_printf ("stream=%u", priv->idx);
482   g_mutex_unlock (&priv->lock);
483
484   return result;
485 }
486
487 /**
488  * gst_rtsp_stream_set_control:
489  * @stream: a #GstRTSPStream
490  * @control: a control string
491  *
492  * Set the control string in @stream.
493  */
494 void
495 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
496 {
497   GstRTSPStreamPrivate *priv;
498
499   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
500
501   priv = stream->priv;
502
503   g_mutex_lock (&priv->lock);
504   g_free (priv->control);
505   priv->control = g_strdup (control);
506   g_mutex_unlock (&priv->lock);
507 }
508
509 /**
510  * gst_rtsp_stream_has_control:
511  * @stream: a #GstRTSPStream
512  * @control: a control string
513  *
514  * Check if @stream has the control string @control.
515  *
516  * Returns: %TRUE is @stream has @control as the control string
517  */
518 gboolean
519 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
520 {
521   GstRTSPStreamPrivate *priv;
522   gboolean res;
523
524   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
525
526   priv = stream->priv;
527
528   g_mutex_lock (&priv->lock);
529   if (priv->control)
530     res = (g_strcmp0 (priv->control, control) == 0);
531   else {
532     guint streamid;
533
534     if (sscanf (control, "stream=%u", &streamid) > 0)
535       res = (streamid == priv->idx);
536     else
537       res = FALSE;
538   }
539   g_mutex_unlock (&priv->lock);
540
541   return res;
542 }
543
544 /**
545  * gst_rtsp_stream_set_mtu:
546  * @stream: a #GstRTSPStream
547  * @mtu: a new MTU
548  *
549  * Configure the mtu in the payloader of @stream to @mtu.
550  */
551 void
552 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
553 {
554   GstRTSPStreamPrivate *priv;
555
556   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
557
558   priv = stream->priv;
559
560   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
561
562   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
563 }
564
565 /**
566  * gst_rtsp_stream_get_mtu:
567  * @stream: a #GstRTSPStream
568  *
569  * Get the configured MTU in the payloader of @stream.
570  *
571  * Returns: the MTU of the payloader.
572  */
573 guint
574 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
575 {
576   GstRTSPStreamPrivate *priv;
577   guint mtu;
578
579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
580
581   priv = stream->priv;
582
583   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
584
585   return mtu;
586 }
587
588 /* Update the dscp qos property on the udp sinks */
589 static void
590 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
591 {
592   GstRTSPStreamPrivate *priv;
593
594   priv = stream->priv;
595
596   if (udpsink[0]) {
597     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
598   }
599
600   if (udpsink[1]) {
601     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
602   }
603 }
604
605 /**
606  * gst_rtsp_stream_set_dscp_qos:
607  * @stream: a #GstRTSPStream
608  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
609  *
610  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
611  */
612 void
613 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
614 {
615   GstRTSPStreamPrivate *priv;
616
617   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
618
619   priv = stream->priv;
620
621   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
622
623   if (dscp_qos < -1 || dscp_qos > 63) {
624     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
625     return;
626   }
627
628   priv->dscp_qos = dscp_qos;
629
630   update_dscp_qos (stream, priv->udpsink);
631 }
632
633 /**
634  * gst_rtsp_stream_get_dscp_qos:
635  * @stream: a #GstRTSPStream
636  *
637  * Get the configured DSCP QoS in of the outgoing sockets.
638  *
639  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
640  */
641 gint
642 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
643 {
644   GstRTSPStreamPrivate *priv;
645
646   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
647
648   priv = stream->priv;
649
650   return priv->dscp_qos;
651 }
652
653 /**
654  * gst_rtsp_stream_is_transport_supported:
655  * @stream: a #GstRTSPStream
656  * @transport: (transfer none): a #GstRTSPTransport
657  *
658  * Check if @transport can be handled by stream
659  *
660  * Returns: %TRUE if @transport can be handled by @stream.
661  */
662 gboolean
663 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
664     GstRTSPTransport * transport)
665 {
666   GstRTSPStreamPrivate *priv;
667
668   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
669
670   priv = stream->priv;
671
672   g_mutex_lock (&priv->lock);
673   if (transport->trans != GST_RTSP_TRANS_RTP)
674     goto unsupported_transmode;
675
676   if (!(transport->profile & priv->profiles))
677     goto unsupported_profile;
678
679   if (!(transport->lower_transport & priv->protocols))
680     goto unsupported_ltrans;
681
682   g_mutex_unlock (&priv->lock);
683
684   return TRUE;
685
686   /* ERRORS */
687 unsupported_transmode:
688   {
689     GST_DEBUG ("unsupported transport mode %d", transport->trans);
690     g_mutex_unlock (&priv->lock);
691     return FALSE;
692   }
693 unsupported_profile:
694   {
695     GST_DEBUG ("unsupported profile %d", transport->profile);
696     g_mutex_unlock (&priv->lock);
697     return FALSE;
698   }
699 unsupported_ltrans:
700   {
701     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
702     g_mutex_unlock (&priv->lock);
703     return FALSE;
704   }
705 }
706
707 /**
708  * gst_rtsp_stream_set_profiles:
709  * @stream: a #GstRTSPStream
710  * @profiles: the new profiles
711  *
712  * Configure the allowed profiles for @stream.
713  */
714 void
715 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
716 {
717   GstRTSPStreamPrivate *priv;
718
719   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
720
721   priv = stream->priv;
722
723   g_mutex_lock (&priv->lock);
724   priv->profiles = profiles;
725   g_mutex_unlock (&priv->lock);
726 }
727
728 /**
729  * gst_rtsp_stream_get_profiles:
730  * @stream: a #GstRTSPStream
731  *
732  * Get the allowed profiles of @stream.
733  *
734  * Returns: a #GstRTSPProfile
735  */
736 GstRTSPProfile
737 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
738 {
739   GstRTSPStreamPrivate *priv;
740   GstRTSPProfile res;
741
742   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
743
744   priv = stream->priv;
745
746   g_mutex_lock (&priv->lock);
747   res = priv->profiles;
748   g_mutex_unlock (&priv->lock);
749
750   return res;
751 }
752
753 /**
754  * gst_rtsp_stream_set_protocols:
755  * @stream: a #GstRTSPStream
756  * @protocols: the new flags
757  *
758  * Configure the allowed lower transport for @stream.
759  */
760 void
761 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
762     GstRTSPLowerTrans protocols)
763 {
764   GstRTSPStreamPrivate *priv;
765
766   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
767
768   priv = stream->priv;
769
770   g_mutex_lock (&priv->lock);
771   priv->protocols = protocols;
772   g_mutex_unlock (&priv->lock);
773 }
774
775 /**
776  * gst_rtsp_stream_get_protocols:
777  * @stream: a #GstRTSPStream
778  *
779  * Get the allowed protocols of @stream.
780  *
781  * Returns: a #GstRTSPLowerTrans
782  */
783 GstRTSPLowerTrans
784 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
785 {
786   GstRTSPStreamPrivate *priv;
787   GstRTSPLowerTrans res;
788
789   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
790       GST_RTSP_LOWER_TRANS_UNKNOWN);
791
792   priv = stream->priv;
793
794   g_mutex_lock (&priv->lock);
795   res = priv->protocols;
796   g_mutex_unlock (&priv->lock);
797
798   return res;
799 }
800
801 /**
802  * gst_rtsp_stream_set_address_pool:
803  * @stream: a #GstRTSPStream
804  * @pool: (transfer none): a #GstRTSPAddressPool
805  *
806  * configure @pool to be used as the address pool of @stream.
807  */
808 void
809 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
810     GstRTSPAddressPool * pool)
811 {
812   GstRTSPStreamPrivate *priv;
813   GstRTSPAddressPool *old;
814
815   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
816
817   priv = stream->priv;
818
819   GST_LOG_OBJECT (stream, "set address pool %p", pool);
820
821   g_mutex_lock (&priv->lock);
822   if ((old = priv->pool) != pool)
823     priv->pool = pool ? g_object_ref (pool) : NULL;
824   else
825     old = NULL;
826   g_mutex_unlock (&priv->lock);
827
828   if (old)
829     g_object_unref (old);
830 }
831
832 /**
833  * gst_rtsp_stream_get_address_pool:
834  * @stream: a #GstRTSPStream
835  *
836  * Get the #GstRTSPAddressPool used as the address pool of @stream.
837  *
838  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
839  * usage.
840  */
841 GstRTSPAddressPool *
842 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
843 {
844   GstRTSPStreamPrivate *priv;
845   GstRTSPAddressPool *result;
846
847   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
848
849   priv = stream->priv;
850
851   g_mutex_lock (&priv->lock);
852   if ((result = priv->pool))
853     g_object_ref (result);
854   g_mutex_unlock (&priv->lock);
855
856   return result;
857 }
858
859 /**
860  * gst_rtsp_stream_set_multicast_iface:
861  * @stream: a #GstRTSPStream
862  * @multicast_iface: (transfer none): a multicast interface
863  *
864  * configure @multicast_iface to be used for @stream.
865  */
866 void
867 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
868     const gchar * multicast_iface)
869 {
870   GstRTSPStreamPrivate *priv;
871   gchar *old;
872
873   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
874
875   priv = stream->priv;
876
877   GST_LOG_OBJECT (stream, "set multicast iface %s",
878       GST_STR_NULL (multicast_iface));
879
880   g_mutex_lock (&priv->lock);
881   if ((old = priv->multicast_iface) != multicast_iface)
882     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
883   else
884     old = NULL;
885   g_mutex_unlock (&priv->lock);
886
887   if (old)
888     g_free (old);
889 }
890
891 /**
892  * gst_rtsp_stream_get_multicast_iface:
893  * @stream: a #GstRTSPStream
894  *
895  * Get the multicast interface used for @stream.
896  *
897  * Returns: (transfer full): the multicast interface for @stream. g_free() after
898  * usage.
899  */
900 gchar *
901 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
902 {
903   GstRTSPStreamPrivate *priv;
904   gchar *result;
905
906   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
907
908   priv = stream->priv;
909
910   g_mutex_lock (&priv->lock);
911   if ((result = priv->multicast_iface))
912     result = g_strdup (result);
913   g_mutex_unlock (&priv->lock);
914
915   return result;
916 }
917
918 /**
919  * gst_rtsp_stream_get_multicast_address:
920  * @stream: a #GstRTSPStream
921  * @family: the #GSocketFamily
922  *
923  * Get the multicast address of @stream for @family. The original
924  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
925  * won't release the address from the pool.
926  *
927  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
928  * or %NULL when no address could be allocated. gst_rtsp_address_free()
929  * after usage.
930  */
931 GstRTSPAddress *
932 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
933     GSocketFamily family)
934 {
935   GstRTSPStreamPrivate *priv;
936   GstRTSPAddress *result;
937   GstRTSPAddress **addrp;
938   GstRTSPAddressFlags flags;
939
940   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
941
942   priv = stream->priv;
943
944   if (family == G_SOCKET_FAMILY_IPV6) {
945     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
946     addrp = &priv->mcast_addr_v6;
947   } else {
948     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
949     addrp = &priv->mcast_addr_v4;
950   }
951
952   g_mutex_lock (&priv->lock);
953   if (*addrp == NULL) {
954     if (priv->pool == NULL)
955       goto no_pool;
956
957     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
958
959     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
960     if (*addrp == NULL)
961       goto no_address;
962
963     /* FIXME: Also reserve the same port with unicast ANY address, since that's
964      * where we are going to bind our socket. Probably loop until we find a port
965      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
966      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
967      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
968   }
969   result = gst_rtsp_address_copy (*addrp);
970   g_mutex_unlock (&priv->lock);
971
972   return result;
973
974   /* ERRORS */
975 no_pool:
976   {
977     GST_ERROR_OBJECT (stream, "no address pool specified");
978     g_mutex_unlock (&priv->lock);
979     return NULL;
980   }
981 no_address:
982   {
983     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
984     g_mutex_unlock (&priv->lock);
985     return NULL;
986   }
987 }
988
989 /**
990  * gst_rtsp_stream_reserve_address:
991  * @stream: a #GstRTSPStream
992  * @address: an address
993  * @port: a port
994  * @n_ports: n_ports
995  * @ttl: a TTL
996  *
997  * Reserve @address and @port as the address and port of @stream. The original
998  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
999  * won't release the address from the pool.
1000  *
1001  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1002  * the address could be reserved. gst_rtsp_address_free() after usage.
1003  */
1004 GstRTSPAddress *
1005 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1006     const gchar * address, guint port, guint n_ports, guint ttl)
1007 {
1008   GstRTSPStreamPrivate *priv;
1009   GstRTSPAddress *result;
1010   GInetAddress *addr;
1011   GSocketFamily family;
1012   GstRTSPAddress **addrp;
1013
1014   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1015   g_return_val_if_fail (address != NULL, NULL);
1016   g_return_val_if_fail (port > 0, NULL);
1017   g_return_val_if_fail (n_ports > 0, NULL);
1018   g_return_val_if_fail (ttl > 0, NULL);
1019
1020   priv = stream->priv;
1021
1022   addr = g_inet_address_new_from_string (address);
1023   if (!addr) {
1024     GST_ERROR ("failed to get inet addr from %s", address);
1025     family = G_SOCKET_FAMILY_IPV4;
1026   } else {
1027     family = g_inet_address_get_family (addr);
1028     g_object_unref (addr);
1029   }
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     addrp = &priv->mcast_addr_v6;
1033   else
1034     addrp = &priv->mcast_addr_v4;
1035
1036   g_mutex_lock (&priv->lock);
1037   if (*addrp == NULL) {
1038     GstRTSPAddressPoolResult res;
1039
1040     if (priv->pool == NULL)
1041       goto no_pool;
1042
1043     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1044         port, n_ports, ttl, addrp);
1045     if (res != GST_RTSP_ADDRESS_POOL_OK)
1046       goto no_address;
1047
1048     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1049      * where we are going to bind our socket. */
1050   } else {
1051     if (strcmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1078         " reserved", address);
1079     g_mutex_unlock (&priv->lock);
1080     return NULL;
1081   }
1082 }
1083
1084 /* must be called with lock */
1085 static void
1086 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1087     GSocket * rtcp_socket, GSocketFamily family)
1088 {
1089   const gchar *multisink_socket;
1090
1091   if (family == G_SOCKET_FAMILY_IPV6)
1092     multisink_socket = "socket-v6";
1093   else
1094     multisink_socket = "socket";
1095
1096   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1097   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1098 }
1099
1100 static gboolean
1101 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2],
1102     const gchar * multicast_iface)
1103 {
1104   GstRTSPStreamPrivate *priv = stream->priv;
1105   GstElement *udpsink0, *udpsink1;
1106
1107   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1108   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1109
1110   if (!udpsink0 || !udpsink1)
1111     goto no_udp_protocol;
1112
1113   /* configure sinks */
1114
1115   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1116   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1117
1118   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1119   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1120
1121   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1122
1123   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1124   /* Needs to be async for RECORD streams, otherwise we will never go to
1125    * PLAYING because the sinks will wait for data while the udpsrc can't
1126    * provide data with timestamps in PAUSED. */
1127   if (priv->sinkpad)
1128     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1129   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1130
1131   /* join multicast group when adding clients, so we'll start receiving from it.
1132    * We cannot rely on the udpsrc to join the group since its socket is always a
1133    * local unicast one. */
1134   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "multicast-iface", multicast_iface, NULL);
1138   g_object_set (G_OBJECT (udpsink1), "multicast-iface", multicast_iface, NULL);
1139
1140   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1141   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1142
1143   udpsink[0] = udpsink0;
1144   udpsink[1] = udpsink1;
1145
1146   /* update the dscp qos field in the sinks */
1147   update_dscp_qos (stream, udpsink);
1148
1149   return TRUE;
1150
1151   /* ERRORS */
1152 no_udp_protocol:
1153   {
1154     return FALSE;
1155   }
1156 }
1157
1158 /* must be called with lock */
1159 static gboolean
1160 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1161     GSocket * rtp_socket, GSocket * rtcp_socket)
1162 {
1163   GstStateChangeReturn ret;
1164
1165   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1166   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1167
1168   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1169     goto error;
1170
1171   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1172   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1173
1174   /* The udpsrc cannot do the join because its socket is always a local unicast
1175    * one. The udpsink sharing the same socket will do it for us. */
1176   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1177   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1178
1179   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1180   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1181
1182   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1183   if (ret == GST_STATE_CHANGE_FAILURE)
1184     goto error;
1185   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1186   if (ret == GST_STATE_CHANGE_FAILURE)
1187     goto error;
1188
1189   return TRUE;
1190
1191   /* ERRORS */
1192 error:
1193   {
1194     if (udpsrc_out[0]) {
1195       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1196       g_clear_object (&udpsrc_out[0]);
1197     }
1198     if (udpsrc_out[1]) {
1199       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1200       g_clear_object (&udpsrc_out[1]);
1201     }
1202     return FALSE;
1203   }
1204 }
1205
1206 static gboolean
1207 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1208     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1209     GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out)
1210 {
1211   GstRTSPStreamPrivate *priv = stream->priv;
1212   GSocket *rtp_socket = NULL;
1213   GSocket *rtcp_socket;
1214   gint tmp_rtp, tmp_rtcp;
1215   guint count;
1216   gint rtpport, rtcpport;
1217   GList *rejected_addresses = NULL;
1218   GstRTSPAddress *addr = NULL;
1219   GInetAddress *inetaddr = NULL;
1220   gchar *addr_str;
1221   GSocketAddress *rtp_sockaddr = NULL;
1222   GSocketAddress *rtcp_sockaddr = NULL;
1223   GstRTSPAddressPool *pool;
1224
1225   g_assert (!udpsrc_out[0]);
1226   g_assert (!udpsrc_out[1]);
1227   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1228       (udpsink_out[0] && udpsink_out[1]));
1229   g_assert (*server_addr_out == NULL);
1230
1231   pool = priv->pool;
1232   count = 0;
1233
1234   /* Start with random port */
1235   tmp_rtp = 0;
1236
1237   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1238       G_SOCKET_PROTOCOL_UDP, NULL);
1239   if (!rtcp_socket)
1240     goto no_udp_protocol;
1241   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1242
1243   /* try to allocate 2 UDP ports, the RTP port should be an even
1244    * number and the RTCP port should be the next (uneven) port */
1245 again:
1246
1247   if (rtp_socket == NULL) {
1248     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1249         G_SOCKET_PROTOCOL_UDP, NULL);
1250     if (!rtp_socket)
1251       goto no_udp_protocol;
1252     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1253   }
1254
1255   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1256     GstRTSPAddressFlags flags;
1257
1258     if (addr)
1259       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1260
1261     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1262     if (family == G_SOCKET_FAMILY_IPV6)
1263       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1264     else
1265       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1266
1267     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1268
1269     if (addr == NULL)
1270       goto no_ports;
1271
1272     tmp_rtp = addr->port;
1273
1274     g_clear_object (&inetaddr);
1275     inetaddr = g_inet_address_new_from_string (addr->address);
1276   } else {
1277     if (tmp_rtp != 0) {
1278       tmp_rtp += 2;
1279       if (++count > 20)
1280         goto no_ports;
1281     }
1282
1283     if (inetaddr == NULL)
1284       inetaddr = g_inet_address_new_any (family);
1285   }
1286
1287   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1288   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1289     g_object_unref (rtp_sockaddr);
1290     goto again;
1291   }
1292   g_object_unref (rtp_sockaddr);
1293
1294   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1295   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1296     g_clear_object (&rtp_sockaddr);
1297     goto socket_error;
1298   }
1299
1300   tmp_rtp =
1301       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1302   g_object_unref (rtp_sockaddr);
1303
1304   /* check if port is even */
1305   if ((tmp_rtp & 1) != 0) {
1306     /* port not even, close and allocate another */
1307     tmp_rtp++;
1308     g_clear_object (&rtp_socket);
1309     goto again;
1310   }
1311
1312   /* set port */
1313   tmp_rtcp = tmp_rtp + 1;
1314
1315   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1316   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1317     g_object_unref (rtcp_sockaddr);
1318     g_clear_object (&rtp_socket);
1319     goto again;
1320   }
1321   g_object_unref (rtcp_sockaddr);
1322
1323   if (addr == NULL)
1324     addr_str = g_inet_address_to_string (inetaddr);
1325   else
1326     addr_str = addr->address;
1327   g_clear_object (&inetaddr);
1328
1329   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1330     if (addr == NULL)
1331       g_free (addr_str);
1332     goto no_udp_protocol;
1333   }
1334
1335   if (addr == NULL)
1336     g_free (addr_str);
1337
1338   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1339   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1340
1341   /* this should not happen... */
1342   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1343     goto port_error;
1344
1345   server_port_out->min = rtpport;
1346   server_port_out->max = rtcpport;
1347
1348   /* This function is called twice (for v4 and v6) but we create only one pair
1349    * of udpsinks. */
1350   if (!udpsink_out[0]
1351       && !create_and_configure_udpsinks (stream, udpsink_out, NULL))
1352     goto no_udp_protocol;
1353
1354   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1355
1356   *server_addr_out = addr;
1357   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1358
1359   g_object_unref (rtp_socket);
1360   g_object_unref (rtcp_socket);
1361
1362   return TRUE;
1363
1364   /* ERRORS */
1365 no_udp_protocol:
1366   {
1367     goto cleanup;
1368   }
1369 no_ports:
1370   {
1371     goto cleanup;
1372   }
1373 port_error:
1374   {
1375     goto cleanup;
1376   }
1377 socket_error:
1378   {
1379     goto cleanup;
1380   }
1381 cleanup:
1382   {
1383     if (inetaddr)
1384       g_object_unref (inetaddr);
1385     g_list_free_full (rejected_addresses,
1386         (GDestroyNotify) gst_rtsp_address_free);
1387     if (addr)
1388       gst_rtsp_address_free (addr);
1389     if (rtp_socket)
1390       g_object_unref (rtp_socket);
1391     if (rtcp_socket)
1392       g_object_unref (rtcp_socket);
1393     return FALSE;
1394   }
1395 }
1396
1397 /**
1398  * gst_rtsp_stream_allocate_udp_sockets:
1399  * @stream: a #GstRTSPStream
1400  * @family: protocol family
1401  * @transport_method: transport method
1402  *
1403  * Allocates RTP and RTCP ports.
1404  *
1405  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1406  * Deprecated: This function shouldn't have been made public
1407  */
1408 gboolean
1409 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1410     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1411 {
1412   g_warn_if_reached ();
1413   return FALSE;
1414 }
1415
1416 /**
1417  * gst_rtsp_stream_set_client_side:
1418  * @stream: a #GstRTSPStream
1419  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1420  * an RTSP connection.
1421  *
1422  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1423  * streams to an RTSP server via RECORD. This has the practical effect
1424  * of changing which UDP port numbers are used when setting up the local
1425  * side of the stream sending to be either the 'server' or 'client' pair
1426  * of a configured UDP transport.
1427  */
1428 void
1429 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1430 {
1431   GstRTSPStreamPrivate *priv;
1432
1433   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1434   priv = stream->priv;
1435   g_mutex_lock (&priv->lock);
1436   priv->client_side = client_side;
1437   g_mutex_unlock (&priv->lock);
1438 }
1439
1440 /**
1441  * gst_rtsp_stream_is_client_side:
1442  * @stream: a #GstRTSPStream
1443  *
1444  * See gst_rtsp_stream_set_client_side()
1445  *
1446  * Returns: TRUE if this #GstRTSPStream is client-side.
1447  */
1448 gboolean
1449 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1450 {
1451   GstRTSPStreamPrivate *priv;
1452   gboolean ret;
1453
1454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1455
1456   priv = stream->priv;
1457   g_mutex_lock (&priv->lock);
1458   ret = priv->client_side;
1459   g_mutex_unlock (&priv->lock);
1460
1461   return ret;
1462 }
1463
1464 /* must be called with lock */
1465 static gboolean
1466 alloc_ports (GstRTSPStream * stream)
1467 {
1468   GstRTSPStreamPrivate *priv = stream->priv;
1469   gboolean ret = TRUE;
1470
1471   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
1472       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1473     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1474         priv->udpsrc_v4, priv->udpsink,
1475         &priv->server_port_v4, &priv->server_addr_v4);
1476
1477     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1478         priv->udpsrc_v6, priv->udpsink,
1479         &priv->server_port_v6, &priv->server_addr_v6);
1480   }
1481
1482   return ret;
1483 }
1484
1485 /**
1486  * gst_rtsp_stream_get_server_port:
1487  * @stream: a #GstRTSPStream
1488  * @server_port: (out): result server port
1489  * @family: the port family to get
1490  *
1491  * Fill @server_port with the port pair used by the server. This function can
1492  * only be called when @stream has been joined.
1493  */
1494 void
1495 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1496     GstRTSPRange * server_port, GSocketFamily family)
1497 {
1498   GstRTSPStreamPrivate *priv;
1499
1500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1501   priv = stream->priv;
1502   g_return_if_fail (priv->joined_bin != NULL);
1503
1504   g_mutex_lock (&priv->lock);
1505   if (family == G_SOCKET_FAMILY_IPV4) {
1506     if (server_port)
1507       *server_port = priv->server_port_v4;
1508   } else {
1509     if (server_port)
1510       *server_port = priv->server_port_v6;
1511   }
1512   g_mutex_unlock (&priv->lock);
1513 }
1514
1515 /**
1516  * gst_rtsp_stream_get_rtpsession:
1517  * @stream: a #GstRTSPStream
1518  *
1519  * Get the RTP session of this stream.
1520  *
1521  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1522  */
1523 GObject *
1524 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1525 {
1526   GstRTSPStreamPrivate *priv;
1527   GObject *session;
1528
1529   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1530
1531   priv = stream->priv;
1532
1533   g_mutex_lock (&priv->lock);
1534   if ((session = priv->session))
1535     g_object_ref (session);
1536   g_mutex_unlock (&priv->lock);
1537
1538   return session;
1539 }
1540
1541 /**
1542  * gst_rtsp_stream_get_encoder:
1543  * @stream: a #GstRTSPStream
1544  *
1545  * Get the SRTP encoder for this stream.
1546  *
1547  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1548  */
1549 GstElement *
1550 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1551 {
1552   GstRTSPStreamPrivate *priv;
1553   GstElement *encoder;
1554
1555   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1556
1557   priv = stream->priv;
1558
1559   g_mutex_lock (&priv->lock);
1560   if ((encoder = priv->srtpenc))
1561     g_object_ref (encoder);
1562   g_mutex_unlock (&priv->lock);
1563
1564   return encoder;
1565 }
1566
1567 /**
1568  * gst_rtsp_stream_get_ssrc:
1569  * @stream: a #GstRTSPStream
1570  * @ssrc: (out): result ssrc
1571  *
1572  * Get the SSRC used by the RTP session of this stream. This function can only
1573  * be called when @stream has been joined.
1574  */
1575 void
1576 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1577 {
1578   GstRTSPStreamPrivate *priv;
1579
1580   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1581   priv = stream->priv;
1582   g_return_if_fail (priv->joined_bin != NULL);
1583
1584   g_mutex_lock (&priv->lock);
1585   if (ssrc && priv->session)
1586     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1587   g_mutex_unlock (&priv->lock);
1588 }
1589
1590 /**
1591  * gst_rtsp_stream_set_retransmission_time:
1592  * @stream: a #GstRTSPStream
1593  * @time: a #GstClockTime
1594  *
1595  * Set the amount of time to store retransmission packets.
1596  */
1597 void
1598 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1599     GstClockTime time)
1600 {
1601   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1602
1603   g_mutex_lock (&stream->priv->lock);
1604   stream->priv->rtx_time = time;
1605   if (stream->priv->rtxsend)
1606     g_object_set (stream->priv->rtxsend, "max-size-time",
1607         GST_TIME_AS_MSECONDS (time), NULL);
1608   g_mutex_unlock (&stream->priv->lock);
1609 }
1610
1611 /**
1612  * gst_rtsp_stream_get_retransmission_time:
1613  * @stream: a #GstRTSPStream
1614  *
1615  * Get the amount of time to store retransmission data.
1616  *
1617  * Returns: the amount of time to store retransmission data.
1618  */
1619 GstClockTime
1620 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1621 {
1622   GstClockTime ret;
1623
1624   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1625
1626   g_mutex_lock (&stream->priv->lock);
1627   ret = stream->priv->rtx_time;
1628   g_mutex_unlock (&stream->priv->lock);
1629
1630   return ret;
1631 }
1632
1633 /**
1634  * gst_rtsp_stream_set_retransmission_pt:
1635  * @stream: a #GstRTSPStream
1636  * @rtx_pt: a #guint
1637  *
1638  * Set the payload type (pt) for retransmission of this stream.
1639  */
1640 void
1641 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1642 {
1643   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1644
1645   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1646
1647   g_mutex_lock (&stream->priv->lock);
1648   stream->priv->rtx_pt = rtx_pt;
1649   if (stream->priv->rtxsend) {
1650     guint pt = gst_rtsp_stream_get_pt (stream);
1651     gchar *pt_s = g_strdup_printf ("%d", pt);
1652     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1653         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1654     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1655     g_free (pt_s);
1656     gst_structure_free (rtx_pt_map);
1657   }
1658   g_mutex_unlock (&stream->priv->lock);
1659 }
1660
1661 /**
1662  * gst_rtsp_stream_get_retransmission_pt:
1663  * @stream: a #GstRTSPStream
1664  *
1665  * Get the payload-type used for retransmission of this stream
1666  *
1667  * Returns: The retransmission PT.
1668  */
1669 guint
1670 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1671 {
1672   guint rtx_pt;
1673
1674   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1675
1676   g_mutex_lock (&stream->priv->lock);
1677   rtx_pt = stream->priv->rtx_pt;
1678   g_mutex_unlock (&stream->priv->lock);
1679
1680   return rtx_pt;
1681 }
1682
1683 /**
1684  * gst_rtsp_stream_set_buffer_size:
1685  * @stream: a #GstRTSPStream
1686  * @size: the buffer size
1687  *
1688  * Set the size of the UDP transmission buffer (in bytes)
1689  * Needs to be set before the stream is joined to a bin.
1690  *
1691  * Since: 1.6
1692  */
1693 void
1694 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1695 {
1696   g_mutex_lock (&stream->priv->lock);
1697   stream->priv->buffer_size = size;
1698   g_mutex_unlock (&stream->priv->lock);
1699 }
1700
1701 /**
1702  * gst_rtsp_stream_get_buffer_size:
1703  * @stream: a #GstRTSPStream
1704  *
1705  * Get the size of the UDP transmission buffer (in bytes)
1706  *
1707  * Returns: the size of the UDP TX buffer
1708  *
1709  * Since: 1.6
1710  */
1711 guint
1712 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1713 {
1714   guint buffer_size;
1715
1716   g_mutex_lock (&stream->priv->lock);
1717   buffer_size = stream->priv->buffer_size;
1718   g_mutex_unlock (&stream->priv->lock);
1719
1720   return buffer_size;
1721 }
1722
1723 /* executed from streaming thread */
1724 static void
1725 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1726 {
1727   GstRTSPStreamPrivate *priv = stream->priv;
1728   GstCaps *newcaps, *oldcaps;
1729
1730   newcaps = gst_pad_get_current_caps (pad);
1731
1732   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1733       newcaps);
1734
1735   g_mutex_lock (&priv->lock);
1736   oldcaps = priv->caps;
1737   priv->caps = newcaps;
1738   g_mutex_unlock (&priv->lock);
1739
1740   if (oldcaps)
1741     gst_caps_unref (oldcaps);
1742 }
1743
1744 static void
1745 dump_structure (const GstStructure * s)
1746 {
1747   gchar *sstr;
1748
1749   sstr = gst_structure_to_string (s);
1750   GST_INFO ("structure: %s", sstr);
1751   g_free (sstr);
1752 }
1753
1754 static GstRTSPStreamTransport *
1755 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1756 {
1757   GstRTSPStreamPrivate *priv = stream->priv;
1758   GList *walk;
1759   GstRTSPStreamTransport *result = NULL;
1760   const gchar *tmp;
1761   gchar *dest;
1762   guint port;
1763
1764   if (rtcp_from == NULL)
1765     return NULL;
1766
1767   tmp = g_strrstr (rtcp_from, ":");
1768   if (tmp == NULL)
1769     return NULL;
1770
1771   port = atoi (tmp + 1);
1772   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1773
1774   g_mutex_lock (&priv->lock);
1775   GST_INFO ("finding %s:%d in %d transports", dest, port,
1776       g_list_length (priv->transports));
1777
1778   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1779     GstRTSPStreamTransport *trans = walk->data;
1780     const GstRTSPTransport *tr;
1781     gint min, max;
1782
1783     tr = gst_rtsp_stream_transport_get_transport (trans);
1784
1785     if (priv->client_side) {
1786       /* In client side mode the 'destination' is the RTSP server, so send
1787        * to those ports */
1788       min = tr->server_port.min;
1789       max = tr->server_port.max;
1790     } else {
1791       min = tr->client_port.min;
1792       max = tr->client_port.max;
1793     }
1794
1795     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1796       result = trans;
1797       break;
1798     }
1799   }
1800   if (result)
1801     g_object_ref (result);
1802   g_mutex_unlock (&priv->lock);
1803
1804   g_free (dest);
1805
1806   return result;
1807 }
1808
1809 static GstRTSPStreamTransport *
1810 check_transport (GObject * source, GstRTSPStream * stream)
1811 {
1812   GstStructure *stats;
1813   GstRTSPStreamTransport *trans;
1814
1815   /* see if we have a stream to match with the origin of the RTCP packet */
1816   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1817   if (trans == NULL) {
1818     g_object_get (source, "stats", &stats, NULL);
1819     if (stats) {
1820       const gchar *rtcp_from;
1821
1822       dump_structure (stats);
1823
1824       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1825       if ((trans = find_transport (stream, rtcp_from))) {
1826         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1827             source);
1828         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1829             g_object_unref);
1830       }
1831       gst_structure_free (stats);
1832     }
1833   }
1834   return trans;
1835 }
1836
1837
1838 static void
1839 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1840 {
1841   GstRTSPStreamTransport *trans;
1842
1843   GST_INFO ("%p: new source %p", stream, source);
1844
1845   trans = check_transport (source, stream);
1846
1847   if (trans)
1848     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1849 }
1850
1851 static void
1852 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1853 {
1854   GST_INFO ("%p: new SDES %p", stream, source);
1855 }
1856
1857 static void
1858 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1859 {
1860   GstRTSPStreamTransport *trans;
1861
1862   trans = check_transport (source, stream);
1863
1864   if (trans) {
1865     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1866     gst_rtsp_stream_transport_keep_alive (trans);
1867   }
1868 #ifdef DUMP_STATS
1869   {
1870     GstStructure *stats;
1871     g_object_get (source, "stats", &stats, NULL);
1872     if (stats) {
1873       dump_structure (stats);
1874       gst_structure_free (stats);
1875     }
1876   }
1877 #endif
1878 }
1879
1880 static void
1881 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1882 {
1883   GST_INFO ("%p: source %p bye", stream, source);
1884 }
1885
1886 static void
1887 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1888 {
1889   GstRTSPStreamTransport *trans;
1890
1891   GST_INFO ("%p: source %p bye timeout", stream, source);
1892
1893   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1894     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1895     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1896   }
1897 }
1898
1899 static void
1900 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1901 {
1902   GstRTSPStreamTransport *trans;
1903
1904   GST_INFO ("%p: source %p timeout", stream, source);
1905
1906   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1907     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1908     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1909   }
1910 }
1911
1912 static void
1913 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1914 {
1915   GST_INFO ("%p: new sender source %p", stream, source);
1916 #ifndef DUMP_STATS
1917   {
1918     GstStructure *stats;
1919     g_object_get (source, "stats", &stats, NULL);
1920     if (stats) {
1921       dump_structure (stats);
1922       gst_structure_free (stats);
1923     }
1924   }
1925 #endif
1926 }
1927
1928 static void
1929 on_sender_ssrc_active (GObject * session, GObject * source,
1930     GstRTSPStream * stream)
1931 {
1932 #ifndef DUMP_STATS
1933   {
1934     GstStructure *stats;
1935     g_object_get (source, "stats", &stats, NULL);
1936     if (stats) {
1937       dump_structure (stats);
1938       gst_structure_free (stats);
1939     }
1940   }
1941 #endif
1942 }
1943
1944 static void
1945 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1946 {
1947   if (is_rtp) {
1948     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1949     g_list_free (priv->tr_cache_rtp);
1950     priv->tr_cache_rtp = NULL;
1951   } else {
1952     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1953     g_list_free (priv->tr_cache_rtcp);
1954     priv->tr_cache_rtcp = NULL;
1955   }
1956 }
1957
1958 static GstFlowReturn
1959 handle_new_sample (GstAppSink * sink, gpointer user_data)
1960 {
1961   GstRTSPStreamPrivate *priv;
1962   GList *walk;
1963   GstSample *sample;
1964   GstBuffer *buffer;
1965   GstRTSPStream *stream;
1966   gboolean is_rtp;
1967
1968   sample = gst_app_sink_pull_sample (sink);
1969   if (!sample)
1970     return GST_FLOW_OK;
1971
1972   stream = (GstRTSPStream *) user_data;
1973   priv = stream->priv;
1974   buffer = gst_sample_get_buffer (sample);
1975
1976   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1977
1978   g_mutex_lock (&priv->lock);
1979   if (is_rtp) {
1980     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1981       clear_tr_cache (priv, is_rtp);
1982       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1983         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1984         priv->tr_cache_rtp =
1985             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1986       }
1987       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1988     }
1989   } else {
1990     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1991       clear_tr_cache (priv, is_rtp);
1992       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1993         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1994         priv->tr_cache_rtcp =
1995             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1996       }
1997       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1998     }
1999   }
2000   g_mutex_unlock (&priv->lock);
2001
2002   if (is_rtp) {
2003     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2004       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2005       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2006     }
2007   } else {
2008     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2009       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2010       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2011     }
2012   }
2013   gst_sample_unref (sample);
2014
2015   return GST_FLOW_OK;
2016 }
2017
2018 static GstAppSinkCallbacks sink_cb = {
2019   NULL,                         /* not interested in EOS */
2020   NULL,                         /* not interested in preroll samples */
2021   handle_new_sample,
2022 };
2023
2024 static GstElement *
2025 get_rtp_encoder (GstRTSPStream * stream, guint session)
2026 {
2027   GstRTSPStreamPrivate *priv = stream->priv;
2028
2029   if (priv->srtpenc == NULL) {
2030     gchar *name;
2031
2032     name = g_strdup_printf ("srtpenc_%u", session);
2033     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2034     g_free (name);
2035
2036     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2037   }
2038   return gst_object_ref (priv->srtpenc);
2039 }
2040
2041 static GstElement *
2042 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2043 {
2044   GstRTSPStreamPrivate *priv = stream->priv;
2045   GstElement *oldenc, *enc;
2046   GstPad *pad;
2047   gchar *name;
2048
2049   if (priv->idx != session)
2050     return NULL;
2051
2052   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2053
2054   oldenc = priv->srtpenc;
2055   enc = get_rtp_encoder (stream, session);
2056   name = g_strdup_printf ("rtp_sink_%d", session);
2057   pad = gst_element_get_request_pad (enc, name);
2058   g_free (name);
2059   gst_object_unref (pad);
2060
2061   if (oldenc == NULL)
2062     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2063         enc);
2064
2065   return enc;
2066 }
2067
2068 static GstElement *
2069 request_rtcp_encoder (GstElement * rtpbin, guint session,
2070     GstRTSPStream * stream)
2071 {
2072   GstRTSPStreamPrivate *priv = stream->priv;
2073   GstElement *oldenc, *enc;
2074   GstPad *pad;
2075   gchar *name;
2076
2077   if (priv->idx != session)
2078     return NULL;
2079
2080   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2081
2082   oldenc = priv->srtpenc;
2083   enc = get_rtp_encoder (stream, session);
2084   name = g_strdup_printf ("rtcp_sink_%d", session);
2085   pad = gst_element_get_request_pad (enc, name);
2086   g_free (name);
2087   gst_object_unref (pad);
2088
2089   if (oldenc == NULL)
2090     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2091         enc);
2092
2093   return enc;
2094 }
2095
2096 static GstCaps *
2097 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2098 {
2099   GstRTSPStreamPrivate *priv = stream->priv;
2100   GstCaps *caps;
2101
2102   GST_DEBUG ("request key %08x", ssrc);
2103
2104   g_mutex_lock (&priv->lock);
2105   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2106     gst_caps_ref (caps);
2107   g_mutex_unlock (&priv->lock);
2108
2109   return caps;
2110 }
2111
2112 static GstElement *
2113 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2114     GstRTSPStream * stream)
2115 {
2116   GstRTSPStreamPrivate *priv = stream->priv;
2117
2118   if (priv->idx != session)
2119     return NULL;
2120
2121   if (priv->srtpdec == NULL) {
2122     gchar *name;
2123
2124     name = g_strdup_printf ("srtpdec_%u", session);
2125     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2126     g_free (name);
2127
2128     g_signal_connect (priv->srtpdec, "request-key",
2129         (GCallback) request_key, stream);
2130   }
2131   return gst_object_ref (priv->srtpdec);
2132 }
2133
2134 /**
2135  * gst_rtsp_stream_request_aux_sender:
2136  * @stream: a #GstRTSPStream
2137  * @sessid: the session id
2138  *
2139  * Creating a rtxsend bin
2140  *
2141  * Returns: (transfer full): a #GstElement.
2142  *
2143  * Since: 1.6
2144  */
2145 GstElement *
2146 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2147 {
2148   GstElement *bin;
2149   GstPad *pad;
2150   GstStructure *pt_map;
2151   gchar *name;
2152   guint pt, rtx_pt;
2153   gchar *pt_s;
2154
2155   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2156
2157   pt = gst_rtsp_stream_get_pt (stream);
2158   pt_s = g_strdup_printf ("%u", pt);
2159   rtx_pt = stream->priv->rtx_pt;
2160
2161   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2162
2163   bin = gst_bin_new (NULL);
2164   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2165   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2166       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2167   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2168       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2169   g_free (pt_s);
2170   gst_structure_free (pt_map);
2171   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2172
2173   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2174   name = g_strdup_printf ("src_%u", sessid);
2175   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2176   g_free (name);
2177   gst_object_unref (pad);
2178
2179   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2180   name = g_strdup_printf ("sink_%u", sessid);
2181   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2182   g_free (name);
2183   gst_object_unref (pad);
2184
2185   return bin;
2186 }
2187
2188 /**
2189  * gst_rtsp_stream_set_pt_map:
2190  * @stream: a #GstRTSPStream
2191  * @pt: the pt
2192  * @caps: a #GstCaps
2193  *
2194  * Configure a pt map between @pt and @caps.
2195  */
2196 void
2197 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2198 {
2199   GstRTSPStreamPrivate *priv = stream->priv;
2200
2201   g_mutex_lock (&priv->lock);
2202   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2203   g_mutex_unlock (&priv->lock);
2204 }
2205
2206 /**
2207  * gst_rtsp_stream_set_publish_clock_mode:
2208  * @stream: a #GstRTSPStream
2209  * @mode: the clock publish mode
2210  *
2211  * Sets if and how the stream clock should be published according to RFC7273.
2212  *
2213  * Since: 1.8
2214  */
2215 void
2216 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2217     GstRTSPPublishClockMode mode)
2218 {
2219   GstRTSPStreamPrivate *priv;
2220
2221   priv = stream->priv;
2222   g_mutex_lock (&priv->lock);
2223   priv->publish_clock_mode = mode;
2224   g_mutex_unlock (&priv->lock);
2225 }
2226
2227 /**
2228  * gst_rtsp_stream_get_publish_clock_mode:
2229  * @factory: a #GstRTSPStream
2230  *
2231  * Gets if and how the stream clock should be published according to RFC7273.
2232  *
2233  * Returns: The GstRTSPPublishClockMode
2234  *
2235  * Since: 1.8
2236  */
2237 GstRTSPPublishClockMode
2238 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2239 {
2240   GstRTSPStreamPrivate *priv;
2241   GstRTSPPublishClockMode ret;
2242
2243   priv = stream->priv;
2244   g_mutex_lock (&priv->lock);
2245   ret = priv->publish_clock_mode;
2246   g_mutex_unlock (&priv->lock);
2247
2248   return ret;
2249 }
2250
2251 static GstCaps *
2252 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2253     GstRTSPStream * stream)
2254 {
2255   GstRTSPStreamPrivate *priv = stream->priv;
2256   GstCaps *caps = NULL;
2257
2258   g_mutex_lock (&priv->lock);
2259
2260   if (priv->idx == session) {
2261     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2262     if (caps) {
2263       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2264       gst_caps_ref (caps);
2265     } else {
2266       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2267     }
2268   }
2269
2270   g_mutex_unlock (&priv->lock);
2271
2272   return caps;
2273 }
2274
2275 static void
2276 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2277 {
2278   GstRTSPStreamPrivate *priv = stream->priv;
2279   gchar *name;
2280   GstPadLinkReturn ret;
2281   guint sessid;
2282
2283   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2284       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2285
2286   name = gst_pad_get_name (pad);
2287   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2288     g_free (name);
2289     return;
2290   }
2291   g_free (name);
2292
2293   if (priv->idx != sessid)
2294     return;
2295
2296   if (gst_pad_is_linked (priv->sinkpad)) {
2297     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2298         GST_DEBUG_PAD_NAME (priv->sinkpad));
2299     return;
2300   }
2301
2302   /* link the RTP pad to the session manager, it should not really fail unless
2303    * this is not really an RTP pad */
2304   ret = gst_pad_link (pad, priv->sinkpad);
2305   if (ret != GST_PAD_LINK_OK)
2306     goto link_failed;
2307   priv->recv_rtp_src = gst_object_ref (pad);
2308
2309   return;
2310
2311 /* ERRORS */
2312 link_failed:
2313   {
2314     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2315         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2316   }
2317 }
2318
2319 static void
2320 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2321     GstRTSPStream * stream)
2322 {
2323   /* TODO: What to do here other than this? */
2324   GST_DEBUG ("Stream %p: Got EOS", stream);
2325   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2326 }
2327
2328 static void
2329 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2330     GstElement ** queue_out)
2331 {
2332   GstPad *pad;
2333   GstPad *teepad;
2334   GstPad *queuepad;
2335
2336   gst_bin_add (bin, sink);
2337
2338   *queue_out = gst_element_factory_make ("queue", NULL);
2339   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2340       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2341   gst_bin_add (bin, *queue_out);
2342
2343   /* link tee to queue */
2344   teepad = gst_element_get_request_pad (tee, "src_%u");
2345   pad = gst_element_get_static_pad (*queue_out, "sink");
2346   gst_pad_link (teepad, pad);
2347   gst_object_unref (pad);
2348   gst_object_unref (teepad);
2349
2350   /* link queue to sink */
2351   queuepad = gst_element_get_static_pad (*queue_out, "src");
2352   pad = gst_element_get_static_pad (sink, "sink");
2353   gst_pad_link (queuepad, pad);
2354   gst_object_unref (queuepad);
2355   gst_object_unref (pad);
2356 }
2357
2358 /* must be called with lock */
2359 static void
2360 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2361 {
2362   GstRTSPStreamPrivate *priv;
2363   GstPad *pad;
2364   gboolean is_tcp = FALSE, is_udp = FALSE;
2365   gint i;
2366
2367   priv = stream->priv;
2368
2369   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2370   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2371       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2372
2373   for (i = 0; i < 2; i++) {
2374     /* For the sender we create this bit of pipeline for both
2375      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2376      * we need to add a queue before appsink and udpsink to make
2377      * the pipeline not block. For the TCP case, we want to pump
2378      * client as fast as possible anyway. This pipeline is used
2379      * when both TCP and UDP are present.
2380      *
2381      * .--------.      .-----.    .---------.    .---------.
2382      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2383      * |       send->sink   src->sink      src->sink       |
2384      * '--------'      |     |    '---------'    '---------'
2385      *                 |     |    .---------.    .---------.
2386      *                 |     |    |  queue  |    | appsink |
2387      *                 |    src->sink      src->sink       |
2388      *                 '-----'    '---------'    '---------'
2389      *
2390      * When only UDP or only TCP is allowed, we skip the tee and queue
2391      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2392      * the session.
2393      */
2394
2395     /* Only link the RTP send src if we're going to send RTP, link
2396      * the RTCP send src always */
2397     if (!priv->srcpad && i == 0)
2398       continue;
2399
2400     if (is_tcp) {
2401       /* make appsink */
2402       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2403       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2404       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2405           &sink_cb, stream, NULL);
2406     }
2407
2408     /* If we have udp always use a tee because we could have mcast clients
2409      * requesting different ports, in which case we'll have to plug more
2410      * udpsinks. */
2411     if (is_udp) {
2412       /* make tee for RTP/RTCP */
2413       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2414       gst_bin_add (bin, priv->tee[i]);
2415
2416       /* and link to rtpbin send pad */
2417       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2418       gst_pad_link (priv->send_src[i], pad);
2419       gst_object_unref (pad);
2420
2421       plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2422
2423       if (is_tcp) {
2424         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2425         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2426       }
2427     } else if (is_tcp) {
2428       /* only appsink needed, link it to the session */
2429       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2430       gst_pad_link (priv->send_src[i], pad);
2431       gst_object_unref (pad);
2432
2433       /* when its only TCP, we need to set sync and preroll to FALSE
2434        * for the sink to avoid deadlock. And this is only needed for
2435        * sink used for RTCP data, not the RTP data. */
2436       if (i == 1)
2437         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2438     }
2439
2440     /* check if we need to set to a special state */
2441     if (state != GST_STATE_NULL) {
2442       if (priv->udpsink[i])
2443         gst_element_set_state (priv->udpsink[i], state);
2444       if (priv->appsink[i])
2445         gst_element_set_state (priv->appsink[i], state);
2446       if (priv->appqueue[i])
2447         gst_element_set_state (priv->appqueue[i], state);
2448       if (priv->udpqueue[i])
2449         gst_element_set_state (priv->udpqueue[i], state);
2450       if (priv->tee[i])
2451         gst_element_set_state (priv->tee[i], state);
2452     }
2453   }
2454 }
2455
2456 /* must be called with lock */
2457 static void
2458 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2459     GstElement * funnel)
2460 {
2461   GstRTSPStreamPrivate *priv;
2462   GstPad *pad, *selpad;
2463
2464   priv = stream->priv;
2465
2466   if (priv->srcpad) {
2467     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2468      * values. This is only relevant for PLAY pipelines */
2469     gst_element_set_state (src, GST_STATE_PLAYING);
2470     gst_element_set_locked_state (src, TRUE);
2471   }
2472
2473   /* add src */
2474   gst_bin_add (bin, src);
2475
2476   /* and link to the funnel */
2477   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2478   pad = gst_element_get_static_pad (src, "src");
2479   gst_pad_link (pad, selpad);
2480   gst_object_unref (pad);
2481   gst_object_unref (selpad);
2482 }
2483
2484 /* must be called with lock */
2485 static void
2486 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2487 {
2488   GstRTSPStreamPrivate *priv;
2489   GstPad *pad;
2490   gboolean is_tcp;
2491   gint i;
2492
2493   priv = stream->priv;
2494
2495   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2496
2497   for (i = 0; i < 2; i++) {
2498     /* For the receiver we create this bit of pipeline for both
2499      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2500      * and it is all funneled into the rtpbin receive pad.
2501      *
2502      * .--------.     .--------.    .--------.
2503      * | udpsrc |     | funnel |    | rtpbin |
2504      * |       src->sink      src->sink      |
2505      * '--------'     |        |    '--------'
2506      * .--------.     |        |
2507      * | appsrc |     |        |
2508      * |       src->sink       |
2509      * '--------'     '--------'
2510      */
2511
2512     if (!priv->sinkpad && i == 0) {
2513       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2514        * RTCP sink always */
2515       continue;
2516     }
2517
2518     /* make funnel for the RTP/RTCP receivers */
2519     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2520     gst_bin_add (bin, priv->funnel[i]);
2521
2522     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2523     gst_pad_link (pad, priv->recv_sink[i]);
2524     gst_object_unref (pad);
2525
2526     if (priv->udpsrc_v4[i])
2527       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2528
2529     if (priv->udpsrc_v6[i])
2530       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2531
2532     if (is_tcp) {
2533       /* make and add appsrc */
2534       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2535       priv->appsrc_base_time[i] = -1;
2536       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2537           TRUE, NULL);
2538       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2539     }
2540
2541     /* check if we need to set to a special state */
2542     if (state != GST_STATE_NULL) {
2543       gst_element_set_state (priv->funnel[i], state);
2544     }
2545   }
2546 }
2547
2548 static gboolean
2549 create_mcast_part_for_transport (GstRTSPStream * stream,
2550     const GstRTSPTransport * tr)
2551 {
2552   GstRTSPStreamPrivate *priv = stream->priv;
2553   GInetAddress *inetaddr;
2554   GSocketFamily family;
2555   GstRTSPAddress *mcast_addr;
2556   GstElement **mcast_udpsrc;
2557   GSocket *rtp_socket = NULL;
2558   GSocket *rtcp_socket = NULL;
2559   GSocketAddress *rtp_sockaddr = NULL;
2560   GSocketAddress *rtcp_sockaddr = NULL;
2561   GError *error = NULL;
2562   const gchar *multicast_iface = priv->multicast_iface;
2563
2564   /* Check if it's a ipv4 or ipv6 transport */
2565   inetaddr = g_inet_address_new_from_string (tr->destination);
2566   family = g_inet_address_get_family (inetaddr);
2567   g_object_unref (inetaddr);
2568
2569   /* Select fields corresponding to the family */
2570   if (family == G_SOCKET_FAMILY_IPV4) {
2571     mcast_addr = priv->mcast_addr_v4;
2572     mcast_udpsrc = priv->mcast_udpsrc_v4;
2573   } else {
2574     mcast_addr = priv->mcast_addr_v6;
2575     mcast_udpsrc = priv->mcast_udpsrc_v6;
2576   }
2577
2578   /* We support only one mcast group per family, make sure this transport
2579    * matches it. */
2580   if (!mcast_addr)
2581     goto no_addr;
2582
2583   if (!g_str_equal (tr->destination, mcast_addr->address) ||
2584       tr->port.min != mcast_addr->port ||
2585       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2586       tr->ttl != mcast_addr->ttl)
2587     goto wrong_addr;
2588
2589   if (mcast_udpsrc[0]) {
2590     /* We already created elements for this family. Since we support only one
2591      * mcast group per family, there is nothing more to do here. */
2592     g_assert (mcast_udpsrc[1]);
2593     g_assert (priv->mcast_udpqueue[0]);
2594     g_assert (priv->mcast_udpqueue[1]);
2595     g_assert (priv->mcast_udpsink[0]);
2596     g_assert (priv->mcast_udpsink[1]);
2597     return TRUE;
2598   }
2599
2600   g_assert (!mcast_udpsrc[1]);
2601
2602   /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
2603   rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
2604       G_SOCKET_PROTOCOL_UDP, &error);
2605   if (!rtp_socket)
2606     goto socket_error;
2607   g_socket_set_multicast_loopback (rtp_socket, FALSE);
2608
2609   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
2610       G_SOCKET_PROTOCOL_UDP, &error);
2611   if (!rtcp_socket)
2612     goto socket_error;
2613   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
2614
2615   inetaddr = g_inet_address_new_any (family);
2616   rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
2617   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
2618   g_object_unref (inetaddr);
2619
2620   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
2621     goto socket_error;
2622
2623   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
2624     goto socket_error;
2625
2626   g_object_unref (rtp_sockaddr);
2627   g_object_unref (rtcp_sockaddr);
2628
2629   /* Add receiver part */
2630   create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
2631   if (priv->srcpad) {
2632     plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
2633     gst_element_sync_state_with_parent (mcast_udpsrc[0]);
2634   }
2635   plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
2636   gst_element_sync_state_with_parent (mcast_udpsrc[1]);
2637
2638   /* Add sender part, could already have been created for the other family. */
2639   if (!priv->mcast_udpsink[0]) {
2640     g_assert (!priv->mcast_udpsink[1]);
2641     g_assert (!priv->mcast_udpqueue[0]);
2642     g_assert (!priv->mcast_udpqueue[1]);
2643
2644     create_and_configure_udpsinks (stream, priv->mcast_udpsink,
2645         multicast_iface);
2646     set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
2647         family);
2648
2649     if (priv->sinkpad) {
2650       plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
2651           &priv->mcast_udpqueue[0]);
2652       gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
2653       gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
2654     }
2655     plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
2656         &priv->mcast_udpqueue[1]);
2657     gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
2658     gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
2659   } else {
2660     g_assert (priv->mcast_udpsink[1]);
2661     g_assert (priv->mcast_udpqueue[0]);
2662     g_assert (priv->mcast_udpqueue[1]);
2663
2664     set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
2665         family);
2666   }
2667
2668   return TRUE;
2669
2670 no_addr:
2671   {
2672     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2673         "has been reserved");
2674     return FALSE;
2675   }
2676 wrong_addr:
2677   {
2678     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2679         "the reserved address");
2680     return FALSE;
2681   }
2682 socket_error:
2683   {
2684     GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
2685         error->message);
2686     g_clear_object (&rtp_socket);
2687     g_clear_object (&rtcp_socket);
2688     g_clear_object (&rtp_sockaddr);
2689     g_clear_object (&rtcp_sockaddr);
2690     g_clear_error (&error);
2691     return FALSE;
2692   }
2693 }
2694
2695 /**
2696  * gst_rtsp_stream_join_bin:
2697  * @stream: a #GstRTSPStream
2698  * @bin: (transfer none): a #GstBin to join
2699  * @rtpbin: (transfer none): a rtpbin element in @bin
2700  * @state: the target state of the new elements
2701  *
2702  * Join the #GstBin @bin that contains the element @rtpbin.
2703  *
2704  * @stream will link to @rtpbin, which must be inside @bin. The elements
2705  * added to @bin will be set to the state given in @state.
2706  *
2707  * Returns: %TRUE on success.
2708  */
2709 gboolean
2710 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2711     GstElement * rtpbin, GstState state)
2712 {
2713   GstRTSPStreamPrivate *priv;
2714   guint idx;
2715   gchar *name;
2716   GstPadLinkReturn ret;
2717
2718   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2719   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2720   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2721
2722   priv = stream->priv;
2723
2724   g_mutex_lock (&priv->lock);
2725   if (priv->joined_bin != NULL)
2726     goto was_joined;
2727
2728   /* create a session with the same index as the stream */
2729   idx = priv->idx;
2730
2731   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2732
2733   if (!alloc_ports (stream))
2734     goto no_ports;
2735
2736   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2737       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2738     /* For SRTP */
2739     g_signal_connect (rtpbin, "request-rtp-encoder",
2740         (GCallback) request_rtp_encoder, stream);
2741     g_signal_connect (rtpbin, "request-rtcp-encoder",
2742         (GCallback) request_rtcp_encoder, stream);
2743     g_signal_connect (rtpbin, "request-rtp-decoder",
2744         (GCallback) request_rtp_rtcp_decoder, stream);
2745     g_signal_connect (rtpbin, "request-rtcp-decoder",
2746         (GCallback) request_rtp_rtcp_decoder, stream);
2747   }
2748
2749   if (priv->sinkpad) {
2750     g_signal_connect (rtpbin, "request-pt-map",
2751         (GCallback) request_pt_map, stream);
2752   }
2753
2754   /* get pads from the RTP session element for sending and receiving
2755    * RTP/RTCP*/
2756   if (priv->srcpad) {
2757     /* get a pad for sending RTP */
2758     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2759     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2760     g_free (name);
2761
2762     /* link the RTP pad to the session manager, it should not really fail unless
2763      * this is not really an RTP pad */
2764     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2765     if (ret != GST_PAD_LINK_OK)
2766       goto link_failed;
2767
2768     name = g_strdup_printf ("send_rtp_src_%u", idx);
2769     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2770     g_free (name);
2771   } else {
2772     /* Need to connect our sinkpad from here */
2773     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2774     /* EOS */
2775     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2776
2777     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2778     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2779     g_free (name);
2780   }
2781
2782   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2783   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2784   g_free (name);
2785   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2786   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2787   g_free (name);
2788
2789   /* get the session */
2790   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2791
2792   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2793       stream);
2794   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2795       stream);
2796   g_signal_connect (priv->session, "on-ssrc-active",
2797       (GCallback) on_ssrc_active, stream);
2798   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2799       stream);
2800   g_signal_connect (priv->session, "on-bye-timeout",
2801       (GCallback) on_bye_timeout, stream);
2802   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2803       stream);
2804
2805   /* signal for sender ssrc */
2806   g_signal_connect (priv->session, "on-new-sender-ssrc",
2807       (GCallback) on_new_sender_ssrc, stream);
2808   g_signal_connect (priv->session, "on-sender-ssrc-active",
2809       (GCallback) on_sender_ssrc_active, stream);
2810
2811   create_sender_part (stream, bin, state);
2812   create_receiver_part (stream, bin, state);
2813
2814   if (priv->srcpad) {
2815     /* be notified of caps changes */
2816     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2817         (GCallback) caps_notify, stream);
2818   }
2819
2820   priv->joined_bin = gst_object_ref (bin);
2821   g_mutex_unlock (&priv->lock);
2822
2823   return TRUE;
2824
2825   /* ERRORS */
2826 was_joined:
2827   {
2828     g_mutex_unlock (&priv->lock);
2829     return TRUE;
2830   }
2831 no_ports:
2832   {
2833     g_mutex_unlock (&priv->lock);
2834     GST_WARNING ("failed to allocate ports %u", idx);
2835     return FALSE;
2836   }
2837 link_failed:
2838   {
2839     GST_WARNING ("failed to link stream %u", idx);
2840     gst_object_unref (priv->send_rtp_sink);
2841     priv->send_rtp_sink = NULL;
2842     g_mutex_unlock (&priv->lock);
2843     return FALSE;
2844   }
2845 }
2846
2847 static void
2848 clear_element (GstBin * bin, GstElement ** elementptr)
2849 {
2850   if (*elementptr) {
2851     gst_element_set_locked_state (*elementptr, FALSE);
2852     gst_element_set_state (*elementptr, GST_STATE_NULL);
2853     if (GST_ELEMENT_PARENT (*elementptr))
2854       gst_bin_remove (bin, *elementptr);
2855     else
2856       gst_object_unref (*elementptr);
2857     *elementptr = NULL;
2858   }
2859 }
2860
2861 /**
2862  * gst_rtsp_stream_leave_bin:
2863  * @stream: a #GstRTSPStream
2864  * @bin: (transfer none): a #GstBin
2865  * @rtpbin: (transfer none): a rtpbin #GstElement
2866  *
2867  * Remove the elements of @stream from @bin.
2868  *
2869  * Return: %TRUE on success.
2870  */
2871 gboolean
2872 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2873     GstElement * rtpbin)
2874 {
2875   GstRTSPStreamPrivate *priv;
2876   gint i;
2877
2878   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2879   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2880   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2881
2882   priv = stream->priv;
2883
2884   g_mutex_lock (&priv->lock);
2885   if (priv->joined_bin == NULL)
2886     goto was_not_joined;
2887   if (priv->joined_bin != bin)
2888     goto wrong_bin;
2889
2890   priv->joined_bin = NULL;
2891
2892   /* all transports must be removed by now */
2893   if (priv->transports != NULL)
2894     goto transports_not_removed;
2895
2896   clear_tr_cache (priv, TRUE);
2897   clear_tr_cache (priv, FALSE);
2898
2899   GST_INFO ("stream %p leaving bin", stream);
2900
2901   if (priv->srcpad) {
2902     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2903
2904     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2905     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2906     gst_object_unref (priv->send_rtp_sink);
2907     priv->send_rtp_sink = NULL;
2908   } else if (priv->recv_rtp_src) {
2909     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2910     gst_object_unref (priv->recv_rtp_src);
2911     priv->recv_rtp_src = NULL;
2912   }
2913
2914   for (i = 0; i < 2; i++) {
2915     clear_element (bin, &priv->udpsrc_v4[i]);
2916     clear_element (bin, &priv->udpsrc_v6[i]);
2917     clear_element (bin, &priv->udpqueue[i]);
2918     clear_element (bin, &priv->udpsink[i]);
2919
2920     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2921     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2922     clear_element (bin, &priv->mcast_udpqueue[i]);
2923     clear_element (bin, &priv->mcast_udpsink[i]);
2924
2925     clear_element (bin, &priv->appsrc[i]);
2926     clear_element (bin, &priv->appqueue[i]);
2927     clear_element (bin, &priv->appsink[i]);
2928
2929     clear_element (bin, &priv->tee[i]);
2930     clear_element (bin, &priv->funnel[i]);
2931
2932     if (priv->sinkpad || i == 1) {
2933       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2934       gst_object_unref (priv->recv_sink[i]);
2935       priv->recv_sink[i] = NULL;
2936     }
2937   }
2938
2939   if (priv->srcpad) {
2940     gst_object_unref (priv->send_src[0]);
2941     priv->send_src[0] = NULL;
2942   }
2943
2944   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2945   gst_object_unref (priv->send_src[1]);
2946   priv->send_src[1] = NULL;
2947
2948   g_object_unref (priv->session);
2949   priv->session = NULL;
2950   if (priv->caps)
2951     gst_caps_unref (priv->caps);
2952   priv->caps = NULL;
2953
2954   if (priv->srtpenc)
2955     gst_object_unref (priv->srtpenc);
2956   if (priv->srtpdec)
2957     gst_object_unref (priv->srtpdec);
2958
2959   g_clear_object (&priv->joined_bin);
2960   g_mutex_unlock (&priv->lock);
2961
2962   return TRUE;
2963
2964 was_not_joined:
2965   {
2966     g_mutex_unlock (&priv->lock);
2967     return TRUE;
2968   }
2969 transports_not_removed:
2970   {
2971     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2972     g_mutex_unlock (&priv->lock);
2973     return FALSE;
2974   }
2975 wrong_bin:
2976   {
2977     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2978     g_mutex_unlock (&priv->lock);
2979     return FALSE;
2980   }
2981 }
2982
2983 /**
2984  * gst_rtsp_stream_get_joined_bin:
2985  * @stream: a #GstRTSPStream
2986  *
2987  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2988  *
2989  * Return: (transfer full): the joined bin or NULL.
2990  */
2991 GstBin *
2992 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2993 {
2994   GstRTSPStreamPrivate *priv;
2995   GstBin *bin = NULL;
2996
2997   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2998
2999   priv = stream->priv;
3000
3001   g_mutex_lock (&priv->lock);
3002   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3003   g_mutex_unlock (&priv->lock);
3004
3005   return bin;
3006 }
3007
3008 /**
3009  * gst_rtsp_stream_get_rtpinfo:
3010  * @stream: a #GstRTSPStream
3011  * @rtptime: (allow-none): result RTP timestamp
3012  * @seq: (allow-none): result RTP seqnum
3013  * @clock_rate: (allow-none): the clock rate
3014  * @running_time: (allow-none): result running-time
3015  *
3016  * Retrieve the current rtptime, seq and running-time. This is used to
3017  * construct a RTPInfo reply header.
3018  *
3019  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3020  */
3021 gboolean
3022 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3023     guint * rtptime, guint * seq, guint * clock_rate,
3024     GstClockTime * running_time)
3025 {
3026   GstRTSPStreamPrivate *priv;
3027   GstStructure *stats;
3028   GObjectClass *payobjclass;
3029
3030   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3031
3032   priv = stream->priv;
3033
3034   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3035
3036   g_mutex_lock (&priv->lock);
3037
3038   /* First try to extract the information from the last buffer on the sinks.
3039    * This will have a more accurate sequence number and timestamp, as between
3040    * the payloader and the sink there can be some queues
3041    */
3042   if (priv->udpsink[0] || priv->appsink[0]) {
3043     GstSample *last_sample;
3044
3045     if (priv->udpsink[0])
3046       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3047     else
3048       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3049
3050     if (last_sample) {
3051       GstCaps *caps;
3052       GstBuffer *buffer;
3053       GstSegment *segment;
3054       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3055
3056       caps = gst_sample_get_caps (last_sample);
3057       buffer = gst_sample_get_buffer (last_sample);
3058       segment = gst_sample_get_segment (last_sample);
3059
3060       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3061         if (seq) {
3062           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3063         }
3064
3065         if (rtptime) {
3066           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3067         }
3068
3069         gst_rtp_buffer_unmap (&rtp_buffer);
3070
3071         if (running_time) {
3072           *running_time =
3073               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3074               GST_BUFFER_TIMESTAMP (buffer));
3075         }
3076
3077         if (clock_rate) {
3078           GstStructure *s = gst_caps_get_structure (caps, 0);
3079
3080           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3081
3082           if (*clock_rate == 0 && running_time)
3083             *running_time = GST_CLOCK_TIME_NONE;
3084         }
3085         gst_sample_unref (last_sample);
3086
3087         goto done;
3088       } else {
3089         gst_sample_unref (last_sample);
3090       }
3091     }
3092   }
3093
3094   if (g_object_class_find_property (payobjclass, "stats")) {
3095     g_object_get (priv->payloader, "stats", &stats, NULL);
3096     if (stats == NULL)
3097       goto no_stats;
3098
3099     if (seq)
3100       gst_structure_get_uint (stats, "seqnum", seq);
3101
3102     if (rtptime)
3103       gst_structure_get_uint (stats, "timestamp", rtptime);
3104
3105     if (running_time)
3106       gst_structure_get_clock_time (stats, "running-time", running_time);
3107
3108     if (clock_rate) {
3109       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3110       if (*clock_rate == 0 && running_time)
3111         *running_time = GST_CLOCK_TIME_NONE;
3112     }
3113     gst_structure_free (stats);
3114   } else {
3115     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3116         !g_object_class_find_property (payobjclass, "timestamp"))
3117       goto no_stats;
3118
3119     if (seq)
3120       g_object_get (priv->payloader, "seqnum", seq, NULL);
3121
3122     if (rtptime)
3123       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3124
3125     if (running_time)
3126       *running_time = GST_CLOCK_TIME_NONE;
3127   }
3128
3129 done:
3130   g_mutex_unlock (&priv->lock);
3131
3132   return TRUE;
3133
3134   /* ERRORS */
3135 no_stats:
3136   {
3137     GST_WARNING ("Could not get payloader stats");
3138     g_mutex_unlock (&priv->lock);
3139     return FALSE;
3140   }
3141 }
3142
3143 /**
3144  * gst_rtsp_stream_get_caps:
3145  * @stream: a #GstRTSPStream
3146  *
3147  * Retrieve the current caps of @stream.
3148  *
3149  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3150  * after usage.
3151  */
3152 GstCaps *
3153 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3154 {
3155   GstRTSPStreamPrivate *priv;
3156   GstCaps *result;
3157
3158   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3159
3160   priv = stream->priv;
3161
3162   g_mutex_lock (&priv->lock);
3163   if ((result = priv->caps))
3164     gst_caps_ref (result);
3165   g_mutex_unlock (&priv->lock);
3166
3167   return result;
3168 }
3169
3170 /**
3171  * gst_rtsp_stream_recv_rtp:
3172  * @stream: a #GstRTSPStream
3173  * @buffer: (transfer full): a #GstBuffer
3174  *
3175  * Handle an RTP buffer for the stream. This method is usually called when a
3176  * message has been received from a client using the TCP transport.
3177  *
3178  * This function takes ownership of @buffer.
3179  *
3180  * Returns: a GstFlowReturn.
3181  */
3182 GstFlowReturn
3183 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3184 {
3185   GstRTSPStreamPrivate *priv;
3186   GstFlowReturn ret;
3187   GstElement *element;
3188
3189   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3190   priv = stream->priv;
3191   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3192   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3193
3194   g_mutex_lock (&priv->lock);
3195   if (priv->appsrc[0])
3196     element = gst_object_ref (priv->appsrc[0]);
3197   else
3198     element = NULL;
3199   g_mutex_unlock (&priv->lock);
3200
3201   if (element) {
3202     if (priv->appsrc_base_time[0] == -1) {
3203       /* Take current running_time. This timestamp will be put on
3204        * the first buffer of each stream because we are a live source and so we
3205        * timestamp with the running_time. When we are dealing with TCP, we also
3206        * only timestamp the first buffer (using the DISCONT flag) because a server
3207        * typically bursts data, for which we don't want to compensate by speeding
3208        * up the media. The other timestamps will be interpollated from this one
3209        * using the RTP timestamps. */
3210       GST_OBJECT_LOCK (element);
3211       if (GST_ELEMENT_CLOCK (element)) {
3212         GstClockTime now;
3213         GstClockTime base_time;
3214
3215         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3216         base_time = GST_ELEMENT_CAST (element)->base_time;
3217
3218         priv->appsrc_base_time[0] = now - base_time;
3219         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3220         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3221             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3222             GST_TIME_ARGS (base_time));
3223       }
3224       GST_OBJECT_UNLOCK (element);
3225     }
3226
3227     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3228     gst_object_unref (element);
3229   } else {
3230     ret = GST_FLOW_OK;
3231   }
3232   return ret;
3233 }
3234
3235 /**
3236  * gst_rtsp_stream_recv_rtcp:
3237  * @stream: a #GstRTSPStream
3238  * @buffer: (transfer full): a #GstBuffer
3239  *
3240  * Handle an RTCP buffer for the stream. This method is usually called when a
3241  * message has been received from a client using the TCP transport.
3242  *
3243  * This function takes ownership of @buffer.
3244  *
3245  * Returns: a GstFlowReturn.
3246  */
3247 GstFlowReturn
3248 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3249 {
3250   GstRTSPStreamPrivate *priv;
3251   GstFlowReturn ret;
3252   GstElement *element;
3253
3254   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3255   priv = stream->priv;
3256   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3257
3258   if (priv->joined_bin == NULL) {
3259     gst_buffer_unref (buffer);
3260     return GST_FLOW_NOT_LINKED;
3261   }
3262   g_mutex_lock (&priv->lock);
3263   if (priv->appsrc[1])
3264     element = gst_object_ref (priv->appsrc[1]);
3265   else
3266     element = NULL;
3267   g_mutex_unlock (&priv->lock);
3268
3269   if (element) {
3270     if (priv->appsrc_base_time[1] == -1) {
3271       /* Take current running_time. This timestamp will be put on
3272        * the first buffer of each stream because we are a live source and so we
3273        * timestamp with the running_time. When we are dealing with TCP, we also
3274        * only timestamp the first buffer (using the DISCONT flag) because a server
3275        * typically bursts data, for which we don't want to compensate by speeding
3276        * up the media. The other timestamps will be interpollated from this one
3277        * using the RTP timestamps. */
3278       GST_OBJECT_LOCK (element);
3279       if (GST_ELEMENT_CLOCK (element)) {
3280         GstClockTime now;
3281         GstClockTime base_time;
3282
3283         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3284         base_time = GST_ELEMENT_CAST (element)->base_time;
3285
3286         priv->appsrc_base_time[1] = now - base_time;
3287         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3288         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3289             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3290             GST_TIME_ARGS (base_time));
3291       }
3292       GST_OBJECT_UNLOCK (element);
3293     }
3294
3295     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3296     gst_object_unref (element);
3297   } else {
3298     ret = GST_FLOW_OK;
3299     gst_buffer_unref (buffer);
3300   }
3301   return ret;
3302 }
3303
3304 /* must be called with lock */
3305 static gboolean
3306 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3307     gboolean add)
3308 {
3309   GstRTSPStreamPrivate *priv = stream->priv;
3310   const GstRTSPTransport *tr;
3311
3312   tr = gst_rtsp_stream_transport_get_transport (trans);
3313
3314   switch (tr->lower_transport) {
3315     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3316     {
3317       if (add) {
3318         if (!create_mcast_part_for_transport (stream, tr))
3319           goto mcast_error;
3320         priv->transports = g_list_prepend (priv->transports, trans);
3321       } else {
3322         priv->transports = g_list_remove (priv->transports, trans);
3323         /* FIXME: Check if there are remaining mcast transports, and destroy
3324          * mcast part if its now unused */
3325       }
3326       break;
3327     }
3328     case GST_RTSP_LOWER_TRANS_UDP:
3329     {
3330       gchar *dest;
3331       gint min, max;
3332       guint ttl = 0;
3333
3334       dest = tr->destination;
3335       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3336         min = tr->port.min;
3337         max = tr->port.max;
3338         ttl = tr->ttl;
3339       } else if (priv->client_side) {
3340         /* In client side mode the 'destination' is the RTSP server, so send
3341          * to those ports */
3342         min = tr->server_port.min;
3343         max = tr->server_port.max;
3344       } else {
3345         min = tr->client_port.min;
3346         max = tr->client_port.max;
3347       }
3348
3349       if (add) {
3350         if (ttl > 0) {
3351           GST_INFO ("setting ttl-mc %d", ttl);
3352           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3353           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3354         }
3355         GST_INFO ("adding %s:%d-%d", dest, min, max);
3356         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3357         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3358         priv->transports = g_list_prepend (priv->transports, trans);
3359       } else {
3360         GST_INFO ("removing %s:%d-%d", dest, min, max);
3361         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3362         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3363         priv->transports = g_list_remove (priv->transports, trans);
3364       }
3365       priv->transports_cookie++;
3366       break;
3367     }
3368     case GST_RTSP_LOWER_TRANS_TCP:
3369       if (add) {
3370         GST_INFO ("adding TCP %s", tr->destination);
3371         priv->transports = g_list_prepend (priv->transports, trans);
3372       } else {
3373         GST_INFO ("removing TCP %s", tr->destination);
3374         priv->transports = g_list_remove (priv->transports, trans);
3375       }
3376       priv->transports_cookie++;
3377       break;
3378     default:
3379       goto unknown_transport;
3380   }
3381   return TRUE;
3382
3383   /* ERRORS */
3384 unknown_transport:
3385   {
3386     GST_INFO ("Unknown transport %d", tr->lower_transport);
3387     return FALSE;
3388   }
3389 mcast_error:
3390   {
3391     return FALSE;
3392   }
3393 }
3394
3395
3396 /**
3397  * gst_rtsp_stream_add_transport:
3398  * @stream: a #GstRTSPStream
3399  * @trans: (transfer none): a #GstRTSPStreamTransport
3400  *
3401  * Add the transport in @trans to @stream. The media of @stream will
3402  * then also be send to the values configured in @trans.
3403  *
3404  * @stream must be joined to a bin.
3405  *
3406  * @trans must contain a valid #GstRTSPTransport.
3407  *
3408  * Returns: %TRUE if @trans was added
3409  */
3410 gboolean
3411 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3412     GstRTSPStreamTransport * trans)
3413 {
3414   GstRTSPStreamPrivate *priv;
3415   gboolean res;
3416
3417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3418   priv = stream->priv;
3419   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3420   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3421
3422   g_mutex_lock (&priv->lock);
3423   res = update_transport (stream, trans, TRUE);
3424   g_mutex_unlock (&priv->lock);
3425
3426   return res;
3427 }
3428
3429 /**
3430  * gst_rtsp_stream_remove_transport:
3431  * @stream: a #GstRTSPStream
3432  * @trans: (transfer none): a #GstRTSPStreamTransport
3433  *
3434  * Remove the transport in @trans from @stream. The media of @stream will
3435  * not be sent to the values configured in @trans.
3436  *
3437  * @stream must be joined to a bin.
3438  *
3439  * @trans must contain a valid #GstRTSPTransport.
3440  *
3441  * Returns: %TRUE if @trans was removed
3442  */
3443 gboolean
3444 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3445     GstRTSPStreamTransport * trans)
3446 {
3447   GstRTSPStreamPrivate *priv;
3448   gboolean res;
3449
3450   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3451   priv = stream->priv;
3452   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3453   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3454
3455   g_mutex_lock (&priv->lock);
3456   res = update_transport (stream, trans, FALSE);
3457   g_mutex_unlock (&priv->lock);
3458
3459   return res;
3460 }
3461
3462 /**
3463  * gst_rtsp_stream_update_crypto:
3464  * @stream: a #GstRTSPStream
3465  * @ssrc: the SSRC
3466  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3467  *
3468  * Update the new crypto information for @ssrc in @stream. If information
3469  * for @ssrc did not exist, it will be added. If information
3470  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3471  * be removed from @stream.
3472  *
3473  * Returns: %TRUE if @crypto could be updated
3474  */
3475 gboolean
3476 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3477     guint ssrc, GstCaps * crypto)
3478 {
3479   GstRTSPStreamPrivate *priv;
3480
3481   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3482   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3483
3484   priv = stream->priv;
3485
3486   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3487
3488   g_mutex_lock (&priv->lock);
3489   if (crypto)
3490     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3491         gst_caps_ref (crypto));
3492   else
3493     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3494   g_mutex_unlock (&priv->lock);
3495
3496   return TRUE;
3497 }
3498
3499 /**
3500  * gst_rtsp_stream_get_rtp_socket:
3501  * @stream: a #GstRTSPStream
3502  * @family: the socket family
3503  *
3504  * Get the RTP socket from @stream for a @family.
3505  *
3506  * @stream must be joined to a bin.
3507  *
3508  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3509  * socket could be allocated for @family. Unref after usage
3510  */
3511 GSocket *
3512 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3513 {
3514   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3515   GSocket *socket;
3516   const gchar *name;
3517
3518   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3519   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3520       family == G_SOCKET_FAMILY_IPV6, NULL);
3521   g_return_val_if_fail (priv->udpsink[0], NULL);
3522
3523   if (family == G_SOCKET_FAMILY_IPV6)
3524     name = "socket-v6";
3525   else
3526     name = "socket";
3527
3528   g_object_get (priv->udpsink[0], name, &socket, NULL);
3529
3530   return socket;
3531 }
3532
3533 /**
3534  * gst_rtsp_stream_get_rtcp_socket:
3535  * @stream: a #GstRTSPStream
3536  * @family: the socket family
3537  *
3538  * Get the RTCP socket from @stream for a @family.
3539  *
3540  * @stream must be joined to a bin.
3541  *
3542  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3543  * socket could be allocated for @family. Unref after usage
3544  */
3545 GSocket *
3546 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3547 {
3548   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3549   GSocket *socket;
3550   const gchar *name;
3551
3552   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3553   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3554       family == G_SOCKET_FAMILY_IPV6, NULL);
3555   g_return_val_if_fail (priv->udpsink[1], NULL);
3556
3557   if (family == G_SOCKET_FAMILY_IPV6)
3558     name = "socket-v6";
3559   else
3560     name = "socket";
3561
3562   g_object_get (priv->udpsink[1], name, &socket, NULL);
3563
3564   return socket;
3565 }
3566
3567 /**
3568  * gst_rtsp_stream_set_seqnum:
3569  * @stream: a #GstRTSPStream
3570  * @seqnum: a new sequence number
3571  *
3572  * Configure the sequence number in the payloader of @stream to @seqnum.
3573  */
3574 void
3575 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3576 {
3577   GstRTSPStreamPrivate *priv;
3578
3579   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3580
3581   priv = stream->priv;
3582
3583   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3584 }
3585
3586 /**
3587  * gst_rtsp_stream_get_seqnum:
3588  * @stream: a #GstRTSPStream
3589  *
3590  * Get the configured sequence number in the payloader of @stream.
3591  *
3592  * Returns: the sequence number of the payloader.
3593  */
3594 guint16
3595 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3596 {
3597   GstRTSPStreamPrivate *priv;
3598   guint seqnum;
3599
3600   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3601
3602   priv = stream->priv;
3603
3604   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3605
3606   return seqnum;
3607 }
3608
3609 /**
3610  * gst_rtsp_stream_transport_filter:
3611  * @stream: a #GstRTSPStream
3612  * @func: (scope call) (allow-none): a callback
3613  * @user_data: (closure): user data passed to @func
3614  *
3615  * Call @func for each transport managed by @stream. The result value of @func
3616  * determines what happens to the transport. @func will be called with @stream
3617  * locked so no further actions on @stream can be performed from @func.
3618  *
3619  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3620  * @stream.
3621  *
3622  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3623  *
3624  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3625  * will also be added with an additional ref to the result #GList of this
3626  * function..
3627  *
3628  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3629  *
3630  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3631  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3632  * element in the #GList should be unreffed before the list is freed.
3633  */
3634 GList *
3635 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3636     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3637 {
3638   GstRTSPStreamPrivate *priv;
3639   GList *result, *walk, *next;
3640   GHashTable *visited = NULL;
3641   guint cookie;
3642
3643   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3644
3645   priv = stream->priv;
3646
3647   result = NULL;
3648   if (func)
3649     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3650
3651   g_mutex_lock (&priv->lock);
3652 restart:
3653   cookie = priv->transports_cookie;
3654   for (walk = priv->transports; walk; walk = next) {
3655     GstRTSPStreamTransport *trans = walk->data;
3656     GstRTSPFilterResult res;
3657     gboolean changed;
3658
3659     next = g_list_next (walk);
3660
3661     if (func) {
3662       /* only visit each transport once */
3663       if (g_hash_table_contains (visited, trans))
3664         continue;
3665
3666       g_hash_table_add (visited, g_object_ref (trans));
3667       g_mutex_unlock (&priv->lock);
3668
3669       res = func (stream, trans, user_data);
3670
3671       g_mutex_lock (&priv->lock);
3672     } else
3673       res = GST_RTSP_FILTER_REF;
3674
3675     changed = (cookie != priv->transports_cookie);
3676
3677     switch (res) {
3678       case GST_RTSP_FILTER_REMOVE:
3679         update_transport (stream, trans, FALSE);
3680         break;
3681       case GST_RTSP_FILTER_REF:
3682         result = g_list_prepend (result, g_object_ref (trans));
3683         break;
3684       case GST_RTSP_FILTER_KEEP:
3685       default:
3686         break;
3687     }
3688     if (changed)
3689       goto restart;
3690   }
3691   g_mutex_unlock (&priv->lock);
3692
3693   if (func)
3694     g_hash_table_unref (visited);
3695
3696   return result;
3697 }
3698
3699 static GstPadProbeReturn
3700 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3701 {
3702   GstRTSPStreamPrivate *priv;
3703   GstRTSPStream *stream;
3704
3705   stream = user_data;
3706   priv = stream->priv;
3707
3708   GST_DEBUG_OBJECT (pad, "now blocking");
3709
3710   g_mutex_lock (&priv->lock);
3711   priv->blocking = TRUE;
3712   g_mutex_unlock (&priv->lock);
3713
3714   gst_element_post_message (priv->payloader,
3715       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3716           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3717
3718   return GST_PAD_PROBE_OK;
3719 }
3720
3721 /**
3722  * gst_rtsp_stream_set_blocked:
3723  * @stream: a #GstRTSPStream
3724  * @blocked: boolean indicating we should block or unblock
3725  *
3726  * Blocks or unblocks the dataflow on @stream.
3727  *
3728  * Returns: %TRUE on success
3729  */
3730 gboolean
3731 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3732 {
3733   GstRTSPStreamPrivate *priv;
3734
3735   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3736
3737   priv = stream->priv;
3738
3739   g_mutex_lock (&priv->lock);
3740   if (blocked) {
3741     priv->blocking = FALSE;
3742     if (priv->blocked_id == 0) {
3743       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3744           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3745           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3746           g_object_ref (stream), g_object_unref);
3747     }
3748   } else {
3749     if (priv->blocked_id != 0) {
3750       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3751       priv->blocked_id = 0;
3752       priv->blocking = FALSE;
3753     }
3754   }
3755   g_mutex_unlock (&priv->lock);
3756
3757   return TRUE;
3758 }
3759
3760 /**
3761  * gst_rtsp_stream_is_blocking:
3762  * @stream: a #GstRTSPStream
3763  *
3764  * Check if @stream is blocking on a #GstBuffer.
3765  *
3766  * Returns: %TRUE if @stream is blocking
3767  */
3768 gboolean
3769 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3770 {
3771   GstRTSPStreamPrivate *priv;
3772   gboolean result;
3773
3774   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3775
3776   priv = stream->priv;
3777
3778   g_mutex_lock (&priv->lock);
3779   result = priv->blocking;
3780   g_mutex_unlock (&priv->lock);
3781
3782   return result;
3783 }
3784
3785 /**
3786  * gst_rtsp_stream_query_position:
3787  * @stream: a #GstRTSPStream
3788  *
3789  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3790  * the RTP parts of the pipeline and not the RTCP parts.
3791  *
3792  * Returns: %TRUE if the position could be queried
3793  */
3794 gboolean
3795 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3796 {
3797   GstRTSPStreamPrivate *priv;
3798   GstElement *sink;
3799   gboolean ret;
3800
3801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3802
3803   priv = stream->priv;
3804
3805   g_mutex_lock (&priv->lock);
3806   /* depending on the transport type, it should query corresponding sink */
3807   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3808       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3809     sink = priv->udpsink[0];
3810   else
3811     sink = priv->appsink[0];
3812
3813   if (sink)
3814     gst_object_ref (sink);
3815   g_mutex_unlock (&priv->lock);
3816
3817   if (!sink)
3818     return FALSE;
3819
3820   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3821   gst_object_unref (sink);
3822
3823   return ret;
3824 }
3825
3826 /**
3827  * gst_rtsp_stream_query_stop:
3828  * @stream: a #GstRTSPStream
3829  *
3830  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3831  * the RTP parts of the pipeline and not the RTCP parts.
3832  *
3833  * Returns: %TRUE if the stop could be queried
3834  */
3835 gboolean
3836 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3837 {
3838   GstRTSPStreamPrivate *priv;
3839   GstElement *sink;
3840   GstQuery *query;
3841   gboolean ret;
3842
3843   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3844
3845   priv = stream->priv;
3846
3847   g_mutex_lock (&priv->lock);
3848   /* depending on the transport type, it should query corresponding sink */
3849   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3850       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3851     sink = priv->udpsink[0];
3852   else
3853     sink = priv->appsink[0];
3854
3855   if (sink)
3856     gst_object_ref (sink);
3857   g_mutex_unlock (&priv->lock);
3858
3859   if (!sink)
3860     return FALSE;
3861
3862   query = gst_query_new_segment (GST_FORMAT_TIME);
3863   if ((ret = gst_element_query (sink, query))) {
3864     GstFormat format;
3865
3866     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3867     if (format != GST_FORMAT_TIME)
3868       *stop = -1;
3869   }
3870   gst_query_unref (query);
3871   gst_object_unref (sink);
3872
3873   return ret;
3874
3875 }