rtsp-stream: Fix up various multicast related issues
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* for UDP unicast */
98   GstElement *udpsrc_v4[2];
99   GstElement *udpsrc_v6[2];
100   GstElement *udpqueue[2];
101   GstElement *udpsink[2];
102
103   /* for UDP multicast */
104   GstElement *mcast_udpsrc_v4[2];
105   GstElement *mcast_udpsrc_v6[2];
106   GstElement *mcast_udpqueue[2];
107   GstElement *mcast_udpsink[2];
108
109   /* for TCP transport */
110   GstElement *appsrc[2];
111   GstClockTime appsrc_base_time[2];
112   GstElement *appqueue[2];
113   GstElement *appsink[2];
114
115   GstElement *tee[2];
116   GstElement *funnel[2];
117
118   /* retransmission */
119   GstElement *rtxsend;
120   guint rtx_pt;
121   GstClockTime rtx_time;
122
123   /* pool used to manage unicast and multicast addresses */
124   GstRTSPAddressPool *pool;
125
126   /* unicast server addr/port */
127   GstRTSPRange server_port_v4;
128   GstRTSPRange server_port_v6;
129   GstRTSPAddress *server_addr_v4;
130   GstRTSPAddress *server_addr_v6;
131
132   /* multicast addresses */
133   GstRTSPAddress *mcast_addr_v4;
134   GstRTSPAddress *mcast_addr_v6;
135
136   gchar *multicast_iface;
137
138   /* the caps of the stream */
139   gulong caps_sig;
140   GstCaps *caps;
141
142   /* transports we stream to */
143   guint n_active;
144   GList *transports;
145   guint transports_cookie;
146   GList *tr_cache_rtp;
147   GList *tr_cache_rtcp;
148   guint tr_cache_cookie_rtp;
149   guint tr_cache_cookie_rtcp;
150
151   gint dscp_qos;
152
153   /* stream blocking */
154   gulong blocked_id;
155   gboolean blocking;
156
157   /* pt->caps map for RECORD streams */
158   GHashTable *ptmap;
159
160   GstRTSPPublishClockMode publish_clock_mode;
161 };
162
163 #define DEFAULT_CONTROL         NULL
164 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
165 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
166                                         GST_RTSP_LOWER_TRANS_TCP
167
168 enum
169 {
170   PROP_0,
171   PROP_CONTROL,
172   PROP_PROFILES,
173   PROP_PROTOCOLS,
174   PROP_LAST
175 };
176
177 enum
178 {
179   SIGNAL_NEW_RTP_ENCODER,
180   SIGNAL_NEW_RTCP_ENCODER,
181   SIGNAL_LAST
182 };
183
184 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
185 #define GST_CAT_DEFAULT rtsp_stream_debug
186
187 static GQuark ssrc_stream_map_key;
188
189 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
190     GValue * value, GParamSpec * pspec);
191 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
192     const GValue * value, GParamSpec * pspec);
193
194 static void gst_rtsp_stream_finalize (GObject * obj);
195
196 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
197
198 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
199
200 static void
201 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
202 {
203   GObjectClass *gobject_class;
204
205   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
206
207   gobject_class = G_OBJECT_CLASS (klass);
208
209   gobject_class->get_property = gst_rtsp_stream_get_property;
210   gobject_class->set_property = gst_rtsp_stream_set_property;
211   gobject_class->finalize = gst_rtsp_stream_finalize;
212
213   g_object_class_install_property (gobject_class, PROP_CONTROL,
214       g_param_spec_string ("control", "Control",
215           "The control string for this stream", DEFAULT_CONTROL,
216           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217
218   g_object_class_install_property (gobject_class, PROP_PROFILES,
219       g_param_spec_flags ("profiles", "Profiles",
220           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
221           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222
223   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
224       g_param_spec_flags ("protocols", "Protocols",
225           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
226           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
229       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
230       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
231       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
232
233   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
234       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
235       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
236       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
237
238   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
239
240   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
241 }
242
243 static void
244 gst_rtsp_stream_init (GstRTSPStream * stream)
245 {
246   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
247
248   GST_DEBUG ("new stream %p", stream);
249
250   stream->priv = priv;
251
252   priv->dscp_qos = -1;
253   priv->control = g_strdup (DEFAULT_CONTROL);
254   priv->profiles = DEFAULT_PROFILES;
255   priv->protocols = DEFAULT_PROTOCOLS;
256   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
257
258   g_mutex_init (&priv->lock);
259
260   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
261       NULL, (GDestroyNotify) gst_caps_unref);
262   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
263       (GDestroyNotify) gst_caps_unref);
264 }
265
266 static void
267 gst_rtsp_stream_finalize (GObject * obj)
268 {
269   GstRTSPStream *stream;
270   GstRTSPStreamPrivate *priv;
271
272   stream = GST_RTSP_STREAM (obj);
273   priv = stream->priv;
274
275   GST_DEBUG ("finalize stream %p", stream);
276
277   /* we really need to be unjoined now */
278   g_return_if_fail (priv->joined_bin == NULL);
279
280   if (priv->mcast_addr_v4)
281     gst_rtsp_address_free (priv->mcast_addr_v4);
282   if (priv->mcast_addr_v6)
283     gst_rtsp_address_free (priv->mcast_addr_v6);
284   if (priv->server_addr_v4)
285     gst_rtsp_address_free (priv->server_addr_v4);
286   if (priv->server_addr_v6)
287     gst_rtsp_address_free (priv->server_addr_v6);
288   if (priv->pool)
289     g_object_unref (priv->pool);
290   if (priv->rtxsend)
291     g_object_unref (priv->rtxsend);
292
293   g_free (priv->multicast_iface);
294
295   gst_object_unref (priv->payloader);
296   if (priv->srcpad)
297     gst_object_unref (priv->srcpad);
298   if (priv->sinkpad)
299     gst_object_unref (priv->sinkpad);
300   g_free (priv->control);
301   g_mutex_clear (&priv->lock);
302
303   g_hash_table_unref (priv->keys);
304   g_hash_table_destroy (priv->ptmap);
305
306   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
307 }
308
309 static void
310 gst_rtsp_stream_get_property (GObject * object, guint propid,
311     GValue * value, GParamSpec * pspec)
312 {
313   GstRTSPStream *stream = GST_RTSP_STREAM (object);
314
315   switch (propid) {
316     case PROP_CONTROL:
317       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
318       break;
319     case PROP_PROFILES:
320       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
321       break;
322     case PROP_PROTOCOLS:
323       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
324       break;
325     default:
326       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
327   }
328 }
329
330 static void
331 gst_rtsp_stream_set_property (GObject * object, guint propid,
332     const GValue * value, GParamSpec * pspec)
333 {
334   GstRTSPStream *stream = GST_RTSP_STREAM (object);
335
336   switch (propid) {
337     case PROP_CONTROL:
338       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
339       break;
340     case PROP_PROFILES:
341       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
342       break;
343     case PROP_PROTOCOLS:
344       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
345       break;
346     default:
347       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
348   }
349 }
350
351 /**
352  * gst_rtsp_stream_new:
353  * @idx: an index
354  * @pad: a #GstPad
355  * @payloader: a #GstElement
356  *
357  * Create a new media stream with index @idx that handles RTP data on
358  * @pad and has a payloader element @payloader if @pad is a source pad
359  * or a depayloader element @payloader if @pad is a sink pad.
360  *
361  * Returns: (transfer full): a new #GstRTSPStream
362  */
363 GstRTSPStream *
364 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
365 {
366   GstRTSPStreamPrivate *priv;
367   GstRTSPStream *stream;
368
369   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
370   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
371
372   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
373   priv = stream->priv;
374   priv->idx = idx;
375   priv->payloader = gst_object_ref (payloader);
376   if (GST_PAD_IS_SRC (pad))
377     priv->srcpad = gst_object_ref (pad);
378   else
379     priv->sinkpad = gst_object_ref (pad);
380
381   return stream;
382 }
383
384 /**
385  * gst_rtsp_stream_get_index:
386  * @stream: a #GstRTSPStream
387  *
388  * Get the stream index.
389  *
390  * Return: the stream index.
391  */
392 guint
393 gst_rtsp_stream_get_index (GstRTSPStream * stream)
394 {
395   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
396
397   return stream->priv->idx;
398 }
399
400 /**
401  * gst_rtsp_stream_get_pt:
402  * @stream: a #GstRTSPStream
403  *
404  * Get the stream payload type.
405  *
406  * Return: the stream payload type.
407  */
408 guint
409 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
410 {
411   GstRTSPStreamPrivate *priv;
412   guint pt;
413
414   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
415
416   priv = stream->priv;
417
418   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
419
420   return pt;
421 }
422
423 /**
424  * gst_rtsp_stream_get_srcpad:
425  * @stream: a #GstRTSPStream
426  *
427  * Get the srcpad associated with @stream.
428  *
429  * Returns: (transfer full): the srcpad. Unref after usage.
430  */
431 GstPad *
432 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
433 {
434   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
435
436   if (!stream->priv->srcpad)
437     return NULL;
438
439   return gst_object_ref (stream->priv->srcpad);
440 }
441
442 /**
443  * gst_rtsp_stream_get_sinkpad:
444  * @stream: a #GstRTSPStream
445  *
446  * Get the sinkpad associated with @stream.
447  *
448  * Returns: (transfer full): the sinkpad. Unref after usage.
449  */
450 GstPad *
451 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
452 {
453   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
454
455   if (!stream->priv->sinkpad)
456     return NULL;
457
458   return gst_object_ref (stream->priv->sinkpad);
459 }
460
461 /**
462  * gst_rtsp_stream_get_control:
463  * @stream: a #GstRTSPStream
464  *
465  * Get the control string to identify this stream.
466  *
467  * Returns: (transfer full): the control string. g_free() after usage.
468  */
469 gchar *
470 gst_rtsp_stream_get_control (GstRTSPStream * stream)
471 {
472   GstRTSPStreamPrivate *priv;
473   gchar *result;
474
475   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
476
477   priv = stream->priv;
478
479   g_mutex_lock (&priv->lock);
480   if ((result = g_strdup (priv->control)) == NULL)
481     result = g_strdup_printf ("stream=%u", priv->idx);
482   g_mutex_unlock (&priv->lock);
483
484   return result;
485 }
486
487 /**
488  * gst_rtsp_stream_set_control:
489  * @stream: a #GstRTSPStream
490  * @control: a control string
491  *
492  * Set the control string in @stream.
493  */
494 void
495 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
496 {
497   GstRTSPStreamPrivate *priv;
498
499   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
500
501   priv = stream->priv;
502
503   g_mutex_lock (&priv->lock);
504   g_free (priv->control);
505   priv->control = g_strdup (control);
506   g_mutex_unlock (&priv->lock);
507 }
508
509 /**
510  * gst_rtsp_stream_has_control:
511  * @stream: a #GstRTSPStream
512  * @control: a control string
513  *
514  * Check if @stream has the control string @control.
515  *
516  * Returns: %TRUE is @stream has @control as the control string
517  */
518 gboolean
519 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
520 {
521   GstRTSPStreamPrivate *priv;
522   gboolean res;
523
524   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
525
526   priv = stream->priv;
527
528   g_mutex_lock (&priv->lock);
529   if (priv->control)
530     res = (g_strcmp0 (priv->control, control) == 0);
531   else {
532     guint streamid;
533
534     if (sscanf (control, "stream=%u", &streamid) > 0)
535       res = (streamid == priv->idx);
536     else
537       res = FALSE;
538   }
539   g_mutex_unlock (&priv->lock);
540
541   return res;
542 }
543
544 /**
545  * gst_rtsp_stream_set_mtu:
546  * @stream: a #GstRTSPStream
547  * @mtu: a new MTU
548  *
549  * Configure the mtu in the payloader of @stream to @mtu.
550  */
551 void
552 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
553 {
554   GstRTSPStreamPrivate *priv;
555
556   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
557
558   priv = stream->priv;
559
560   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
561
562   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
563 }
564
565 /**
566  * gst_rtsp_stream_get_mtu:
567  * @stream: a #GstRTSPStream
568  *
569  * Get the configured MTU in the payloader of @stream.
570  *
571  * Returns: the MTU of the payloader.
572  */
573 guint
574 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
575 {
576   GstRTSPStreamPrivate *priv;
577   guint mtu;
578
579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
580
581   priv = stream->priv;
582
583   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
584
585   return mtu;
586 }
587
588 /* Update the dscp qos property on the udp sinks */
589 static void
590 update_dscp_qos (GstRTSPStream * stream, GstElement * udpsink[2])
591 {
592   GstRTSPStreamPrivate *priv;
593
594   priv = stream->priv;
595
596   if (udpsink[0]) {
597     g_object_set (G_OBJECT (udpsink[0]), "qos-dscp", priv->dscp_qos, NULL);
598   }
599
600   if (udpsink[1]) {
601     g_object_set (G_OBJECT (udpsink[1]), "qos-dscp", priv->dscp_qos, NULL);
602   }
603 }
604
605 /**
606  * gst_rtsp_stream_set_dscp_qos:
607  * @stream: a #GstRTSPStream
608  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
609  *
610  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
611  */
612 void
613 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
614 {
615   GstRTSPStreamPrivate *priv;
616
617   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
618
619   priv = stream->priv;
620
621   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
622
623   if (dscp_qos < -1 || dscp_qos > 63) {
624     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
625     return;
626   }
627
628   priv->dscp_qos = dscp_qos;
629
630   update_dscp_qos (stream, priv->udpsink);
631 }
632
633 /**
634  * gst_rtsp_stream_get_dscp_qos:
635  * @stream: a #GstRTSPStream
636  *
637  * Get the configured DSCP QoS in of the outgoing sockets.
638  *
639  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
640  */
641 gint
642 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
643 {
644   GstRTSPStreamPrivate *priv;
645
646   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
647
648   priv = stream->priv;
649
650   return priv->dscp_qos;
651 }
652
653 /**
654  * gst_rtsp_stream_is_transport_supported:
655  * @stream: a #GstRTSPStream
656  * @transport: (transfer none): a #GstRTSPTransport
657  *
658  * Check if @transport can be handled by stream
659  *
660  * Returns: %TRUE if @transport can be handled by @stream.
661  */
662 gboolean
663 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
664     GstRTSPTransport * transport)
665 {
666   GstRTSPStreamPrivate *priv;
667
668   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
669
670   priv = stream->priv;
671
672   g_mutex_lock (&priv->lock);
673   if (transport->trans != GST_RTSP_TRANS_RTP)
674     goto unsupported_transmode;
675
676   if (!(transport->profile & priv->profiles))
677     goto unsupported_profile;
678
679   if (!(transport->lower_transport & priv->protocols))
680     goto unsupported_ltrans;
681
682   g_mutex_unlock (&priv->lock);
683
684   return TRUE;
685
686   /* ERRORS */
687 unsupported_transmode:
688   {
689     GST_DEBUG ("unsupported transport mode %d", transport->trans);
690     g_mutex_unlock (&priv->lock);
691     return FALSE;
692   }
693 unsupported_profile:
694   {
695     GST_DEBUG ("unsupported profile %d", transport->profile);
696     g_mutex_unlock (&priv->lock);
697     return FALSE;
698   }
699 unsupported_ltrans:
700   {
701     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
702     g_mutex_unlock (&priv->lock);
703     return FALSE;
704   }
705 }
706
707 /**
708  * gst_rtsp_stream_set_profiles:
709  * @stream: a #GstRTSPStream
710  * @profiles: the new profiles
711  *
712  * Configure the allowed profiles for @stream.
713  */
714 void
715 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
716 {
717   GstRTSPStreamPrivate *priv;
718
719   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
720
721   priv = stream->priv;
722
723   g_mutex_lock (&priv->lock);
724   priv->profiles = profiles;
725   g_mutex_unlock (&priv->lock);
726 }
727
728 /**
729  * gst_rtsp_stream_get_profiles:
730  * @stream: a #GstRTSPStream
731  *
732  * Get the allowed profiles of @stream.
733  *
734  * Returns: a #GstRTSPProfile
735  */
736 GstRTSPProfile
737 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
738 {
739   GstRTSPStreamPrivate *priv;
740   GstRTSPProfile res;
741
742   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
743
744   priv = stream->priv;
745
746   g_mutex_lock (&priv->lock);
747   res = priv->profiles;
748   g_mutex_unlock (&priv->lock);
749
750   return res;
751 }
752
753 /**
754  * gst_rtsp_stream_set_protocols:
755  * @stream: a #GstRTSPStream
756  * @protocols: the new flags
757  *
758  * Configure the allowed lower transport for @stream.
759  */
760 void
761 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
762     GstRTSPLowerTrans protocols)
763 {
764   GstRTSPStreamPrivate *priv;
765
766   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
767
768   priv = stream->priv;
769
770   g_mutex_lock (&priv->lock);
771   priv->protocols = protocols;
772   g_mutex_unlock (&priv->lock);
773 }
774
775 /**
776  * gst_rtsp_stream_get_protocols:
777  * @stream: a #GstRTSPStream
778  *
779  * Get the allowed protocols of @stream.
780  *
781  * Returns: a #GstRTSPLowerTrans
782  */
783 GstRTSPLowerTrans
784 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
785 {
786   GstRTSPStreamPrivate *priv;
787   GstRTSPLowerTrans res;
788
789   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
790       GST_RTSP_LOWER_TRANS_UNKNOWN);
791
792   priv = stream->priv;
793
794   g_mutex_lock (&priv->lock);
795   res = priv->protocols;
796   g_mutex_unlock (&priv->lock);
797
798   return res;
799 }
800
801 /**
802  * gst_rtsp_stream_set_address_pool:
803  * @stream: a #GstRTSPStream
804  * @pool: (transfer none): a #GstRTSPAddressPool
805  *
806  * configure @pool to be used as the address pool of @stream.
807  */
808 void
809 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
810     GstRTSPAddressPool * pool)
811 {
812   GstRTSPStreamPrivate *priv;
813   GstRTSPAddressPool *old;
814
815   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
816
817   priv = stream->priv;
818
819   GST_LOG_OBJECT (stream, "set address pool %p", pool);
820
821   g_mutex_lock (&priv->lock);
822   if ((old = priv->pool) != pool)
823     priv->pool = pool ? g_object_ref (pool) : NULL;
824   else
825     old = NULL;
826   g_mutex_unlock (&priv->lock);
827
828   if (old)
829     g_object_unref (old);
830 }
831
832 /**
833  * gst_rtsp_stream_get_address_pool:
834  * @stream: a #GstRTSPStream
835  *
836  * Get the #GstRTSPAddressPool used as the address pool of @stream.
837  *
838  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
839  * usage.
840  */
841 GstRTSPAddressPool *
842 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
843 {
844   GstRTSPStreamPrivate *priv;
845   GstRTSPAddressPool *result;
846
847   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
848
849   priv = stream->priv;
850
851   g_mutex_lock (&priv->lock);
852   if ((result = priv->pool))
853     g_object_ref (result);
854   g_mutex_unlock (&priv->lock);
855
856   return result;
857 }
858
859 /**
860  * gst_rtsp_stream_set_multicast_iface:
861  * @stream: a #GstRTSPStream
862  * @multicast_iface: (transfer none): a multicast interface
863  *
864  * configure @multicast_iface to be used for @stream.
865  */
866 void
867 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
868     const gchar * multicast_iface)
869 {
870   GstRTSPStreamPrivate *priv;
871   gchar *old;
872
873   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
874
875   priv = stream->priv;
876
877   GST_LOG_OBJECT (stream, "set multicast iface %s",
878       GST_STR_NULL (multicast_iface));
879
880   g_mutex_lock (&priv->lock);
881   if ((old = priv->multicast_iface) != multicast_iface)
882     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
883   else
884     old = NULL;
885   g_mutex_unlock (&priv->lock);
886
887   if (old)
888     g_free (old);
889 }
890
891 /**
892  * gst_rtsp_stream_get_multicast_iface:
893  * @stream: a #GstRTSPStream
894  *
895  * Get the multicast interface used for @stream.
896  *
897  * Returns: (transfer full): the multicast interface for @stream. g_free() after
898  * usage.
899  */
900 gchar *
901 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
902 {
903   GstRTSPStreamPrivate *priv;
904   gchar *result;
905
906   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
907
908   priv = stream->priv;
909
910   g_mutex_lock (&priv->lock);
911   if ((result = priv->multicast_iface))
912     result = g_strdup (result);
913   g_mutex_unlock (&priv->lock);
914
915   return result;
916 }
917
918 /**
919  * gst_rtsp_stream_get_multicast_address:
920  * @stream: a #GstRTSPStream
921  * @family: the #GSocketFamily
922  *
923  * Get the multicast address of @stream for @family. The original
924  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
925  * won't release the address from the pool.
926  *
927  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
928  * or %NULL when no address could be allocated. gst_rtsp_address_free()
929  * after usage.
930  */
931 GstRTSPAddress *
932 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
933     GSocketFamily family)
934 {
935   GstRTSPStreamPrivate *priv;
936   GstRTSPAddress *result;
937   GstRTSPAddress **addrp;
938   GstRTSPAddressFlags flags;
939
940   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
941
942   priv = stream->priv;
943
944   if (family == G_SOCKET_FAMILY_IPV6) {
945     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
946     addrp = &priv->mcast_addr_v6;
947   } else {
948     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
949     addrp = &priv->mcast_addr_v4;
950   }
951
952   g_mutex_lock (&priv->lock);
953   if (*addrp == NULL) {
954     if (priv->pool == NULL)
955       goto no_pool;
956
957     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
958
959     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
960     if (*addrp == NULL)
961       goto no_address;
962
963     /* FIXME: Also reserve the same port with unicast ANY address, since that's
964      * where we are going to bind our socket. Probably loop until we find a port
965      * available in both mcast and unicast pools. Maybe GstRTSPAddressPool
966      * should do it for us when both GST_RTSP_ADDRESS_FLAG_MULTICAST and
967      * GST_RTSP_ADDRESS_FLAG_UNICAST are givent. */
968   }
969   result = gst_rtsp_address_copy (*addrp);
970   g_mutex_unlock (&priv->lock);
971
972   return result;
973
974   /* ERRORS */
975 no_pool:
976   {
977     GST_ERROR_OBJECT (stream, "no address pool specified");
978     g_mutex_unlock (&priv->lock);
979     return NULL;
980   }
981 no_address:
982   {
983     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
984     g_mutex_unlock (&priv->lock);
985     return NULL;
986   }
987 }
988
989 /**
990  * gst_rtsp_stream_reserve_address:
991  * @stream: a #GstRTSPStream
992  * @address: an address
993  * @port: a port
994  * @n_ports: n_ports
995  * @ttl: a TTL
996  *
997  * Reserve @address and @port as the address and port of @stream. The original
998  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
999  * won't release the address from the pool.
1000  *
1001  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1002  * the address could be reserved. gst_rtsp_address_free() after usage.
1003  */
1004 GstRTSPAddress *
1005 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1006     const gchar * address, guint port, guint n_ports, guint ttl)
1007 {
1008   GstRTSPStreamPrivate *priv;
1009   GstRTSPAddress *result;
1010   GInetAddress *addr;
1011   GSocketFamily family;
1012   GstRTSPAddress **addrp;
1013
1014   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1015   g_return_val_if_fail (address != NULL, NULL);
1016   g_return_val_if_fail (port > 0, NULL);
1017   g_return_val_if_fail (n_ports > 0, NULL);
1018   g_return_val_if_fail (ttl > 0, NULL);
1019
1020   priv = stream->priv;
1021
1022   addr = g_inet_address_new_from_string (address);
1023   if (!addr) {
1024     GST_ERROR ("failed to get inet addr from %s", address);
1025     family = G_SOCKET_FAMILY_IPV4;
1026   } else {
1027     family = g_inet_address_get_family (addr);
1028     g_object_unref (addr);
1029   }
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     addrp = &priv->mcast_addr_v6;
1033   else
1034     addrp = &priv->mcast_addr_v4;
1035
1036   g_mutex_lock (&priv->lock);
1037   if (*addrp == NULL) {
1038     GstRTSPAddressPoolResult res;
1039
1040     if (priv->pool == NULL)
1041       goto no_pool;
1042
1043     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1044         port, n_ports, ttl, addrp);
1045     if (res != GST_RTSP_ADDRESS_POOL_OK)
1046       goto no_address;
1047
1048     /* FIXME: Also reserve the same port with unicast ANY address, since that's
1049      * where we are going to bind our socket. */
1050   } else {
1051     if (strcmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1078         " reserved", address);
1079     g_mutex_unlock (&priv->lock);
1080     return NULL;
1081   }
1082 }
1083
1084 /* must be called with lock */
1085 static void
1086 set_sockets_for_udpsinks (GstElement * udpsink[2], GSocket * rtp_socket,
1087     GSocket * rtcp_socket, GSocketFamily family)
1088 {
1089   const gchar *multisink_socket;
1090
1091   if (family == G_SOCKET_FAMILY_IPV6)
1092     multisink_socket = "socket-v6";
1093   else
1094     multisink_socket = "socket";
1095
1096   g_object_set (G_OBJECT (udpsink[0]), multisink_socket, rtp_socket, NULL);
1097   g_object_set (G_OBJECT (udpsink[1]), multisink_socket, rtcp_socket, NULL);
1098 }
1099
1100 static gboolean
1101 create_and_configure_udpsinks (GstRTSPStream * stream, GstElement * udpsink[2])
1102 {
1103   GstRTSPStreamPrivate *priv = stream->priv;
1104   GstElement *udpsink0, *udpsink1;
1105
1106   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1107   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1108
1109   if (!udpsink0 || !udpsink1)
1110     goto no_udp_protocol;
1111
1112   /* configure sinks */
1113
1114   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1115   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1116
1117   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1118   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1119
1120   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1121
1122   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1123   /* Needs to be async for RECORD streams, otherwise we will never go to
1124    * PLAYING because the sinks will wait for data while the udpsrc can't
1125    * provide data with timestamps in PAUSED. */
1126   if (priv->sinkpad)
1127     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1128   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1129
1130   /* join multicast group when adding clients, so we'll start receiving from it.
1131    * We cannot rely on the udpsrc to join the group since its socket is always a
1132    * local unicast one. */
1133   g_object_set (G_OBJECT (udpsink0), "auto-multicast", TRUE, NULL);
1134   g_object_set (G_OBJECT (udpsink1), "auto-multicast", TRUE, NULL);
1135
1136   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1137   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1138
1139   udpsink[0] = udpsink0;
1140   udpsink[1] = udpsink1;
1141
1142   /* update the dscp qos field in the sinks */
1143   update_dscp_qos (stream, udpsink);
1144
1145   return TRUE;
1146
1147   /* ERRORS */
1148 no_udp_protocol:
1149   {
1150     return FALSE;
1151   }
1152 }
1153
1154 /* must be called with lock */
1155 static gboolean
1156 create_and_configure_udpsources (GstElement * udpsrc_out[2],
1157     GSocket * rtp_socket, GSocket * rtcp_socket)
1158 {
1159   GstStateChangeReturn ret;
1160
1161   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1162   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1163
1164   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1165     goto error;
1166
1167   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1168   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1169
1170   /* The udpsrc cannot do the join because its socket is always a local unicast
1171    * one. The udpsink sharing the same socket will do it for us. */
1172   g_object_set (G_OBJECT (udpsrc_out[0]), "auto-multicast", FALSE, NULL);
1173   g_object_set (G_OBJECT (udpsrc_out[1]), "auto-multicast", FALSE, NULL);
1174
1175   g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1176   g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1177
1178   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1179   if (ret == GST_STATE_CHANGE_FAILURE)
1180     goto error;
1181   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1182   if (ret == GST_STATE_CHANGE_FAILURE)
1183     goto error;
1184
1185   return TRUE;
1186
1187   /* ERRORS */
1188 error:
1189   {
1190     if (udpsrc_out[0]) {
1191       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1192       g_clear_object (&udpsrc_out[0]);
1193     }
1194     if (udpsrc_out[1]) {
1195       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1196       g_clear_object (&udpsrc_out[1]);
1197     }
1198     return FALSE;
1199   }
1200 }
1201
1202 static gboolean
1203 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1204     GstElement * udpsrc_out[2], GstElement * udpsink_out[2],
1205     GstRTSPRange * server_port_out, GstRTSPAddress ** server_addr_out)
1206 {
1207   GstRTSPStreamPrivate *priv = stream->priv;
1208   GSocket *rtp_socket = NULL;
1209   GSocket *rtcp_socket;
1210   gint tmp_rtp, tmp_rtcp;
1211   guint count;
1212   gint rtpport, rtcpport;
1213   GList *rejected_addresses = NULL;
1214   GstRTSPAddress *addr = NULL;
1215   GInetAddress *inetaddr = NULL;
1216   gchar *addr_str;
1217   GSocketAddress *rtp_sockaddr = NULL;
1218   GSocketAddress *rtcp_sockaddr = NULL;
1219   GstRTSPAddressPool *pool;
1220
1221   g_assert (!udpsrc_out[0]);
1222   g_assert (!udpsrc_out[1]);
1223   g_assert ((!udpsink_out[0] && !udpsink_out[1]) ||
1224       (udpsink_out[0] && udpsink_out[1]));
1225   g_assert (*server_addr_out == NULL);
1226
1227   pool = priv->pool;
1228   count = 0;
1229
1230   /* Start with random port */
1231   tmp_rtp = 0;
1232
1233   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1234       G_SOCKET_PROTOCOL_UDP, NULL);
1235   if (!rtcp_socket)
1236     goto no_udp_protocol;
1237   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1238
1239   /* try to allocate 2 UDP ports, the RTP port should be an even
1240    * number and the RTCP port should be the next (uneven) port */
1241 again:
1242
1243   if (rtp_socket == NULL) {
1244     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1245         G_SOCKET_PROTOCOL_UDP, NULL);
1246     if (!rtp_socket)
1247       goto no_udp_protocol;
1248     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1249   }
1250
1251   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1252     GstRTSPAddressFlags flags;
1253
1254     if (addr)
1255       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1256
1257     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1258     if (family == G_SOCKET_FAMILY_IPV6)
1259       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1260     else
1261       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1262
1263     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1264
1265     if (addr == NULL)
1266       goto no_ports;
1267
1268     tmp_rtp = addr->port;
1269
1270     g_clear_object (&inetaddr);
1271     inetaddr = g_inet_address_new_from_string (addr->address);
1272   } else {
1273     if (tmp_rtp != 0) {
1274       tmp_rtp += 2;
1275       if (++count > 20)
1276         goto no_ports;
1277     }
1278
1279     if (inetaddr == NULL)
1280       inetaddr = g_inet_address_new_any (family);
1281   }
1282
1283   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1284   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1285     g_object_unref (rtp_sockaddr);
1286     goto again;
1287   }
1288   g_object_unref (rtp_sockaddr);
1289
1290   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1291   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1292     g_clear_object (&rtp_sockaddr);
1293     goto socket_error;
1294   }
1295
1296   tmp_rtp =
1297       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1298   g_object_unref (rtp_sockaddr);
1299
1300   /* check if port is even */
1301   if ((tmp_rtp & 1) != 0) {
1302     /* port not even, close and allocate another */
1303     tmp_rtp++;
1304     g_clear_object (&rtp_socket);
1305     goto again;
1306   }
1307
1308   /* set port */
1309   tmp_rtcp = tmp_rtp + 1;
1310
1311   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1312   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1313     g_object_unref (rtcp_sockaddr);
1314     g_clear_object (&rtp_socket);
1315     goto again;
1316   }
1317   g_object_unref (rtcp_sockaddr);
1318
1319   if (addr == NULL)
1320     addr_str = g_inet_address_to_string (inetaddr);
1321   else
1322     addr_str = addr->address;
1323   g_clear_object (&inetaddr);
1324
1325   if (!create_and_configure_udpsources (udpsrc_out, rtp_socket, rtcp_socket)) {
1326     if (addr == NULL)
1327       g_free (addr_str);
1328     goto no_udp_protocol;
1329   }
1330
1331   if (addr == NULL)
1332     g_free (addr_str);
1333
1334   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1335   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1336
1337   /* this should not happen... */
1338   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1339     goto port_error;
1340
1341   server_port_out->min = rtpport;
1342   server_port_out->max = rtcpport;
1343
1344   /* This function is called twice (for v4 and v6) but we create only one pair
1345    * of udpsinks. */
1346   if (!udpsink_out[0]
1347       && !create_and_configure_udpsinks (stream, udpsink_out))
1348     goto no_udp_protocol;
1349
1350   set_sockets_for_udpsinks (udpsink_out, rtp_socket, rtcp_socket, family);
1351
1352   *server_addr_out = addr;
1353   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1354
1355   g_object_unref (rtp_socket);
1356   g_object_unref (rtcp_socket);
1357
1358   return TRUE;
1359
1360   /* ERRORS */
1361 no_udp_protocol:
1362   {
1363     goto cleanup;
1364   }
1365 no_ports:
1366   {
1367     goto cleanup;
1368   }
1369 port_error:
1370   {
1371     goto cleanup;
1372   }
1373 socket_error:
1374   {
1375     goto cleanup;
1376   }
1377 cleanup:
1378   {
1379     if (inetaddr)
1380       g_object_unref (inetaddr);
1381     g_list_free_full (rejected_addresses,
1382         (GDestroyNotify) gst_rtsp_address_free);
1383     if (addr)
1384       gst_rtsp_address_free (addr);
1385     if (rtp_socket)
1386       g_object_unref (rtp_socket);
1387     if (rtcp_socket)
1388       g_object_unref (rtcp_socket);
1389     return FALSE;
1390   }
1391 }
1392
1393 /**
1394  * gst_rtsp_stream_allocate_udp_sockets:
1395  * @stream: a #GstRTSPStream
1396  * @family: protocol family
1397  * @transport_method: transport method
1398  *
1399  * Allocates RTP and RTCP ports.
1400  *
1401  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1402  * Deprecated: This function shouldn't have been made public
1403  */
1404 gboolean
1405 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1406     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1407 {
1408   g_warn_if_reached ();
1409   return FALSE;
1410 }
1411
1412 /**
1413  * gst_rtsp_stream_set_client_side:
1414  * @stream: a #GstRTSPStream
1415  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1416  * an RTSP connection.
1417  *
1418  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1419  * streams to an RTSP server via RECORD. This has the practical effect
1420  * of changing which UDP port numbers are used when setting up the local
1421  * side of the stream sending to be either the 'server' or 'client' pair
1422  * of a configured UDP transport.
1423  */
1424 void
1425 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1426 {
1427   GstRTSPStreamPrivate *priv;
1428
1429   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1430   priv = stream->priv;
1431   g_mutex_lock (&priv->lock);
1432   priv->client_side = client_side;
1433   g_mutex_unlock (&priv->lock);
1434 }
1435
1436 /**
1437  * gst_rtsp_stream_is_client_side:
1438  * @stream: a #GstRTSPStream
1439  *
1440  * See gst_rtsp_stream_set_client_side()
1441  *
1442  * Returns: TRUE if this #GstRTSPStream is client-side.
1443  */
1444 gboolean
1445 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1446 {
1447   GstRTSPStreamPrivate *priv;
1448   gboolean ret;
1449
1450   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1451
1452   priv = stream->priv;
1453   g_mutex_lock (&priv->lock);
1454   ret = priv->client_side;
1455   g_mutex_unlock (&priv->lock);
1456
1457   return ret;
1458 }
1459
1460 /* must be called with lock */
1461 static gboolean
1462 alloc_ports (GstRTSPStream * stream)
1463 {
1464   GstRTSPStreamPrivate *priv = stream->priv;
1465   gboolean ret = TRUE;
1466
1467   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
1468       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1469     ret = alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1470         priv->udpsrc_v4, priv->udpsink,
1471         &priv->server_port_v4, &priv->server_addr_v4);
1472
1473     ret |= alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1474         priv->udpsrc_v6, priv->udpsink,
1475         &priv->server_port_v6, &priv->server_addr_v6);
1476   }
1477
1478   return ret;
1479 }
1480
1481 /**
1482  * gst_rtsp_stream_get_server_port:
1483  * @stream: a #GstRTSPStream
1484  * @server_port: (out): result server port
1485  * @family: the port family to get
1486  *
1487  * Fill @server_port with the port pair used by the server. This function can
1488  * only be called when @stream has been joined.
1489  */
1490 void
1491 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1492     GstRTSPRange * server_port, GSocketFamily family)
1493 {
1494   GstRTSPStreamPrivate *priv;
1495
1496   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1497   priv = stream->priv;
1498   g_return_if_fail (priv->joined_bin != NULL);
1499
1500   g_mutex_lock (&priv->lock);
1501   if (family == G_SOCKET_FAMILY_IPV4) {
1502     if (server_port)
1503       *server_port = priv->server_port_v4;
1504   } else {
1505     if (server_port)
1506       *server_port = priv->server_port_v6;
1507   }
1508   g_mutex_unlock (&priv->lock);
1509 }
1510
1511 /**
1512  * gst_rtsp_stream_get_rtpsession:
1513  * @stream: a #GstRTSPStream
1514  *
1515  * Get the RTP session of this stream.
1516  *
1517  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1518  */
1519 GObject *
1520 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1521 {
1522   GstRTSPStreamPrivate *priv;
1523   GObject *session;
1524
1525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1526
1527   priv = stream->priv;
1528
1529   g_mutex_lock (&priv->lock);
1530   if ((session = priv->session))
1531     g_object_ref (session);
1532   g_mutex_unlock (&priv->lock);
1533
1534   return session;
1535 }
1536
1537 /**
1538  * gst_rtsp_stream_get_encoder:
1539  * @stream: a #GstRTSPStream
1540  *
1541  * Get the SRTP encoder for this stream.
1542  *
1543  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1544  */
1545 GstElement *
1546 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1547 {
1548   GstRTSPStreamPrivate *priv;
1549   GstElement *encoder;
1550
1551   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1552
1553   priv = stream->priv;
1554
1555   g_mutex_lock (&priv->lock);
1556   if ((encoder = priv->srtpenc))
1557     g_object_ref (encoder);
1558   g_mutex_unlock (&priv->lock);
1559
1560   return encoder;
1561 }
1562
1563 /**
1564  * gst_rtsp_stream_get_ssrc:
1565  * @stream: a #GstRTSPStream
1566  * @ssrc: (out): result ssrc
1567  *
1568  * Get the SSRC used by the RTP session of this stream. This function can only
1569  * be called when @stream has been joined.
1570  */
1571 void
1572 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1573 {
1574   GstRTSPStreamPrivate *priv;
1575
1576   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1577   priv = stream->priv;
1578   g_return_if_fail (priv->joined_bin != NULL);
1579
1580   g_mutex_lock (&priv->lock);
1581   if (ssrc && priv->session)
1582     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1583   g_mutex_unlock (&priv->lock);
1584 }
1585
1586 /**
1587  * gst_rtsp_stream_set_retransmission_time:
1588  * @stream: a #GstRTSPStream
1589  * @time: a #GstClockTime
1590  *
1591  * Set the amount of time to store retransmission packets.
1592  */
1593 void
1594 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1595     GstClockTime time)
1596 {
1597   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1598
1599   g_mutex_lock (&stream->priv->lock);
1600   stream->priv->rtx_time = time;
1601   if (stream->priv->rtxsend)
1602     g_object_set (stream->priv->rtxsend, "max-size-time",
1603         GST_TIME_AS_MSECONDS (time), NULL);
1604   g_mutex_unlock (&stream->priv->lock);
1605 }
1606
1607 /**
1608  * gst_rtsp_stream_get_retransmission_time:
1609  * @stream: a #GstRTSPStream
1610  *
1611  * Get the amount of time to store retransmission data.
1612  *
1613  * Returns: the amount of time to store retransmission data.
1614  */
1615 GstClockTime
1616 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1617 {
1618   GstClockTime ret;
1619
1620   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1621
1622   g_mutex_lock (&stream->priv->lock);
1623   ret = stream->priv->rtx_time;
1624   g_mutex_unlock (&stream->priv->lock);
1625
1626   return ret;
1627 }
1628
1629 /**
1630  * gst_rtsp_stream_set_retransmission_pt:
1631  * @stream: a #GstRTSPStream
1632  * @rtx_pt: a #guint
1633  *
1634  * Set the payload type (pt) for retransmission of this stream.
1635  */
1636 void
1637 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1638 {
1639   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1640
1641   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1642
1643   g_mutex_lock (&stream->priv->lock);
1644   stream->priv->rtx_pt = rtx_pt;
1645   if (stream->priv->rtxsend) {
1646     guint pt = gst_rtsp_stream_get_pt (stream);
1647     gchar *pt_s = g_strdup_printf ("%d", pt);
1648     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1649         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1650     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1651     g_free (pt_s);
1652     gst_structure_free (rtx_pt_map);
1653   }
1654   g_mutex_unlock (&stream->priv->lock);
1655 }
1656
1657 /**
1658  * gst_rtsp_stream_get_retransmission_pt:
1659  * @stream: a #GstRTSPStream
1660  *
1661  * Get the payload-type used for retransmission of this stream
1662  *
1663  * Returns: The retransmission PT.
1664  */
1665 guint
1666 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1667 {
1668   guint rtx_pt;
1669
1670   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1671
1672   g_mutex_lock (&stream->priv->lock);
1673   rtx_pt = stream->priv->rtx_pt;
1674   g_mutex_unlock (&stream->priv->lock);
1675
1676   return rtx_pt;
1677 }
1678
1679 /**
1680  * gst_rtsp_stream_set_buffer_size:
1681  * @stream: a #GstRTSPStream
1682  * @size: the buffer size
1683  *
1684  * Set the size of the UDP transmission buffer (in bytes)
1685  * Needs to be set before the stream is joined to a bin.
1686  *
1687  * Since: 1.6
1688  */
1689 void
1690 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1691 {
1692   g_mutex_lock (&stream->priv->lock);
1693   stream->priv->buffer_size = size;
1694   g_mutex_unlock (&stream->priv->lock);
1695 }
1696
1697 /**
1698  * gst_rtsp_stream_get_buffer_size:
1699  * @stream: a #GstRTSPStream
1700  *
1701  * Get the size of the UDP transmission buffer (in bytes)
1702  *
1703  * Returns: the size of the UDP TX buffer
1704  *
1705  * Since: 1.6
1706  */
1707 guint
1708 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1709 {
1710   guint buffer_size;
1711
1712   g_mutex_lock (&stream->priv->lock);
1713   buffer_size = stream->priv->buffer_size;
1714   g_mutex_unlock (&stream->priv->lock);
1715
1716   return buffer_size;
1717 }
1718
1719 /* executed from streaming thread */
1720 static void
1721 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1722 {
1723   GstRTSPStreamPrivate *priv = stream->priv;
1724   GstCaps *newcaps, *oldcaps;
1725
1726   newcaps = gst_pad_get_current_caps (pad);
1727
1728   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1729       newcaps);
1730
1731   g_mutex_lock (&priv->lock);
1732   oldcaps = priv->caps;
1733   priv->caps = newcaps;
1734   g_mutex_unlock (&priv->lock);
1735
1736   if (oldcaps)
1737     gst_caps_unref (oldcaps);
1738 }
1739
1740 static void
1741 dump_structure (const GstStructure * s)
1742 {
1743   gchar *sstr;
1744
1745   sstr = gst_structure_to_string (s);
1746   GST_INFO ("structure: %s", sstr);
1747   g_free (sstr);
1748 }
1749
1750 static GstRTSPStreamTransport *
1751 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1752 {
1753   GstRTSPStreamPrivate *priv = stream->priv;
1754   GList *walk;
1755   GstRTSPStreamTransport *result = NULL;
1756   const gchar *tmp;
1757   gchar *dest;
1758   guint port;
1759
1760   if (rtcp_from == NULL)
1761     return NULL;
1762
1763   tmp = g_strrstr (rtcp_from, ":");
1764   if (tmp == NULL)
1765     return NULL;
1766
1767   port = atoi (tmp + 1);
1768   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1769
1770   g_mutex_lock (&priv->lock);
1771   GST_INFO ("finding %s:%d in %d transports", dest, port,
1772       g_list_length (priv->transports));
1773
1774   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1775     GstRTSPStreamTransport *trans = walk->data;
1776     const GstRTSPTransport *tr;
1777     gint min, max;
1778
1779     tr = gst_rtsp_stream_transport_get_transport (trans);
1780
1781     if (priv->client_side) {
1782       /* In client side mode the 'destination' is the RTSP server, so send
1783        * to those ports */
1784       min = tr->server_port.min;
1785       max = tr->server_port.max;
1786     } else {
1787       min = tr->client_port.min;
1788       max = tr->client_port.max;
1789     }
1790
1791     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1792       result = trans;
1793       break;
1794     }
1795   }
1796   if (result)
1797     g_object_ref (result);
1798   g_mutex_unlock (&priv->lock);
1799
1800   g_free (dest);
1801
1802   return result;
1803 }
1804
1805 static GstRTSPStreamTransport *
1806 check_transport (GObject * source, GstRTSPStream * stream)
1807 {
1808   GstStructure *stats;
1809   GstRTSPStreamTransport *trans;
1810
1811   /* see if we have a stream to match with the origin of the RTCP packet */
1812   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1813   if (trans == NULL) {
1814     g_object_get (source, "stats", &stats, NULL);
1815     if (stats) {
1816       const gchar *rtcp_from;
1817
1818       dump_structure (stats);
1819
1820       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1821       if ((trans = find_transport (stream, rtcp_from))) {
1822         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1823             source);
1824         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1825             g_object_unref);
1826       }
1827       gst_structure_free (stats);
1828     }
1829   }
1830   return trans;
1831 }
1832
1833
1834 static void
1835 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1836 {
1837   GstRTSPStreamTransport *trans;
1838
1839   GST_INFO ("%p: new source %p", stream, source);
1840
1841   trans = check_transport (source, stream);
1842
1843   if (trans)
1844     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1845 }
1846
1847 static void
1848 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1849 {
1850   GST_INFO ("%p: new SDES %p", stream, source);
1851 }
1852
1853 static void
1854 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1855 {
1856   GstRTSPStreamTransport *trans;
1857
1858   trans = check_transport (source, stream);
1859
1860   if (trans) {
1861     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1862     gst_rtsp_stream_transport_keep_alive (trans);
1863   }
1864 #ifdef DUMP_STATS
1865   {
1866     GstStructure *stats;
1867     g_object_get (source, "stats", &stats, NULL);
1868     if (stats) {
1869       dump_structure (stats);
1870       gst_structure_free (stats);
1871     }
1872   }
1873 #endif
1874 }
1875
1876 static void
1877 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1878 {
1879   GST_INFO ("%p: source %p bye", stream, source);
1880 }
1881
1882 static void
1883 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1884 {
1885   GstRTSPStreamTransport *trans;
1886
1887   GST_INFO ("%p: source %p bye timeout", stream, source);
1888
1889   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1890     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1891     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1892   }
1893 }
1894
1895 static void
1896 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1897 {
1898   GstRTSPStreamTransport *trans;
1899
1900   GST_INFO ("%p: source %p timeout", stream, source);
1901
1902   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1903     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1904     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1905   }
1906 }
1907
1908 static void
1909 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1910 {
1911   GST_INFO ("%p: new sender source %p", stream, source);
1912 #ifndef DUMP_STATS
1913   {
1914     GstStructure *stats;
1915     g_object_get (source, "stats", &stats, NULL);
1916     if (stats) {
1917       dump_structure (stats);
1918       gst_structure_free (stats);
1919     }
1920   }
1921 #endif
1922 }
1923
1924 static void
1925 on_sender_ssrc_active (GObject * session, GObject * source,
1926     GstRTSPStream * stream)
1927 {
1928 #ifndef DUMP_STATS
1929   {
1930     GstStructure *stats;
1931     g_object_get (source, "stats", &stats, NULL);
1932     if (stats) {
1933       dump_structure (stats);
1934       gst_structure_free (stats);
1935     }
1936   }
1937 #endif
1938 }
1939
1940 static void
1941 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1942 {
1943   if (is_rtp) {
1944     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1945     g_list_free (priv->tr_cache_rtp);
1946     priv->tr_cache_rtp = NULL;
1947   } else {
1948     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1949     g_list_free (priv->tr_cache_rtcp);
1950     priv->tr_cache_rtcp = NULL;
1951   }
1952 }
1953
1954 static GstFlowReturn
1955 handle_new_sample (GstAppSink * sink, gpointer user_data)
1956 {
1957   GstRTSPStreamPrivate *priv;
1958   GList *walk;
1959   GstSample *sample;
1960   GstBuffer *buffer;
1961   GstRTSPStream *stream;
1962   gboolean is_rtp;
1963
1964   sample = gst_app_sink_pull_sample (sink);
1965   if (!sample)
1966     return GST_FLOW_OK;
1967
1968   stream = (GstRTSPStream *) user_data;
1969   priv = stream->priv;
1970   buffer = gst_sample_get_buffer (sample);
1971
1972   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1973
1974   g_mutex_lock (&priv->lock);
1975   if (is_rtp) {
1976     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1977       clear_tr_cache (priv, is_rtp);
1978       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1979         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1980         priv->tr_cache_rtp =
1981             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1982       }
1983       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1984     }
1985   } else {
1986     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1987       clear_tr_cache (priv, is_rtp);
1988       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1989         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1990         priv->tr_cache_rtcp =
1991             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1992       }
1993       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1994     }
1995   }
1996   g_mutex_unlock (&priv->lock);
1997
1998   if (is_rtp) {
1999     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2000       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2001       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2002     }
2003   } else {
2004     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2005       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2006       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2007     }
2008   }
2009   gst_sample_unref (sample);
2010
2011   return GST_FLOW_OK;
2012 }
2013
2014 static GstAppSinkCallbacks sink_cb = {
2015   NULL,                         /* not interested in EOS */
2016   NULL,                         /* not interested in preroll samples */
2017   handle_new_sample,
2018 };
2019
2020 static GstElement *
2021 get_rtp_encoder (GstRTSPStream * stream, guint session)
2022 {
2023   GstRTSPStreamPrivate *priv = stream->priv;
2024
2025   if (priv->srtpenc == NULL) {
2026     gchar *name;
2027
2028     name = g_strdup_printf ("srtpenc_%u", session);
2029     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2030     g_free (name);
2031
2032     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2033   }
2034   return gst_object_ref (priv->srtpenc);
2035 }
2036
2037 static GstElement *
2038 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2039 {
2040   GstRTSPStreamPrivate *priv = stream->priv;
2041   GstElement *oldenc, *enc;
2042   GstPad *pad;
2043   gchar *name;
2044
2045   if (priv->idx != session)
2046     return NULL;
2047
2048   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2049
2050   oldenc = priv->srtpenc;
2051   enc = get_rtp_encoder (stream, session);
2052   name = g_strdup_printf ("rtp_sink_%d", session);
2053   pad = gst_element_get_request_pad (enc, name);
2054   g_free (name);
2055   gst_object_unref (pad);
2056
2057   if (oldenc == NULL)
2058     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2059         enc);
2060
2061   return enc;
2062 }
2063
2064 static GstElement *
2065 request_rtcp_encoder (GstElement * rtpbin, guint session,
2066     GstRTSPStream * stream)
2067 {
2068   GstRTSPStreamPrivate *priv = stream->priv;
2069   GstElement *oldenc, *enc;
2070   GstPad *pad;
2071   gchar *name;
2072
2073   if (priv->idx != session)
2074     return NULL;
2075
2076   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2077
2078   oldenc = priv->srtpenc;
2079   enc = get_rtp_encoder (stream, session);
2080   name = g_strdup_printf ("rtcp_sink_%d", session);
2081   pad = gst_element_get_request_pad (enc, name);
2082   g_free (name);
2083   gst_object_unref (pad);
2084
2085   if (oldenc == NULL)
2086     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2087         enc);
2088
2089   return enc;
2090 }
2091
2092 static GstCaps *
2093 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2094 {
2095   GstRTSPStreamPrivate *priv = stream->priv;
2096   GstCaps *caps;
2097
2098   GST_DEBUG ("request key %08x", ssrc);
2099
2100   g_mutex_lock (&priv->lock);
2101   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2102     gst_caps_ref (caps);
2103   g_mutex_unlock (&priv->lock);
2104
2105   return caps;
2106 }
2107
2108 static GstElement *
2109 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2110     GstRTSPStream * stream)
2111 {
2112   GstRTSPStreamPrivate *priv = stream->priv;
2113
2114   if (priv->idx != session)
2115     return NULL;
2116
2117   if (priv->srtpdec == NULL) {
2118     gchar *name;
2119
2120     name = g_strdup_printf ("srtpdec_%u", session);
2121     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2122     g_free (name);
2123
2124     g_signal_connect (priv->srtpdec, "request-key",
2125         (GCallback) request_key, stream);
2126   }
2127   return gst_object_ref (priv->srtpdec);
2128 }
2129
2130 /**
2131  * gst_rtsp_stream_request_aux_sender:
2132  * @stream: a #GstRTSPStream
2133  * @sessid: the session id
2134  *
2135  * Creating a rtxsend bin
2136  *
2137  * Returns: (transfer full): a #GstElement.
2138  *
2139  * Since: 1.6
2140  */
2141 GstElement *
2142 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2143 {
2144   GstElement *bin;
2145   GstPad *pad;
2146   GstStructure *pt_map;
2147   gchar *name;
2148   guint pt, rtx_pt;
2149   gchar *pt_s;
2150
2151   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2152
2153   pt = gst_rtsp_stream_get_pt (stream);
2154   pt_s = g_strdup_printf ("%u", pt);
2155   rtx_pt = stream->priv->rtx_pt;
2156
2157   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2158
2159   bin = gst_bin_new (NULL);
2160   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2161   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2162       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2163   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2164       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2165   g_free (pt_s);
2166   gst_structure_free (pt_map);
2167   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2168
2169   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2170   name = g_strdup_printf ("src_%u", sessid);
2171   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2172   g_free (name);
2173   gst_object_unref (pad);
2174
2175   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2176   name = g_strdup_printf ("sink_%u", sessid);
2177   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2178   g_free (name);
2179   gst_object_unref (pad);
2180
2181   return bin;
2182 }
2183
2184 /**
2185  * gst_rtsp_stream_set_pt_map:
2186  * @stream: a #GstRTSPStream
2187  * @pt: the pt
2188  * @caps: a #GstCaps
2189  *
2190  * Configure a pt map between @pt and @caps.
2191  */
2192 void
2193 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2194 {
2195   GstRTSPStreamPrivate *priv = stream->priv;
2196
2197   g_mutex_lock (&priv->lock);
2198   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2199   g_mutex_unlock (&priv->lock);
2200 }
2201
2202 /**
2203  * gst_rtsp_stream_set_publish_clock_mode:
2204  * @stream: a #GstRTSPStream
2205  * @mode: the clock publish mode
2206  *
2207  * Sets if and how the stream clock should be published according to RFC7273.
2208  *
2209  * Since: 1.8
2210  */
2211 void
2212 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2213     GstRTSPPublishClockMode mode)
2214 {
2215   GstRTSPStreamPrivate *priv;
2216
2217   priv = stream->priv;
2218   g_mutex_lock (&priv->lock);
2219   priv->publish_clock_mode = mode;
2220   g_mutex_unlock (&priv->lock);
2221 }
2222
2223 /**
2224  * gst_rtsp_stream_get_publish_clock_mode:
2225  * @factory: a #GstRTSPStream
2226  *
2227  * Gets if and how the stream clock should be published according to RFC7273.
2228  *
2229  * Returns: The GstRTSPPublishClockMode
2230  *
2231  * Since: 1.8
2232  */
2233 GstRTSPPublishClockMode
2234 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2235 {
2236   GstRTSPStreamPrivate *priv;
2237   GstRTSPPublishClockMode ret;
2238
2239   priv = stream->priv;
2240   g_mutex_lock (&priv->lock);
2241   ret = priv->publish_clock_mode;
2242   g_mutex_unlock (&priv->lock);
2243
2244   return ret;
2245 }
2246
2247 static GstCaps *
2248 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2249     GstRTSPStream * stream)
2250 {
2251   GstRTSPStreamPrivate *priv = stream->priv;
2252   GstCaps *caps = NULL;
2253
2254   g_mutex_lock (&priv->lock);
2255
2256   if (priv->idx == session) {
2257     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2258     if (caps) {
2259       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2260       gst_caps_ref (caps);
2261     } else {
2262       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2263     }
2264   }
2265
2266   g_mutex_unlock (&priv->lock);
2267
2268   return caps;
2269 }
2270
2271 static void
2272 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2273 {
2274   GstRTSPStreamPrivate *priv = stream->priv;
2275   gchar *name;
2276   GstPadLinkReturn ret;
2277   guint sessid;
2278
2279   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2280       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2281
2282   name = gst_pad_get_name (pad);
2283   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2284     g_free (name);
2285     return;
2286   }
2287   g_free (name);
2288
2289   if (priv->idx != sessid)
2290     return;
2291
2292   if (gst_pad_is_linked (priv->sinkpad)) {
2293     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2294         GST_DEBUG_PAD_NAME (priv->sinkpad));
2295     return;
2296   }
2297
2298   /* link the RTP pad to the session manager, it should not really fail unless
2299    * this is not really an RTP pad */
2300   ret = gst_pad_link (pad, priv->sinkpad);
2301   if (ret != GST_PAD_LINK_OK)
2302     goto link_failed;
2303   priv->recv_rtp_src = gst_object_ref (pad);
2304
2305   return;
2306
2307 /* ERRORS */
2308 link_failed:
2309   {
2310     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2311         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2312   }
2313 }
2314
2315 static void
2316 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2317     GstRTSPStream * stream)
2318 {
2319   /* TODO: What to do here other than this? */
2320   GST_DEBUG ("Stream %p: Got EOS", stream);
2321   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2322 }
2323
2324 static void
2325 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2326     GstElement ** queue_out)
2327 {
2328   GstPad *pad;
2329   GstPad *teepad;
2330   GstPad *queuepad;
2331
2332   gst_bin_add (bin, sink);
2333
2334   *queue_out = gst_element_factory_make ("queue", NULL);
2335   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2336       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2337   gst_bin_add (bin, *queue_out);
2338
2339   /* link tee to queue */
2340   teepad = gst_element_get_request_pad (tee, "src_%u");
2341   pad = gst_element_get_static_pad (*queue_out, "sink");
2342   gst_pad_link (teepad, pad);
2343   gst_object_unref (pad);
2344   gst_object_unref (teepad);
2345
2346   /* link queue to sink */
2347   queuepad = gst_element_get_static_pad (*queue_out, "src");
2348   pad = gst_element_get_static_pad (sink, "sink");
2349   gst_pad_link (queuepad, pad);
2350   gst_object_unref (queuepad);
2351   gst_object_unref (pad);
2352 }
2353
2354 /* must be called with lock */
2355 static void
2356 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2357 {
2358   GstRTSPStreamPrivate *priv;
2359   GstPad *pad;
2360   gboolean is_tcp = FALSE, is_udp = FALSE;
2361   gint i;
2362
2363   priv = stream->priv;
2364
2365   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2366   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2367       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2368
2369   for (i = 0; i < 2; i++) {
2370     /* For the sender we create this bit of pipeline for both
2371      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2372      * we need to add a queue before appsink and udpsink to make
2373      * the pipeline not block. For the TCP case, we want to pump
2374      * client as fast as possible anyway. This pipeline is used
2375      * when both TCP and UDP are present.
2376      *
2377      * .--------.      .-----.    .---------.    .---------.
2378      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2379      * |       send->sink   src->sink      src->sink       |
2380      * '--------'      |     |    '---------'    '---------'
2381      *                 |     |    .---------.    .---------.
2382      *                 |     |    |  queue  |    | appsink |
2383      *                 |    src->sink      src->sink       |
2384      *                 '-----'    '---------'    '---------'
2385      *
2386      * When only UDP or only TCP is allowed, we skip the tee and queue
2387      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2388      * the session.
2389      */
2390
2391     /* Only link the RTP send src if we're going to send RTP, link
2392      * the RTCP send src always */
2393     if (!priv->srcpad && i == 0)
2394       continue;
2395
2396     if (is_tcp) {
2397       /* make appsink */
2398       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2399       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2400       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2401           &sink_cb, stream, NULL);
2402     }
2403
2404     /* If we have udp always use a tee because we could have mcast clients
2405      * requesting different ports, in which case we'll have to plug more
2406      * udpsinks. */
2407     if (is_udp) {
2408       /* make tee for RTP/RTCP */
2409       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2410       gst_bin_add (bin, priv->tee[i]);
2411
2412       /* and link to rtpbin send pad */
2413       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2414       gst_pad_link (priv->send_src[i], pad);
2415       gst_object_unref (pad);
2416
2417       plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2418
2419       if (is_tcp) {
2420         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2421         plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2422       }
2423     } else if (is_tcp) {
2424       /* only appsink needed, link it to the session */
2425       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2426       gst_pad_link (priv->send_src[i], pad);
2427       gst_object_unref (pad);
2428
2429       /* when its only TCP, we need to set sync and preroll to FALSE
2430        * for the sink to avoid deadlock. And this is only needed for
2431        * sink used for RTCP data, not the RTP data. */
2432       if (i == 1)
2433         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2434     }
2435
2436     /* check if we need to set to a special state */
2437     if (state != GST_STATE_NULL) {
2438       if (priv->udpsink[i])
2439         gst_element_set_state (priv->udpsink[i], state);
2440       if (priv->appsink[i])
2441         gst_element_set_state (priv->appsink[i], state);
2442       if (priv->appqueue[i])
2443         gst_element_set_state (priv->appqueue[i], state);
2444       if (priv->udpqueue[i])
2445         gst_element_set_state (priv->udpqueue[i], state);
2446       if (priv->tee[i])
2447         gst_element_set_state (priv->tee[i], state);
2448     }
2449   }
2450 }
2451
2452 /* must be called with lock */
2453 static void
2454 plug_src (GstRTSPStream * stream, GstBin * bin, GstElement * src,
2455     GstElement * funnel)
2456 {
2457   GstRTSPStreamPrivate *priv;
2458   GstPad *pad, *selpad;
2459
2460   priv = stream->priv;
2461
2462   if (priv->srcpad) {
2463     /* we set and keep these to playing so that they don't cause NO_PREROLL return
2464      * values. This is only relevant for PLAY pipelines */
2465     gst_element_set_state (src, GST_STATE_PLAYING);
2466     gst_element_set_locked_state (src, TRUE);
2467   }
2468
2469   /* add src */
2470   gst_bin_add (bin, src);
2471
2472   /* and link to the funnel */
2473   selpad = gst_element_get_request_pad (funnel, "sink_%u");
2474   pad = gst_element_get_static_pad (src, "src");
2475   gst_pad_link (pad, selpad);
2476   gst_object_unref (pad);
2477   gst_object_unref (selpad);
2478 }
2479
2480 /* must be called with lock */
2481 static void
2482 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2483 {
2484   GstRTSPStreamPrivate *priv;
2485   GstPad *pad;
2486   gboolean is_tcp;
2487   gint i;
2488
2489   priv = stream->priv;
2490
2491   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2492
2493   for (i = 0; i < 2; i++) {
2494     /* For the receiver we create this bit of pipeline for both
2495      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2496      * and it is all funneled into the rtpbin receive pad.
2497      *
2498      * .--------.     .--------.    .--------.
2499      * | udpsrc |     | funnel |    | rtpbin |
2500      * |       src->sink      src->sink      |
2501      * '--------'     |        |    '--------'
2502      * .--------.     |        |
2503      * | appsrc |     |        |
2504      * |       src->sink       |
2505      * '--------'     '--------'
2506      */
2507
2508     if (!priv->sinkpad && i == 0) {
2509       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2510        * RTCP sink always */
2511       continue;
2512     }
2513
2514     /* make funnel for the RTP/RTCP receivers */
2515     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2516     gst_bin_add (bin, priv->funnel[i]);
2517
2518     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2519     gst_pad_link (pad, priv->recv_sink[i]);
2520     gst_object_unref (pad);
2521
2522     if (priv->udpsrc_v4[i])
2523       plug_src (stream, bin, priv->udpsrc_v4[i], priv->funnel[i]);
2524
2525     if (priv->udpsrc_v6[i])
2526       plug_src (stream, bin, priv->udpsrc_v6[i], priv->funnel[i]);
2527
2528     if (is_tcp) {
2529       /* make and add appsrc */
2530       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2531       priv->appsrc_base_time[i] = -1;
2532       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2533           TRUE, NULL);
2534       plug_src (stream, bin, priv->appsrc[i], priv->funnel[i]);
2535     }
2536
2537     /* check if we need to set to a special state */
2538     if (state != GST_STATE_NULL) {
2539       gst_element_set_state (priv->funnel[i], state);
2540     }
2541   }
2542 }
2543
2544 static gboolean
2545 create_mcast_part_for_transport (GstRTSPStream * stream,
2546     const GstRTSPTransport * tr)
2547 {
2548   GstRTSPStreamPrivate *priv = stream->priv;
2549   GInetAddress *inetaddr;
2550   GSocketFamily family;
2551   GstRTSPAddress *mcast_addr;
2552   GstElement **mcast_udpsrc;
2553   GSocket *rtp_socket = NULL;
2554   GSocket *rtcp_socket = NULL;
2555   GSocketAddress *rtp_sockaddr = NULL;
2556   GSocketAddress *rtcp_sockaddr = NULL;
2557   GError *error = NULL;
2558   const gchar *multicast_iface = priv->multicast_iface;
2559
2560   /* Check if it's a ipv4 or ipv6 transport */
2561   inetaddr = g_inet_address_new_from_string (tr->destination);
2562   family = g_inet_address_get_family (inetaddr);
2563   g_object_unref (inetaddr);
2564
2565   /* Select fields corresponding to the family */
2566   if (family == G_SOCKET_FAMILY_IPV4) {
2567     mcast_addr = priv->mcast_addr_v4;
2568     mcast_udpsrc = priv->mcast_udpsrc_v4;
2569   } else {
2570     mcast_addr = priv->mcast_addr_v6;
2571     mcast_udpsrc = priv->mcast_udpsrc_v6;
2572   }
2573
2574   /* We support only one mcast group per family, make sure this transport
2575    * matches it. */
2576   if (!mcast_addr)
2577     goto no_addr;
2578
2579   if (!g_str_equal (tr->destination, mcast_addr->address) ||
2580       tr->port.min != mcast_addr->port ||
2581       tr->port.max != mcast_addr->port + mcast_addr->n_ports - 1 ||
2582       tr->ttl != mcast_addr->ttl)
2583     goto wrong_addr;
2584
2585   if (mcast_udpsrc[0]) {
2586     /* We already created elements for this family. Since we support only one
2587      * mcast group per family, there is nothing more to do here. */
2588     g_assert (mcast_udpsrc[1]);
2589     g_assert (priv->mcast_udpqueue[0]);
2590     g_assert (priv->mcast_udpqueue[1]);
2591     g_assert (priv->mcast_udpsink[0]);
2592     g_assert (priv->mcast_udpsink[1]);
2593     return TRUE;
2594   }
2595
2596   g_assert (!mcast_udpsrc[1]);
2597
2598   /* Create RTP/RTCP sockets and bind them on ANY with mcast ports */
2599   rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
2600       G_SOCKET_PROTOCOL_UDP, &error);
2601   if (!rtp_socket)
2602     goto socket_error;
2603   g_socket_set_multicast_loopback (rtp_socket, FALSE);
2604
2605   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
2606       G_SOCKET_PROTOCOL_UDP, &error);
2607   if (!rtcp_socket)
2608     goto socket_error;
2609   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
2610
2611   inetaddr = g_inet_address_new_any (family);
2612   rtp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port);
2613   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, mcast_addr->port + 1);
2614   g_object_unref (inetaddr);
2615
2616   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, &error))
2617     goto socket_error;
2618
2619   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, &error))
2620     goto socket_error;
2621
2622   g_object_unref (rtp_sockaddr);
2623   g_object_unref (rtcp_sockaddr);
2624
2625   /* Add receiver part */
2626   create_and_configure_udpsources (mcast_udpsrc, rtp_socket, rtcp_socket);
2627   if (priv->sinkpad) {
2628     plug_src (stream, priv->joined_bin, mcast_udpsrc[0], priv->funnel[0]);
2629     gst_element_sync_state_with_parent (mcast_udpsrc[0]);
2630   }
2631   plug_src (stream, priv->joined_bin, mcast_udpsrc[1], priv->funnel[1]);
2632   gst_element_sync_state_with_parent (mcast_udpsrc[1]);
2633
2634   /* Add sender part, could already have been created for the other family. */
2635   if (!priv->mcast_udpsink[0]) {
2636     g_assert (!priv->mcast_udpsink[1]);
2637     g_assert (!priv->mcast_udpqueue[0]);
2638     g_assert (!priv->mcast_udpqueue[1]);
2639
2640     create_and_configure_udpsinks (stream, priv->mcast_udpsink);
2641
2642     g_object_set (G_OBJECT (priv->mcast_udpsink[0]), "multicast-iface",
2643         multicast_iface, NULL);
2644     g_object_set (G_OBJECT (priv->mcast_udpsink[1]), "multicast-iface",
2645         multicast_iface, NULL);
2646
2647     g_signal_emit_by_name (priv->mcast_udpsink[0], "add", mcast_addr->address,
2648         mcast_addr->port, NULL);
2649     g_signal_emit_by_name (priv->mcast_udpsink[1], "add", mcast_addr->address,
2650         mcast_addr->port + 1, NULL);
2651
2652     set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
2653         family);
2654
2655     if (priv->srcpad) {
2656       plug_sink (priv->joined_bin, priv->tee[0], priv->mcast_udpsink[0],
2657           &priv->mcast_udpqueue[0]);
2658       gst_element_sync_state_with_parent (priv->mcast_udpsink[0]);
2659       gst_element_sync_state_with_parent (priv->mcast_udpqueue[0]);
2660     }
2661     plug_sink (priv->joined_bin, priv->tee[1], priv->mcast_udpsink[1],
2662         &priv->mcast_udpqueue[1]);
2663     gst_element_sync_state_with_parent (priv->mcast_udpsink[1]);
2664     gst_element_sync_state_with_parent (priv->mcast_udpqueue[1]);
2665   } else {
2666     g_assert (priv->mcast_udpsink[1]);
2667     g_assert (priv->mcast_udpqueue[0]);
2668     g_assert (priv->mcast_udpqueue[1]);
2669
2670     set_sockets_for_udpsinks (priv->mcast_udpsink, rtp_socket, rtcp_socket,
2671         family);
2672   }
2673
2674   return TRUE;
2675
2676 no_addr:
2677   {
2678     GST_WARNING_OBJECT (stream, "Adding mcast transport, but no mcast address "
2679         "has been reserved");
2680     return FALSE;
2681   }
2682 wrong_addr:
2683   {
2684     GST_WARNING_OBJECT (stream, "Adding mcast transport, but it doesn't match "
2685         "the reserved address");
2686     return FALSE;
2687   }
2688 socket_error:
2689   {
2690     GST_ERROR_OBJECT (stream, "Error creating and binding mcast socket: %s",
2691         error->message);
2692     g_clear_object (&rtp_socket);
2693     g_clear_object (&rtcp_socket);
2694     g_clear_object (&rtp_sockaddr);
2695     g_clear_object (&rtcp_sockaddr);
2696     g_clear_error (&error);
2697     return FALSE;
2698   }
2699 }
2700
2701 /**
2702  * gst_rtsp_stream_join_bin:
2703  * @stream: a #GstRTSPStream
2704  * @bin: (transfer none): a #GstBin to join
2705  * @rtpbin: (transfer none): a rtpbin element in @bin
2706  * @state: the target state of the new elements
2707  *
2708  * Join the #GstBin @bin that contains the element @rtpbin.
2709  *
2710  * @stream will link to @rtpbin, which must be inside @bin. The elements
2711  * added to @bin will be set to the state given in @state.
2712  *
2713  * Returns: %TRUE on success.
2714  */
2715 gboolean
2716 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2717     GstElement * rtpbin, GstState state)
2718 {
2719   GstRTSPStreamPrivate *priv;
2720   guint idx;
2721   gchar *name;
2722   GstPadLinkReturn ret;
2723
2724   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2725   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2726   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2727
2728   priv = stream->priv;
2729
2730   g_mutex_lock (&priv->lock);
2731   if (priv->joined_bin != NULL)
2732     goto was_joined;
2733
2734   /* create a session with the same index as the stream */
2735   idx = priv->idx;
2736
2737   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2738
2739   if (!alloc_ports (stream))
2740     goto no_ports;
2741
2742   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2743       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2744     /* For SRTP */
2745     g_signal_connect (rtpbin, "request-rtp-encoder",
2746         (GCallback) request_rtp_encoder, stream);
2747     g_signal_connect (rtpbin, "request-rtcp-encoder",
2748         (GCallback) request_rtcp_encoder, stream);
2749     g_signal_connect (rtpbin, "request-rtp-decoder",
2750         (GCallback) request_rtp_rtcp_decoder, stream);
2751     g_signal_connect (rtpbin, "request-rtcp-decoder",
2752         (GCallback) request_rtp_rtcp_decoder, stream);
2753   }
2754
2755   if (priv->sinkpad) {
2756     g_signal_connect (rtpbin, "request-pt-map",
2757         (GCallback) request_pt_map, stream);
2758   }
2759
2760   /* get pads from the RTP session element for sending and receiving
2761    * RTP/RTCP*/
2762   if (priv->srcpad) {
2763     /* get a pad for sending RTP */
2764     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2765     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2766     g_free (name);
2767
2768     /* link the RTP pad to the session manager, it should not really fail unless
2769      * this is not really an RTP pad */
2770     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2771     if (ret != GST_PAD_LINK_OK)
2772       goto link_failed;
2773
2774     name = g_strdup_printf ("send_rtp_src_%u", idx);
2775     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2776     g_free (name);
2777   } else {
2778     /* Need to connect our sinkpad from here */
2779     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2780     /* EOS */
2781     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2782
2783     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2784     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2785     g_free (name);
2786   }
2787
2788   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2789   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2790   g_free (name);
2791   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2792   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2793   g_free (name);
2794
2795   /* get the session */
2796   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2797
2798   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2799       stream);
2800   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2801       stream);
2802   g_signal_connect (priv->session, "on-ssrc-active",
2803       (GCallback) on_ssrc_active, stream);
2804   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2805       stream);
2806   g_signal_connect (priv->session, "on-bye-timeout",
2807       (GCallback) on_bye_timeout, stream);
2808   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2809       stream);
2810
2811   /* signal for sender ssrc */
2812   g_signal_connect (priv->session, "on-new-sender-ssrc",
2813       (GCallback) on_new_sender_ssrc, stream);
2814   g_signal_connect (priv->session, "on-sender-ssrc-active",
2815       (GCallback) on_sender_ssrc_active, stream);
2816
2817   create_sender_part (stream, bin, state);
2818   create_receiver_part (stream, bin, state);
2819
2820   if (priv->srcpad) {
2821     /* be notified of caps changes */
2822     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2823         (GCallback) caps_notify, stream);
2824   }
2825
2826   priv->joined_bin = gst_object_ref (bin);
2827   g_mutex_unlock (&priv->lock);
2828
2829   return TRUE;
2830
2831   /* ERRORS */
2832 was_joined:
2833   {
2834     g_mutex_unlock (&priv->lock);
2835     return TRUE;
2836   }
2837 no_ports:
2838   {
2839     g_mutex_unlock (&priv->lock);
2840     GST_WARNING ("failed to allocate ports %u", idx);
2841     return FALSE;
2842   }
2843 link_failed:
2844   {
2845     GST_WARNING ("failed to link stream %u", idx);
2846     gst_object_unref (priv->send_rtp_sink);
2847     priv->send_rtp_sink = NULL;
2848     g_mutex_unlock (&priv->lock);
2849     return FALSE;
2850   }
2851 }
2852
2853 static void
2854 clear_element (GstBin * bin, GstElement ** elementptr)
2855 {
2856   if (*elementptr) {
2857     gst_element_set_locked_state (*elementptr, FALSE);
2858     gst_element_set_state (*elementptr, GST_STATE_NULL);
2859     if (GST_ELEMENT_PARENT (*elementptr))
2860       gst_bin_remove (bin, *elementptr);
2861     else
2862       gst_object_unref (*elementptr);
2863     *elementptr = NULL;
2864   }
2865 }
2866
2867 /**
2868  * gst_rtsp_stream_leave_bin:
2869  * @stream: a #GstRTSPStream
2870  * @bin: (transfer none): a #GstBin
2871  * @rtpbin: (transfer none): a rtpbin #GstElement
2872  *
2873  * Remove the elements of @stream from @bin.
2874  *
2875  * Return: %TRUE on success.
2876  */
2877 gboolean
2878 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2879     GstElement * rtpbin)
2880 {
2881   GstRTSPStreamPrivate *priv;
2882   gint i;
2883
2884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2885   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2886   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2887
2888   priv = stream->priv;
2889
2890   g_mutex_lock (&priv->lock);
2891   if (priv->joined_bin == NULL)
2892     goto was_not_joined;
2893   if (priv->joined_bin != bin)
2894     goto wrong_bin;
2895
2896   priv->joined_bin = NULL;
2897
2898   /* all transports must be removed by now */
2899   if (priv->transports != NULL)
2900     goto transports_not_removed;
2901
2902   clear_tr_cache (priv, TRUE);
2903   clear_tr_cache (priv, FALSE);
2904
2905   GST_INFO ("stream %p leaving bin", stream);
2906
2907   if (priv->srcpad) {
2908     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2909
2910     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2911     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2912     gst_object_unref (priv->send_rtp_sink);
2913     priv->send_rtp_sink = NULL;
2914   } else if (priv->recv_rtp_src) {
2915     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2916     gst_object_unref (priv->recv_rtp_src);
2917     priv->recv_rtp_src = NULL;
2918   }
2919
2920   for (i = 0; i < 2; i++) {
2921     clear_element (bin, &priv->udpsrc_v4[i]);
2922     clear_element (bin, &priv->udpsrc_v6[i]);
2923     clear_element (bin, &priv->udpqueue[i]);
2924     clear_element (bin, &priv->udpsink[i]);
2925
2926     clear_element (bin, &priv->mcast_udpsrc_v4[i]);
2927     clear_element (bin, &priv->mcast_udpsrc_v6[i]);
2928     clear_element (bin, &priv->mcast_udpqueue[i]);
2929     clear_element (bin, &priv->mcast_udpsink[i]);
2930
2931     clear_element (bin, &priv->appsrc[i]);
2932     clear_element (bin, &priv->appqueue[i]);
2933     clear_element (bin, &priv->appsink[i]);
2934
2935     clear_element (bin, &priv->tee[i]);
2936     clear_element (bin, &priv->funnel[i]);
2937
2938     if (priv->sinkpad || i == 1) {
2939       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2940       gst_object_unref (priv->recv_sink[i]);
2941       priv->recv_sink[i] = NULL;
2942     }
2943   }
2944
2945   if (priv->srcpad) {
2946     gst_object_unref (priv->send_src[0]);
2947     priv->send_src[0] = NULL;
2948   }
2949
2950   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2951   gst_object_unref (priv->send_src[1]);
2952   priv->send_src[1] = NULL;
2953
2954   g_object_unref (priv->session);
2955   priv->session = NULL;
2956   if (priv->caps)
2957     gst_caps_unref (priv->caps);
2958   priv->caps = NULL;
2959
2960   if (priv->srtpenc)
2961     gst_object_unref (priv->srtpenc);
2962   if (priv->srtpdec)
2963     gst_object_unref (priv->srtpdec);
2964
2965   g_clear_object (&priv->joined_bin);
2966   g_mutex_unlock (&priv->lock);
2967
2968   return TRUE;
2969
2970 was_not_joined:
2971   {
2972     g_mutex_unlock (&priv->lock);
2973     return TRUE;
2974   }
2975 transports_not_removed:
2976   {
2977     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2978     g_mutex_unlock (&priv->lock);
2979     return FALSE;
2980   }
2981 wrong_bin:
2982   {
2983     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
2984     g_mutex_unlock (&priv->lock);
2985     return FALSE;
2986   }
2987 }
2988
2989 /**
2990  * gst_rtsp_stream_get_joined_bin:
2991  * @stream: a #GstRTSPStream
2992  *
2993  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
2994  *
2995  * Return: (transfer full): the joined bin or NULL.
2996  */
2997 GstBin *
2998 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
2999 {
3000   GstRTSPStreamPrivate *priv;
3001   GstBin *bin = NULL;
3002
3003   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3004
3005   priv = stream->priv;
3006
3007   g_mutex_lock (&priv->lock);
3008   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3009   g_mutex_unlock (&priv->lock);
3010
3011   return bin;
3012 }
3013
3014 /**
3015  * gst_rtsp_stream_get_rtpinfo:
3016  * @stream: a #GstRTSPStream
3017  * @rtptime: (allow-none): result RTP timestamp
3018  * @seq: (allow-none): result RTP seqnum
3019  * @clock_rate: (allow-none): the clock rate
3020  * @running_time: (allow-none): result running-time
3021  *
3022  * Retrieve the current rtptime, seq and running-time. This is used to
3023  * construct a RTPInfo reply header.
3024  *
3025  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3026  */
3027 gboolean
3028 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3029     guint * rtptime, guint * seq, guint * clock_rate,
3030     GstClockTime * running_time)
3031 {
3032   GstRTSPStreamPrivate *priv;
3033   GstStructure *stats;
3034   GObjectClass *payobjclass;
3035
3036   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3037
3038   priv = stream->priv;
3039
3040   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3041
3042   g_mutex_lock (&priv->lock);
3043
3044   /* First try to extract the information from the last buffer on the sinks.
3045    * This will have a more accurate sequence number and timestamp, as between
3046    * the payloader and the sink there can be some queues
3047    */
3048   if (priv->udpsink[0] || priv->appsink[0]) {
3049     GstSample *last_sample;
3050
3051     if (priv->udpsink[0])
3052       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3053     else
3054       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3055
3056     if (last_sample) {
3057       GstCaps *caps;
3058       GstBuffer *buffer;
3059       GstSegment *segment;
3060       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3061
3062       caps = gst_sample_get_caps (last_sample);
3063       buffer = gst_sample_get_buffer (last_sample);
3064       segment = gst_sample_get_segment (last_sample);
3065
3066       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3067         if (seq) {
3068           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3069         }
3070
3071         if (rtptime) {
3072           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3073         }
3074
3075         gst_rtp_buffer_unmap (&rtp_buffer);
3076
3077         if (running_time) {
3078           *running_time =
3079               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3080               GST_BUFFER_TIMESTAMP (buffer));
3081         }
3082
3083         if (clock_rate) {
3084           GstStructure *s = gst_caps_get_structure (caps, 0);
3085
3086           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3087
3088           if (*clock_rate == 0 && running_time)
3089             *running_time = GST_CLOCK_TIME_NONE;
3090         }
3091         gst_sample_unref (last_sample);
3092
3093         goto done;
3094       } else {
3095         gst_sample_unref (last_sample);
3096       }
3097     }
3098   }
3099
3100   if (g_object_class_find_property (payobjclass, "stats")) {
3101     g_object_get (priv->payloader, "stats", &stats, NULL);
3102     if (stats == NULL)
3103       goto no_stats;
3104
3105     if (seq)
3106       gst_structure_get_uint (stats, "seqnum", seq);
3107
3108     if (rtptime)
3109       gst_structure_get_uint (stats, "timestamp", rtptime);
3110
3111     if (running_time)
3112       gst_structure_get_clock_time (stats, "running-time", running_time);
3113
3114     if (clock_rate) {
3115       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3116       if (*clock_rate == 0 && running_time)
3117         *running_time = GST_CLOCK_TIME_NONE;
3118     }
3119     gst_structure_free (stats);
3120   } else {
3121     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3122         !g_object_class_find_property (payobjclass, "timestamp"))
3123       goto no_stats;
3124
3125     if (seq)
3126       g_object_get (priv->payloader, "seqnum", seq, NULL);
3127
3128     if (rtptime)
3129       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3130
3131     if (running_time)
3132       *running_time = GST_CLOCK_TIME_NONE;
3133   }
3134
3135 done:
3136   g_mutex_unlock (&priv->lock);
3137
3138   return TRUE;
3139
3140   /* ERRORS */
3141 no_stats:
3142   {
3143     GST_WARNING ("Could not get payloader stats");
3144     g_mutex_unlock (&priv->lock);
3145     return FALSE;
3146   }
3147 }
3148
3149 /**
3150  * gst_rtsp_stream_get_caps:
3151  * @stream: a #GstRTSPStream
3152  *
3153  * Retrieve the current caps of @stream.
3154  *
3155  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3156  * after usage.
3157  */
3158 GstCaps *
3159 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3160 {
3161   GstRTSPStreamPrivate *priv;
3162   GstCaps *result;
3163
3164   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3165
3166   priv = stream->priv;
3167
3168   g_mutex_lock (&priv->lock);
3169   if ((result = priv->caps))
3170     gst_caps_ref (result);
3171   g_mutex_unlock (&priv->lock);
3172
3173   return result;
3174 }
3175
3176 /**
3177  * gst_rtsp_stream_recv_rtp:
3178  * @stream: a #GstRTSPStream
3179  * @buffer: (transfer full): a #GstBuffer
3180  *
3181  * Handle an RTP buffer for the stream. This method is usually called when a
3182  * message has been received from a client using the TCP transport.
3183  *
3184  * This function takes ownership of @buffer.
3185  *
3186  * Returns: a GstFlowReturn.
3187  */
3188 GstFlowReturn
3189 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3190 {
3191   GstRTSPStreamPrivate *priv;
3192   GstFlowReturn ret;
3193   GstElement *element;
3194
3195   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3196   priv = stream->priv;
3197   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3198   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3199
3200   g_mutex_lock (&priv->lock);
3201   if (priv->appsrc[0])
3202     element = gst_object_ref (priv->appsrc[0]);
3203   else
3204     element = NULL;
3205   g_mutex_unlock (&priv->lock);
3206
3207   if (element) {
3208     if (priv->appsrc_base_time[0] == -1) {
3209       /* Take current running_time. This timestamp will be put on
3210        * the first buffer of each stream because we are a live source and so we
3211        * timestamp with the running_time. When we are dealing with TCP, we also
3212        * only timestamp the first buffer (using the DISCONT flag) because a server
3213        * typically bursts data, for which we don't want to compensate by speeding
3214        * up the media. The other timestamps will be interpollated from this one
3215        * using the RTP timestamps. */
3216       GST_OBJECT_LOCK (element);
3217       if (GST_ELEMENT_CLOCK (element)) {
3218         GstClockTime now;
3219         GstClockTime base_time;
3220
3221         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3222         base_time = GST_ELEMENT_CAST (element)->base_time;
3223
3224         priv->appsrc_base_time[0] = now - base_time;
3225         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3226         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3227             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3228             GST_TIME_ARGS (base_time));
3229       }
3230       GST_OBJECT_UNLOCK (element);
3231     }
3232
3233     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3234     gst_object_unref (element);
3235   } else {
3236     ret = GST_FLOW_OK;
3237   }
3238   return ret;
3239 }
3240
3241 /**
3242  * gst_rtsp_stream_recv_rtcp:
3243  * @stream: a #GstRTSPStream
3244  * @buffer: (transfer full): a #GstBuffer
3245  *
3246  * Handle an RTCP buffer for the stream. This method is usually called when a
3247  * message has been received from a client using the TCP transport.
3248  *
3249  * This function takes ownership of @buffer.
3250  *
3251  * Returns: a GstFlowReturn.
3252  */
3253 GstFlowReturn
3254 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3255 {
3256   GstRTSPStreamPrivate *priv;
3257   GstFlowReturn ret;
3258   GstElement *element;
3259
3260   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3261   priv = stream->priv;
3262   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3263
3264   if (priv->joined_bin == NULL) {
3265     gst_buffer_unref (buffer);
3266     return GST_FLOW_NOT_LINKED;
3267   }
3268   g_mutex_lock (&priv->lock);
3269   if (priv->appsrc[1])
3270     element = gst_object_ref (priv->appsrc[1]);
3271   else
3272     element = NULL;
3273   g_mutex_unlock (&priv->lock);
3274
3275   if (element) {
3276     if (priv->appsrc_base_time[1] == -1) {
3277       /* Take current running_time. This timestamp will be put on
3278        * the first buffer of each stream because we are a live source and so we
3279        * timestamp with the running_time. When we are dealing with TCP, we also
3280        * only timestamp the first buffer (using the DISCONT flag) because a server
3281        * typically bursts data, for which we don't want to compensate by speeding
3282        * up the media. The other timestamps will be interpollated from this one
3283        * using the RTP timestamps. */
3284       GST_OBJECT_LOCK (element);
3285       if (GST_ELEMENT_CLOCK (element)) {
3286         GstClockTime now;
3287         GstClockTime base_time;
3288
3289         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3290         base_time = GST_ELEMENT_CAST (element)->base_time;
3291
3292         priv->appsrc_base_time[1] = now - base_time;
3293         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3294         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3295             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3296             GST_TIME_ARGS (base_time));
3297       }
3298       GST_OBJECT_UNLOCK (element);
3299     }
3300
3301     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3302     gst_object_unref (element);
3303   } else {
3304     ret = GST_FLOW_OK;
3305     gst_buffer_unref (buffer);
3306   }
3307   return ret;
3308 }
3309
3310 /* must be called with lock */
3311 static gboolean
3312 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3313     gboolean add)
3314 {
3315   GstRTSPStreamPrivate *priv = stream->priv;
3316   const GstRTSPTransport *tr;
3317
3318   tr = gst_rtsp_stream_transport_get_transport (trans);
3319
3320   switch (tr->lower_transport) {
3321     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3322     {
3323       if (add) {
3324         if (!create_mcast_part_for_transport (stream, tr))
3325           goto mcast_error;
3326         priv->transports = g_list_prepend (priv->transports, trans);
3327       } else {
3328         priv->transports = g_list_remove (priv->transports, trans);
3329         /* FIXME: Check if there are remaining mcast transports, and destroy
3330          * mcast part if its now unused */
3331       }
3332       break;
3333     }
3334     case GST_RTSP_LOWER_TRANS_UDP:
3335     {
3336       gchar *dest;
3337       gint min, max;
3338       guint ttl = 0;
3339
3340       dest = tr->destination;
3341       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3342         min = tr->port.min;
3343         max = tr->port.max;
3344         ttl = tr->ttl;
3345       } else if (priv->client_side) {
3346         /* In client side mode the 'destination' is the RTSP server, so send
3347          * to those ports */
3348         min = tr->server_port.min;
3349         max = tr->server_port.max;
3350       } else {
3351         min = tr->client_port.min;
3352         max = tr->client_port.max;
3353       }
3354
3355       if (add) {
3356         if (ttl > 0) {
3357           GST_INFO ("setting ttl-mc %d", ttl);
3358           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3359           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3360         }
3361         GST_INFO ("adding %s:%d-%d", dest, min, max);
3362         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3363         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3364         priv->transports = g_list_prepend (priv->transports, trans);
3365       } else {
3366         GST_INFO ("removing %s:%d-%d", dest, min, max);
3367         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3368         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3369         priv->transports = g_list_remove (priv->transports, trans);
3370       }
3371       priv->transports_cookie++;
3372       break;
3373     }
3374     case GST_RTSP_LOWER_TRANS_TCP:
3375       if (add) {
3376         GST_INFO ("adding TCP %s", tr->destination);
3377         priv->transports = g_list_prepend (priv->transports, trans);
3378       } else {
3379         GST_INFO ("removing TCP %s", tr->destination);
3380         priv->transports = g_list_remove (priv->transports, trans);
3381       }
3382       priv->transports_cookie++;
3383       break;
3384     default:
3385       goto unknown_transport;
3386   }
3387   return TRUE;
3388
3389   /* ERRORS */
3390 unknown_transport:
3391   {
3392     GST_INFO ("Unknown transport %d", tr->lower_transport);
3393     return FALSE;
3394   }
3395 mcast_error:
3396   {
3397     return FALSE;
3398   }
3399 }
3400
3401
3402 /**
3403  * gst_rtsp_stream_add_transport:
3404  * @stream: a #GstRTSPStream
3405  * @trans: (transfer none): a #GstRTSPStreamTransport
3406  *
3407  * Add the transport in @trans to @stream. The media of @stream will
3408  * then also be send to the values configured in @trans.
3409  *
3410  * @stream must be joined to a bin.
3411  *
3412  * @trans must contain a valid #GstRTSPTransport.
3413  *
3414  * Returns: %TRUE if @trans was added
3415  */
3416 gboolean
3417 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3418     GstRTSPStreamTransport * trans)
3419 {
3420   GstRTSPStreamPrivate *priv;
3421   gboolean res;
3422
3423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3424   priv = stream->priv;
3425   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3426   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3427
3428   g_mutex_lock (&priv->lock);
3429   res = update_transport (stream, trans, TRUE);
3430   g_mutex_unlock (&priv->lock);
3431
3432   return res;
3433 }
3434
3435 /**
3436  * gst_rtsp_stream_remove_transport:
3437  * @stream: a #GstRTSPStream
3438  * @trans: (transfer none): a #GstRTSPStreamTransport
3439  *
3440  * Remove the transport in @trans from @stream. The media of @stream will
3441  * not be sent to the values configured in @trans.
3442  *
3443  * @stream must be joined to a bin.
3444  *
3445  * @trans must contain a valid #GstRTSPTransport.
3446  *
3447  * Returns: %TRUE if @trans was removed
3448  */
3449 gboolean
3450 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3451     GstRTSPStreamTransport * trans)
3452 {
3453   GstRTSPStreamPrivate *priv;
3454   gboolean res;
3455
3456   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3457   priv = stream->priv;
3458   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3459   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3460
3461   g_mutex_lock (&priv->lock);
3462   res = update_transport (stream, trans, FALSE);
3463   g_mutex_unlock (&priv->lock);
3464
3465   return res;
3466 }
3467
3468 /**
3469  * gst_rtsp_stream_update_crypto:
3470  * @stream: a #GstRTSPStream
3471  * @ssrc: the SSRC
3472  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3473  *
3474  * Update the new crypto information for @ssrc in @stream. If information
3475  * for @ssrc did not exist, it will be added. If information
3476  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3477  * be removed from @stream.
3478  *
3479  * Returns: %TRUE if @crypto could be updated
3480  */
3481 gboolean
3482 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3483     guint ssrc, GstCaps * crypto)
3484 {
3485   GstRTSPStreamPrivate *priv;
3486
3487   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3488   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3489
3490   priv = stream->priv;
3491
3492   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3493
3494   g_mutex_lock (&priv->lock);
3495   if (crypto)
3496     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3497         gst_caps_ref (crypto));
3498   else
3499     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3500   g_mutex_unlock (&priv->lock);
3501
3502   return TRUE;
3503 }
3504
3505 /**
3506  * gst_rtsp_stream_get_rtp_socket:
3507  * @stream: a #GstRTSPStream
3508  * @family: the socket family
3509  *
3510  * Get the RTP socket from @stream for a @family.
3511  *
3512  * @stream must be joined to a bin.
3513  *
3514  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3515  * socket could be allocated for @family. Unref after usage
3516  */
3517 GSocket *
3518 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3519 {
3520   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3521   GSocket *socket;
3522   const gchar *name;
3523
3524   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3525   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3526       family == G_SOCKET_FAMILY_IPV6, NULL);
3527   g_return_val_if_fail (priv->udpsink[0], NULL);
3528
3529   if (family == G_SOCKET_FAMILY_IPV6)
3530     name = "socket-v6";
3531   else
3532     name = "socket";
3533
3534   g_object_get (priv->udpsink[0], name, &socket, NULL);
3535
3536   return socket;
3537 }
3538
3539 /**
3540  * gst_rtsp_stream_get_rtcp_socket:
3541  * @stream: a #GstRTSPStream
3542  * @family: the socket family
3543  *
3544  * Get the RTCP socket from @stream for a @family.
3545  *
3546  * @stream must be joined to a bin.
3547  *
3548  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3549  * socket could be allocated for @family. Unref after usage
3550  */
3551 GSocket *
3552 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3553 {
3554   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3555   GSocket *socket;
3556   const gchar *name;
3557
3558   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3559   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3560       family == G_SOCKET_FAMILY_IPV6, NULL);
3561   g_return_val_if_fail (priv->udpsink[1], NULL);
3562
3563   if (family == G_SOCKET_FAMILY_IPV6)
3564     name = "socket-v6";
3565   else
3566     name = "socket";
3567
3568   g_object_get (priv->udpsink[1], name, &socket, NULL);
3569
3570   return socket;
3571 }
3572
3573 /**
3574  * gst_rtsp_stream_set_seqnum:
3575  * @stream: a #GstRTSPStream
3576  * @seqnum: a new sequence number
3577  *
3578  * Configure the sequence number in the payloader of @stream to @seqnum.
3579  */
3580 void
3581 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3582 {
3583   GstRTSPStreamPrivate *priv;
3584
3585   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3586
3587   priv = stream->priv;
3588
3589   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3590 }
3591
3592 /**
3593  * gst_rtsp_stream_get_seqnum:
3594  * @stream: a #GstRTSPStream
3595  *
3596  * Get the configured sequence number in the payloader of @stream.
3597  *
3598  * Returns: the sequence number of the payloader.
3599  */
3600 guint16
3601 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3602 {
3603   GstRTSPStreamPrivate *priv;
3604   guint seqnum;
3605
3606   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3607
3608   priv = stream->priv;
3609
3610   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3611
3612   return seqnum;
3613 }
3614
3615 /**
3616  * gst_rtsp_stream_transport_filter:
3617  * @stream: a #GstRTSPStream
3618  * @func: (scope call) (allow-none): a callback
3619  * @user_data: (closure): user data passed to @func
3620  *
3621  * Call @func for each transport managed by @stream. The result value of @func
3622  * determines what happens to the transport. @func will be called with @stream
3623  * locked so no further actions on @stream can be performed from @func.
3624  *
3625  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3626  * @stream.
3627  *
3628  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3629  *
3630  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3631  * will also be added with an additional ref to the result #GList of this
3632  * function..
3633  *
3634  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3635  *
3636  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3637  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3638  * element in the #GList should be unreffed before the list is freed.
3639  */
3640 GList *
3641 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3642     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3643 {
3644   GstRTSPStreamPrivate *priv;
3645   GList *result, *walk, *next;
3646   GHashTable *visited = NULL;
3647   guint cookie;
3648
3649   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3650
3651   priv = stream->priv;
3652
3653   result = NULL;
3654   if (func)
3655     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3656
3657   g_mutex_lock (&priv->lock);
3658 restart:
3659   cookie = priv->transports_cookie;
3660   for (walk = priv->transports; walk; walk = next) {
3661     GstRTSPStreamTransport *trans = walk->data;
3662     GstRTSPFilterResult res;
3663     gboolean changed;
3664
3665     next = g_list_next (walk);
3666
3667     if (func) {
3668       /* only visit each transport once */
3669       if (g_hash_table_contains (visited, trans))
3670         continue;
3671
3672       g_hash_table_add (visited, g_object_ref (trans));
3673       g_mutex_unlock (&priv->lock);
3674
3675       res = func (stream, trans, user_data);
3676
3677       g_mutex_lock (&priv->lock);
3678     } else
3679       res = GST_RTSP_FILTER_REF;
3680
3681     changed = (cookie != priv->transports_cookie);
3682
3683     switch (res) {
3684       case GST_RTSP_FILTER_REMOVE:
3685         update_transport (stream, trans, FALSE);
3686         break;
3687       case GST_RTSP_FILTER_REF:
3688         result = g_list_prepend (result, g_object_ref (trans));
3689         break;
3690       case GST_RTSP_FILTER_KEEP:
3691       default:
3692         break;
3693     }
3694     if (changed)
3695       goto restart;
3696   }
3697   g_mutex_unlock (&priv->lock);
3698
3699   if (func)
3700     g_hash_table_unref (visited);
3701
3702   return result;
3703 }
3704
3705 static GstPadProbeReturn
3706 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3707 {
3708   GstRTSPStreamPrivate *priv;
3709   GstRTSPStream *stream;
3710
3711   stream = user_data;
3712   priv = stream->priv;
3713
3714   GST_DEBUG_OBJECT (pad, "now blocking");
3715
3716   g_mutex_lock (&priv->lock);
3717   priv->blocking = TRUE;
3718   g_mutex_unlock (&priv->lock);
3719
3720   gst_element_post_message (priv->payloader,
3721       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3722           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3723
3724   return GST_PAD_PROBE_OK;
3725 }
3726
3727 /**
3728  * gst_rtsp_stream_set_blocked:
3729  * @stream: a #GstRTSPStream
3730  * @blocked: boolean indicating we should block or unblock
3731  *
3732  * Blocks or unblocks the dataflow on @stream.
3733  *
3734  * Returns: %TRUE on success
3735  */
3736 gboolean
3737 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3738 {
3739   GstRTSPStreamPrivate *priv;
3740
3741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3742
3743   priv = stream->priv;
3744
3745   g_mutex_lock (&priv->lock);
3746   if (blocked) {
3747     priv->blocking = FALSE;
3748     if (priv->blocked_id == 0) {
3749       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3750           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3751           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3752           g_object_ref (stream), g_object_unref);
3753     }
3754   } else {
3755     if (priv->blocked_id != 0) {
3756       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3757       priv->blocked_id = 0;
3758       priv->blocking = FALSE;
3759     }
3760   }
3761   g_mutex_unlock (&priv->lock);
3762
3763   return TRUE;
3764 }
3765
3766 /**
3767  * gst_rtsp_stream_is_blocking:
3768  * @stream: a #GstRTSPStream
3769  *
3770  * Check if @stream is blocking on a #GstBuffer.
3771  *
3772  * Returns: %TRUE if @stream is blocking
3773  */
3774 gboolean
3775 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3776 {
3777   GstRTSPStreamPrivate *priv;
3778   gboolean result;
3779
3780   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3781
3782   priv = stream->priv;
3783
3784   g_mutex_lock (&priv->lock);
3785   result = priv->blocking;
3786   g_mutex_unlock (&priv->lock);
3787
3788   return result;
3789 }
3790
3791 /**
3792  * gst_rtsp_stream_query_position:
3793  * @stream: a #GstRTSPStream
3794  *
3795  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3796  * the RTP parts of the pipeline and not the RTCP parts.
3797  *
3798  * Returns: %TRUE if the position could be queried
3799  */
3800 gboolean
3801 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3802 {
3803   GstRTSPStreamPrivate *priv;
3804   GstElement *sink;
3805   gboolean ret;
3806
3807   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3808
3809   priv = stream->priv;
3810
3811   g_mutex_lock (&priv->lock);
3812   /* depending on the transport type, it should query corresponding sink */
3813   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3814       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3815     sink = priv->udpsink[0];
3816   else
3817     sink = priv->appsink[0];
3818
3819   if (sink)
3820     gst_object_ref (sink);
3821   g_mutex_unlock (&priv->lock);
3822
3823   if (!sink)
3824     return FALSE;
3825
3826   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3827   gst_object_unref (sink);
3828
3829   return ret;
3830 }
3831
3832 /**
3833  * gst_rtsp_stream_query_stop:
3834  * @stream: a #GstRTSPStream
3835  *
3836  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3837  * the RTP parts of the pipeline and not the RTCP parts.
3838  *
3839  * Returns: %TRUE if the stop could be queried
3840  */
3841 gboolean
3842 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3843 {
3844   GstRTSPStreamPrivate *priv;
3845   GstElement *sink;
3846   GstQuery *query;
3847   gboolean ret;
3848
3849   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3850
3851   priv = stream->priv;
3852
3853   g_mutex_lock (&priv->lock);
3854   /* depending on the transport type, it should query corresponding sink */
3855   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3856       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3857     sink = priv->udpsink[0];
3858   else
3859     sink = priv->appsink[0];
3860
3861   if (sink)
3862     gst_object_ref (sink);
3863   g_mutex_unlock (&priv->lock);
3864
3865   if (!sink)
3866     return FALSE;
3867
3868   query = gst_query_new_segment (GST_FORMAT_TIME);
3869   if ((ret = gst_element_query (sink, query))) {
3870     GstFormat format;
3871
3872     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3873     if (format != GST_FORMAT_TIME)
3874       *stop = -1;
3875   }
3876   gst_query_unref (query);
3877   gst_object_unref (sink);
3878
3879   return ret;
3880
3881 }