stream: factor out plug_sink function
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   GstBin *joined_bin;
74
75   /* TRUE if this stream is running on
76    * the client side of an RTSP link (for RECORD) */
77   gboolean client_side;
78   gchar *control;
79
80   GstRTSPProfile profiles;
81   GstRTSPLowerTrans protocols;
82
83   /* pads on the rtpbin */
84   GstPad *send_rtp_sink;
85   GstPad *recv_rtp_src;
86   GstPad *recv_sink[2];
87   GstPad *send_src[2];
88
89   /* the RTPSession object */
90   GObject *session;
91
92   /* SRTP encoder/decoder */
93   GstElement *srtpenc;
94   GstElement *srtpdec;
95   GHashTable *keys;
96
97   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
98    * sockets */
99   GstElement *udpsrc_v4[2];
100   /* UDP sources for UDP multicast transports */
101   GstElement *udpsrc_mcast_v4[2];
102
103   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
104    * sockets */
105   GstElement *udpsrc_v6[2];
106   /* UDP sources for UDP multicast transports */
107   GstElement *udpsrc_mcast_v6[2];
108
109   GstElement *udpqueue[2];
110   GstElement *udpsink[2];
111
112   /* for TCP transport */
113   GstElement *appsrc[2];
114   GstClockTime appsrc_base_time[2];
115   GstElement *appqueue[2];
116   GstElement *appsink[2];
117
118   GstElement *tee[2];
119   GstElement *funnel[2];
120
121   /* retransmission */
122   GstElement *rtxsend;
123   guint rtx_pt;
124   GstClockTime rtx_time;
125
126   /* server ports for sending/receiving over ipv4 */
127   GstRTSPRange server_port_v4;
128   GstRTSPAddress *server_addr_v4;
129   gboolean have_ipv4;
130
131   /* server ports for sending/receiving over ipv6 */
132   GstRTSPRange server_port_v6;
133   GstRTSPAddress *server_addr_v6;
134   gboolean have_ipv6;
135
136   /* multicast addresses */
137   GstRTSPAddressPool *pool;
138   GstRTSPAddress *mcast_addr_v4;
139   GstRTSPAddress *mcast_addr_v6;
140   gboolean have_ipv4_mcast;
141   gboolean have_ipv6_mcast;
142
143   gchar *multicast_iface;
144
145   /* the caps of the stream */
146   gulong caps_sig;
147   GstCaps *caps;
148
149   /* transports we stream to */
150   guint n_active;
151   GList *transports;
152   guint transports_cookie;
153   GList *tr_cache_rtp;
154   GList *tr_cache_rtcp;
155   guint tr_cache_cookie_rtp;
156   guint tr_cache_cookie_rtcp;
157
158
159   gint dscp_qos;
160
161   /* stream blocking */
162   gulong blocked_id;
163   gboolean blocking;
164
165   /* pt->caps map for RECORD streams */
166   GHashTable *ptmap;
167
168   GstRTSPPublishClockMode publish_clock_mode;
169 };
170
171 #define DEFAULT_CONTROL         NULL
172 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
173 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
174                                         GST_RTSP_LOWER_TRANS_TCP
175
176 enum
177 {
178   PROP_0,
179   PROP_CONTROL,
180   PROP_PROFILES,
181   PROP_PROTOCOLS,
182   PROP_LAST
183 };
184
185 enum
186 {
187   SIGNAL_NEW_RTP_ENCODER,
188   SIGNAL_NEW_RTCP_ENCODER,
189   SIGNAL_LAST
190 };
191
192 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
193 #define GST_CAT_DEFAULT rtsp_stream_debug
194
195 static GQuark ssrc_stream_map_key;
196
197 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
198     GValue * value, GParamSpec * pspec);
199 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
200     const GValue * value, GParamSpec * pspec);
201
202 static void gst_rtsp_stream_finalize (GObject * obj);
203
204 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
205
206 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
207
208 static void
209 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
210 {
211   GObjectClass *gobject_class;
212
213   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
214
215   gobject_class = G_OBJECT_CLASS (klass);
216
217   gobject_class->get_property = gst_rtsp_stream_get_property;
218   gobject_class->set_property = gst_rtsp_stream_set_property;
219   gobject_class->finalize = gst_rtsp_stream_finalize;
220
221   g_object_class_install_property (gobject_class, PROP_CONTROL,
222       g_param_spec_string ("control", "Control",
223           "The control string for this stream", DEFAULT_CONTROL,
224           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
225
226   g_object_class_install_property (gobject_class, PROP_PROFILES,
227       g_param_spec_flags ("profiles", "Profiles",
228           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
229           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230
231   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
232       g_param_spec_flags ("protocols", "Protocols",
233           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
234           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
235
236   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
237       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
238       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
239       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
240
241   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
242       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
243       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
244       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
245
246   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
247
248   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
249 }
250
251 static void
252 gst_rtsp_stream_init (GstRTSPStream * stream)
253 {
254   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
255
256   GST_DEBUG ("new stream %p", stream);
257
258   stream->priv = priv;
259
260   priv->dscp_qos = -1;
261   priv->control = g_strdup (DEFAULT_CONTROL);
262   priv->profiles = DEFAULT_PROFILES;
263   priv->protocols = DEFAULT_PROTOCOLS;
264   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
265
266   g_mutex_init (&priv->lock);
267
268   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
269       NULL, (GDestroyNotify) gst_caps_unref);
270   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
271       (GDestroyNotify) gst_caps_unref);
272 }
273
274 static void
275 gst_rtsp_stream_finalize (GObject * obj)
276 {
277   GstRTSPStream *stream;
278   GstRTSPStreamPrivate *priv;
279
280   stream = GST_RTSP_STREAM (obj);
281   priv = stream->priv;
282
283   GST_DEBUG ("finalize stream %p", stream);
284
285   /* we really need to be unjoined now */
286   g_return_if_fail (priv->joined_bin == NULL);
287
288   if (priv->mcast_addr_v4)
289     gst_rtsp_address_free (priv->mcast_addr_v4);
290   if (priv->mcast_addr_v6)
291     gst_rtsp_address_free (priv->mcast_addr_v6);
292   if (priv->server_addr_v4)
293     gst_rtsp_address_free (priv->server_addr_v4);
294   if (priv->server_addr_v6)
295     gst_rtsp_address_free (priv->server_addr_v6);
296   if (priv->pool)
297     g_object_unref (priv->pool);
298   if (priv->rtxsend)
299     g_object_unref (priv->rtxsend);
300
301   g_free (priv->multicast_iface);
302
303   gst_object_unref (priv->payloader);
304   if (priv->srcpad)
305     gst_object_unref (priv->srcpad);
306   if (priv->sinkpad)
307     gst_object_unref (priv->sinkpad);
308   g_free (priv->control);
309   g_mutex_clear (&priv->lock);
310
311   g_hash_table_unref (priv->keys);
312   g_hash_table_destroy (priv->ptmap);
313
314   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
315 }
316
317 static void
318 gst_rtsp_stream_get_property (GObject * object, guint propid,
319     GValue * value, GParamSpec * pspec)
320 {
321   GstRTSPStream *stream = GST_RTSP_STREAM (object);
322
323   switch (propid) {
324     case PROP_CONTROL:
325       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
326       break;
327     case PROP_PROFILES:
328       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
329       break;
330     case PROP_PROTOCOLS:
331       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
332       break;
333     default:
334       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
335   }
336 }
337
338 static void
339 gst_rtsp_stream_set_property (GObject * object, guint propid,
340     const GValue * value, GParamSpec * pspec)
341 {
342   GstRTSPStream *stream = GST_RTSP_STREAM (object);
343
344   switch (propid) {
345     case PROP_CONTROL:
346       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
347       break;
348     case PROP_PROFILES:
349       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
350       break;
351     case PROP_PROTOCOLS:
352       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
353       break;
354     default:
355       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
356   }
357 }
358
359 /**
360  * gst_rtsp_stream_new:
361  * @idx: an index
362  * @pad: a #GstPad
363  * @payloader: a #GstElement
364  *
365  * Create a new media stream with index @idx that handles RTP data on
366  * @pad and has a payloader element @payloader if @pad is a source pad
367  * or a depayloader element @payloader if @pad is a sink pad.
368  *
369  * Returns: (transfer full): a new #GstRTSPStream
370  */
371 GstRTSPStream *
372 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
373 {
374   GstRTSPStreamPrivate *priv;
375   GstRTSPStream *stream;
376
377   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
378   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
379
380   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
381   priv = stream->priv;
382   priv->idx = idx;
383   priv->payloader = gst_object_ref (payloader);
384   if (GST_PAD_IS_SRC (pad))
385     priv->srcpad = gst_object_ref (pad);
386   else
387     priv->sinkpad = gst_object_ref (pad);
388
389   return stream;
390 }
391
392 /**
393  * gst_rtsp_stream_get_index:
394  * @stream: a #GstRTSPStream
395  *
396  * Get the stream index.
397  *
398  * Return: the stream index.
399  */
400 guint
401 gst_rtsp_stream_get_index (GstRTSPStream * stream)
402 {
403   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
404
405   return stream->priv->idx;
406 }
407
408 /**
409  * gst_rtsp_stream_get_pt:
410  * @stream: a #GstRTSPStream
411  *
412  * Get the stream payload type.
413  *
414  * Return: the stream payload type.
415  */
416 guint
417 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
418 {
419   GstRTSPStreamPrivate *priv;
420   guint pt;
421
422   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
423
424   priv = stream->priv;
425
426   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
427
428   return pt;
429 }
430
431 /**
432  * gst_rtsp_stream_get_srcpad:
433  * @stream: a #GstRTSPStream
434  *
435  * Get the srcpad associated with @stream.
436  *
437  * Returns: (transfer full): the srcpad. Unref after usage.
438  */
439 GstPad *
440 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
441 {
442   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
443
444   if (!stream->priv->srcpad)
445     return NULL;
446
447   return gst_object_ref (stream->priv->srcpad);
448 }
449
450 /**
451  * gst_rtsp_stream_get_sinkpad:
452  * @stream: a #GstRTSPStream
453  *
454  * Get the sinkpad associated with @stream.
455  *
456  * Returns: (transfer full): the sinkpad. Unref after usage.
457  */
458 GstPad *
459 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
460 {
461   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
462
463   if (!stream->priv->sinkpad)
464     return NULL;
465
466   return gst_object_ref (stream->priv->sinkpad);
467 }
468
469 /**
470  * gst_rtsp_stream_get_control:
471  * @stream: a #GstRTSPStream
472  *
473  * Get the control string to identify this stream.
474  *
475  * Returns: (transfer full): the control string. g_free() after usage.
476  */
477 gchar *
478 gst_rtsp_stream_get_control (GstRTSPStream * stream)
479 {
480   GstRTSPStreamPrivate *priv;
481   gchar *result;
482
483   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
484
485   priv = stream->priv;
486
487   g_mutex_lock (&priv->lock);
488   if ((result = g_strdup (priv->control)) == NULL)
489     result = g_strdup_printf ("stream=%u", priv->idx);
490   g_mutex_unlock (&priv->lock);
491
492   return result;
493 }
494
495 /**
496  * gst_rtsp_stream_set_control:
497  * @stream: a #GstRTSPStream
498  * @control: a control string
499  *
500  * Set the control string in @stream.
501  */
502 void
503 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
504 {
505   GstRTSPStreamPrivate *priv;
506
507   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
508
509   priv = stream->priv;
510
511   g_mutex_lock (&priv->lock);
512   g_free (priv->control);
513   priv->control = g_strdup (control);
514   g_mutex_unlock (&priv->lock);
515 }
516
517 /**
518  * gst_rtsp_stream_has_control:
519  * @stream: a #GstRTSPStream
520  * @control: a control string
521  *
522  * Check if @stream has the control string @control.
523  *
524  * Returns: %TRUE is @stream has @control as the control string
525  */
526 gboolean
527 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
528 {
529   GstRTSPStreamPrivate *priv;
530   gboolean res;
531
532   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
533
534   priv = stream->priv;
535
536   g_mutex_lock (&priv->lock);
537   if (priv->control)
538     res = (g_strcmp0 (priv->control, control) == 0);
539   else {
540     guint streamid;
541
542     if (sscanf (control, "stream=%u", &streamid) > 0)
543       res = (streamid == priv->idx);
544     else
545       res = FALSE;
546   }
547   g_mutex_unlock (&priv->lock);
548
549   return res;
550 }
551
552 /**
553  * gst_rtsp_stream_set_mtu:
554  * @stream: a #GstRTSPStream
555  * @mtu: a new MTU
556  *
557  * Configure the mtu in the payloader of @stream to @mtu.
558  */
559 void
560 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
561 {
562   GstRTSPStreamPrivate *priv;
563
564   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
565
566   priv = stream->priv;
567
568   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
569
570   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
571 }
572
573 /**
574  * gst_rtsp_stream_get_mtu:
575  * @stream: a #GstRTSPStream
576  *
577  * Get the configured MTU in the payloader of @stream.
578  *
579  * Returns: the MTU of the payloader.
580  */
581 guint
582 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
583 {
584   GstRTSPStreamPrivate *priv;
585   guint mtu;
586
587   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
588
589   priv = stream->priv;
590
591   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
592
593   return mtu;
594 }
595
596 /* Update the dscp qos property on the udp sinks */
597 static void
598 update_dscp_qos (GstRTSPStream * stream)
599 {
600   GstRTSPStreamPrivate *priv;
601
602   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
603
604   priv = stream->priv;
605
606   if (priv->udpsink[0]) {
607     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
608         NULL);
609   }
610
611   if (priv->udpsink[1]) {
612     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
613         NULL);
614   }
615 }
616
617 /**
618  * gst_rtsp_stream_set_dscp_qos:
619  * @stream: a #GstRTSPStream
620  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
621  *
622  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
623  */
624 void
625 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
626 {
627   GstRTSPStreamPrivate *priv;
628
629   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
630
631   priv = stream->priv;
632
633   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
634
635   if (dscp_qos < -1 || dscp_qos > 63) {
636     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
637     return;
638   }
639
640   priv->dscp_qos = dscp_qos;
641
642   update_dscp_qos (stream);
643 }
644
645 /**
646  * gst_rtsp_stream_get_dscp_qos:
647  * @stream: a #GstRTSPStream
648  *
649  * Get the configured DSCP QoS in of the outgoing sockets.
650  *
651  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
652  */
653 gint
654 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
655 {
656   GstRTSPStreamPrivate *priv;
657
658   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
659
660   priv = stream->priv;
661
662   return priv->dscp_qos;
663 }
664
665 /**
666  * gst_rtsp_stream_is_transport_supported:
667  * @stream: a #GstRTSPStream
668  * @transport: (transfer none): a #GstRTSPTransport
669  *
670  * Check if @transport can be handled by stream
671  *
672  * Returns: %TRUE if @transport can be handled by @stream.
673  */
674 gboolean
675 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
676     GstRTSPTransport * transport)
677 {
678   GstRTSPStreamPrivate *priv;
679
680   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
681
682   priv = stream->priv;
683
684   g_mutex_lock (&priv->lock);
685   if (transport->trans != GST_RTSP_TRANS_RTP)
686     goto unsupported_transmode;
687
688   if (!(transport->profile & priv->profiles))
689     goto unsupported_profile;
690
691   if (!(transport->lower_transport & priv->protocols))
692     goto unsupported_ltrans;
693
694   g_mutex_unlock (&priv->lock);
695
696   return TRUE;
697
698   /* ERRORS */
699 unsupported_transmode:
700   {
701     GST_DEBUG ("unsupported transport mode %d", transport->trans);
702     g_mutex_unlock (&priv->lock);
703     return FALSE;
704   }
705 unsupported_profile:
706   {
707     GST_DEBUG ("unsupported profile %d", transport->profile);
708     g_mutex_unlock (&priv->lock);
709     return FALSE;
710   }
711 unsupported_ltrans:
712   {
713     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
714     g_mutex_unlock (&priv->lock);
715     return FALSE;
716   }
717 }
718
719 /**
720  * gst_rtsp_stream_set_profiles:
721  * @stream: a #GstRTSPStream
722  * @profiles: the new profiles
723  *
724  * Configure the allowed profiles for @stream.
725  */
726 void
727 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
728 {
729   GstRTSPStreamPrivate *priv;
730
731   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
732
733   priv = stream->priv;
734
735   g_mutex_lock (&priv->lock);
736   priv->profiles = profiles;
737   g_mutex_unlock (&priv->lock);
738 }
739
740 /**
741  * gst_rtsp_stream_get_profiles:
742  * @stream: a #GstRTSPStream
743  *
744  * Get the allowed profiles of @stream.
745  *
746  * Returns: a #GstRTSPProfile
747  */
748 GstRTSPProfile
749 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
750 {
751   GstRTSPStreamPrivate *priv;
752   GstRTSPProfile res;
753
754   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
755
756   priv = stream->priv;
757
758   g_mutex_lock (&priv->lock);
759   res = priv->profiles;
760   g_mutex_unlock (&priv->lock);
761
762   return res;
763 }
764
765 /**
766  * gst_rtsp_stream_set_protocols:
767  * @stream: a #GstRTSPStream
768  * @protocols: the new flags
769  *
770  * Configure the allowed lower transport for @stream.
771  */
772 void
773 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
774     GstRTSPLowerTrans protocols)
775 {
776   GstRTSPStreamPrivate *priv;
777
778   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
779
780   priv = stream->priv;
781
782   g_mutex_lock (&priv->lock);
783   priv->protocols = protocols;
784   g_mutex_unlock (&priv->lock);
785 }
786
787 /**
788  * gst_rtsp_stream_get_protocols:
789  * @stream: a #GstRTSPStream
790  *
791  * Get the allowed protocols of @stream.
792  *
793  * Returns: a #GstRTSPLowerTrans
794  */
795 GstRTSPLowerTrans
796 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
797 {
798   GstRTSPStreamPrivate *priv;
799   GstRTSPLowerTrans res;
800
801   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
802       GST_RTSP_LOWER_TRANS_UNKNOWN);
803
804   priv = stream->priv;
805
806   g_mutex_lock (&priv->lock);
807   res = priv->protocols;
808   g_mutex_unlock (&priv->lock);
809
810   return res;
811 }
812
813 /**
814  * gst_rtsp_stream_set_address_pool:
815  * @stream: a #GstRTSPStream
816  * @pool: (transfer none): a #GstRTSPAddressPool
817  *
818  * configure @pool to be used as the address pool of @stream.
819  */
820 void
821 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
822     GstRTSPAddressPool * pool)
823 {
824   GstRTSPStreamPrivate *priv;
825   GstRTSPAddressPool *old;
826
827   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
828
829   priv = stream->priv;
830
831   GST_LOG_OBJECT (stream, "set address pool %p", pool);
832
833   g_mutex_lock (&priv->lock);
834   if ((old = priv->pool) != pool)
835     priv->pool = pool ? g_object_ref (pool) : NULL;
836   else
837     old = NULL;
838   g_mutex_unlock (&priv->lock);
839
840   if (old)
841     g_object_unref (old);
842 }
843
844 /**
845  * gst_rtsp_stream_get_address_pool:
846  * @stream: a #GstRTSPStream
847  *
848  * Get the #GstRTSPAddressPool used as the address pool of @stream.
849  *
850  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
851  * usage.
852  */
853 GstRTSPAddressPool *
854 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
855 {
856   GstRTSPStreamPrivate *priv;
857   GstRTSPAddressPool *result;
858
859   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
860
861   priv = stream->priv;
862
863   g_mutex_lock (&priv->lock);
864   if ((result = priv->pool))
865     g_object_ref (result);
866   g_mutex_unlock (&priv->lock);
867
868   return result;
869 }
870
871 /**
872  * gst_rtsp_stream_set_multicast_iface:
873  * @stream: a #GstRTSPStream
874  * @multicast_iface: (transfer none): a multicast interface
875  *
876  * configure @multicast_iface to be used for @stream.
877  */
878 void
879 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
880     const gchar * multicast_iface)
881 {
882   GstRTSPStreamPrivate *priv;
883   gchar *old;
884
885   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
886
887   priv = stream->priv;
888
889   GST_LOG_OBJECT (stream, "set multicast iface %s",
890       GST_STR_NULL (multicast_iface));
891
892   g_mutex_lock (&priv->lock);
893   if ((old = priv->multicast_iface) != multicast_iface)
894     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
895   else
896     old = NULL;
897   g_mutex_unlock (&priv->lock);
898
899   if (old)
900     g_free (old);
901 }
902
903 /**
904  * gst_rtsp_stream_get_multicast_iface:
905  * @stream: a #GstRTSPStream
906  *
907  * Get the multicast interface used for @stream.
908  *
909  * Returns: (transfer full): the multicast interface for @stream. g_free() after
910  * usage.
911  */
912 gchar *
913 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
914 {
915   GstRTSPStreamPrivate *priv;
916   gchar *result;
917
918   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
919
920   priv = stream->priv;
921
922   g_mutex_lock (&priv->lock);
923   if ((result = priv->multicast_iface))
924     result = g_strdup (result);
925   g_mutex_unlock (&priv->lock);
926
927   return result;
928 }
929
930 /**
931  * gst_rtsp_stream_get_multicast_address:
932  * @stream: a #GstRTSPStream
933  * @family: the #GSocketFamily
934  *
935  * Get the multicast address of @stream for @family. The original
936  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
937  * won't release the address from the pool.
938  *
939  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
940  * or %NULL when no address could be allocated. gst_rtsp_address_free()
941  * after usage.
942  */
943 GstRTSPAddress *
944 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
945     GSocketFamily family)
946 {
947   GstRTSPStreamPrivate *priv;
948   GstRTSPAddress *result;
949   GstRTSPAddress **addrp;
950   GstRTSPAddressFlags flags;
951
952   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
953
954   priv = stream->priv;
955
956   if (family == G_SOCKET_FAMILY_IPV6) {
957     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
958     addrp = &priv->mcast_addr_v6;
959   } else {
960     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
961     addrp = &priv->mcast_addr_v4;
962   }
963
964   g_mutex_lock (&priv->lock);
965   if (*addrp == NULL) {
966     if (priv->pool == NULL)
967       goto no_pool;
968
969     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
970
971     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
972     if (*addrp == NULL)
973       goto no_address;
974   }
975   result = gst_rtsp_address_copy (*addrp);
976   g_mutex_unlock (&priv->lock);
977
978   return result;
979
980   /* ERRORS */
981 no_pool:
982   {
983     GST_ERROR_OBJECT (stream, "no address pool specified");
984     g_mutex_unlock (&priv->lock);
985     return NULL;
986   }
987 no_address:
988   {
989     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
990     g_mutex_unlock (&priv->lock);
991     return NULL;
992   }
993 }
994
995 /**
996  * gst_rtsp_stream_reserve_address:
997  * @stream: a #GstRTSPStream
998  * @address: an address
999  * @port: a port
1000  * @n_ports: n_ports
1001  * @ttl: a TTL
1002  *
1003  * Reserve @address and @port as the address and port of @stream. The original
1004  * #GstRTSPAddress is cached and copy is returned, so freeing the return value
1005  * won't release the address from the pool.
1006  *
1007  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1008  * the address could be reserved. gst_rtsp_address_free() after usage.
1009  */
1010 GstRTSPAddress *
1011 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1012     const gchar * address, guint port, guint n_ports, guint ttl)
1013 {
1014   GstRTSPStreamPrivate *priv;
1015   GstRTSPAddress *result;
1016   GInetAddress *addr;
1017   GSocketFamily family;
1018   GstRTSPAddress **addrp;
1019
1020   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1021   g_return_val_if_fail (address != NULL, NULL);
1022   g_return_val_if_fail (port > 0, NULL);
1023   g_return_val_if_fail (n_ports > 0, NULL);
1024   g_return_val_if_fail (ttl > 0, NULL);
1025
1026   priv = stream->priv;
1027
1028   addr = g_inet_address_new_from_string (address);
1029   if (!addr) {
1030     GST_ERROR ("failed to get inet addr from %s", address);
1031     family = G_SOCKET_FAMILY_IPV4;
1032   } else {
1033     family = g_inet_address_get_family (addr);
1034     g_object_unref (addr);
1035   }
1036
1037   if (family == G_SOCKET_FAMILY_IPV6)
1038     addrp = &priv->mcast_addr_v6;
1039   else
1040     addrp = &priv->mcast_addr_v4;
1041
1042   g_mutex_lock (&priv->lock);
1043   if (*addrp == NULL) {
1044     GstRTSPAddressPoolResult res;
1045
1046     if (priv->pool == NULL)
1047       goto no_pool;
1048
1049     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1050         port, n_ports, ttl, addrp);
1051     if (res != GST_RTSP_ADDRESS_POOL_OK)
1052       goto no_address;
1053   } else {
1054     if (strcmp ((*addrp)->address, address) ||
1055         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1056         (*addrp)->ttl != ttl)
1057       goto different_address;
1058   }
1059   result = gst_rtsp_address_copy (*addrp);
1060   g_mutex_unlock (&priv->lock);
1061
1062   return result;
1063
1064   /* ERRORS */
1065 no_pool:
1066   {
1067     GST_ERROR_OBJECT (stream, "no address pool specified");
1068     g_mutex_unlock (&priv->lock);
1069     return NULL;
1070   }
1071 no_address:
1072   {
1073     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1074         address);
1075     g_mutex_unlock (&priv->lock);
1076     return NULL;
1077   }
1078 different_address:
1079   {
1080     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1081         " reserved", address);
1082     g_mutex_unlock (&priv->lock);
1083     return NULL;
1084   }
1085 }
1086
1087 /* must be called with lock */
1088 static void
1089 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1090     GSocket * rtcp_socket, GSocketFamily family)
1091 {
1092   GstRTSPStreamPrivate *priv = stream->priv;
1093   const gchar *multisink_socket;
1094
1095   if (family == G_SOCKET_FAMILY_IPV6)
1096     multisink_socket = "socket-v6";
1097   else
1098     multisink_socket = "socket";
1099
1100   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1101       NULL);
1102   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1103       NULL);
1104 }
1105
1106 /* must be called with lock */
1107 static gboolean
1108 create_and_configure_udpsinks (GstRTSPStream * stream)
1109 {
1110   GstRTSPStreamPrivate *priv = stream->priv;
1111   GstElement *udpsink0, *udpsink1;
1112
1113   udpsink0 = NULL;
1114   udpsink1 = NULL;
1115
1116   if (priv->udpsink[0])
1117     udpsink0 = priv->udpsink[0];
1118   else
1119     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1120
1121   if (!udpsink0)
1122     goto no_udp_protocol;
1123
1124   if (priv->udpsink[1])
1125     udpsink1 = priv->udpsink[1];
1126   else
1127     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1128
1129   if (!udpsink1)
1130     goto no_udp_protocol;
1131
1132   /* configure sinks */
1133
1134   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1138   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1139
1140   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1141
1142   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1143   /* Needs to be async for RECORD streams, otherwise we will never go to
1144    * PLAYING because the sinks will wait for data while the udpsrc can't
1145    * provide data with timestamps in PAUSED. */
1146   if (priv->sinkpad)
1147     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1149
1150   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1152
1153   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1154   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1155
1156   /* update the dscp qos field in the sinks */
1157   update_dscp_qos (stream);
1158
1159   priv->udpsink[0] = udpsink0;
1160   priv->udpsink[1] = udpsink1;
1161
1162   return TRUE;
1163
1164   /* ERRORS */
1165 no_udp_protocol:
1166   {
1167     return FALSE;
1168   }
1169 }
1170
1171 /* must be called with lock */
1172 static void
1173 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1174     GSocketFamily family)
1175 {
1176   GstRTSPStreamPrivate *priv;
1177   GstPad *pad, *selpad;
1178   guint i;
1179
1180   priv = stream->priv;
1181
1182   for (i = 0; i < 2; i++) {
1183     if (!priv->sinkpad && i == 0) {
1184       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
1185        * RTCP sink always */
1186       continue;
1187     }
1188
1189     if (priv->srcpad) {
1190       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1191        * values. This is only relevant for PLAY pipelines */
1192       gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1193       gst_element_set_locked_state (udpsrc_out[i], TRUE);
1194     }
1195
1196     /* add udpsrc */
1197     gst_bin_add (priv->joined_bin, udpsrc_out[i]);
1198
1199     /* and link to the funnel */
1200     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1201     pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1202     gst_pad_link (pad, selpad);
1203     gst_object_unref (pad);
1204     gst_object_unref (selpad);
1205
1206     /* otherwise sync state with parent in case it's running already
1207      * at this point */
1208     if (!priv->srcpad) {
1209       gst_element_sync_state_with_parent (udpsrc_out[i]);
1210     }
1211   }
1212 }
1213
1214 /* must be called with lock */
1215 static gboolean
1216 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1217     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1218     const gchar * address, gint rtpport, gint rtcpport,
1219     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1220 {
1221   GstStateChangeReturn ret;
1222
1223   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1224   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1225
1226   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1227     goto error;
1228
1229   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1231     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1232     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1233     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1234     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1235         NULL);
1236     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1237         NULL);
1238     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1239     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1240   }
1241
1242   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1243   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1244
1245   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1246   if (ret == GST_STATE_CHANGE_FAILURE)
1247     goto error;
1248   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1249   if (ret == GST_STATE_CHANGE_FAILURE)
1250     goto error;
1251
1252   return TRUE;
1253
1254   /* ERRORS */
1255 error:
1256   {
1257     if (udpsrc_out[0]) {
1258       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1259       g_clear_object (&udpsrc_out[0]);
1260     }
1261     if (udpsrc_out[1]) {
1262       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1263       g_clear_object (&udpsrc_out[1]);
1264     }
1265     return FALSE;
1266   }
1267 }
1268
1269 static gboolean
1270 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1271     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1272     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1273     gboolean use_client_settings)
1274 {
1275   GstRTSPStreamPrivate *priv = stream->priv;
1276   GSocket *rtp_socket = NULL;
1277   GSocket *rtcp_socket;
1278   gint tmp_rtp, tmp_rtcp;
1279   guint count;
1280   gint rtpport, rtcpport;
1281   GList *rejected_addresses = NULL;
1282   GstRTSPAddress *addr = NULL;
1283   GInetAddress *inetaddr = NULL;
1284   gchar *addr_str;
1285   GSocketAddress *rtp_sockaddr = NULL;
1286   GSocketAddress *rtcp_sockaddr = NULL;
1287   GstRTSPAddressPool *pool;
1288   GstRTSPLowerTrans transport;
1289   const gchar *multicast_iface = priv->multicast_iface;
1290
1291   pool = priv->pool;
1292   count = 0;
1293   transport = ct->lower_transport;
1294
1295   /* Start with random port */
1296   tmp_rtp = 0;
1297
1298   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1299       G_SOCKET_PROTOCOL_UDP, NULL);
1300   if (!rtcp_socket)
1301     goto no_udp_protocol;
1302   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1303
1304   if (*server_addr_out)
1305     gst_rtsp_address_free (*server_addr_out);
1306
1307   /* try to allocate 2 UDP ports, the RTP port should be an even
1308    * number and the RTCP port should be the next (uneven) port */
1309 again:
1310
1311   if (rtp_socket == NULL) {
1312     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1313         G_SOCKET_PROTOCOL_UDP, NULL);
1314     if (!rtp_socket)
1315       goto no_udp_protocol;
1316     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1317   }
1318
1319   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1320               gst_rtsp_address_pool_has_unicast_addresses (pool))
1321           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1322     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1323
1324     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1325       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1326     else
1327       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1328
1329     if (addr)
1330       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1331
1332     if (family == G_SOCKET_FAMILY_IPV6)
1333       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1334     else
1335       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1336
1337     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1338         && use_client_settings)
1339       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1340           ct->port.min, 2, ct->ttl, &addr);
1341     else
1342       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1343
1344     if (addr == NULL)
1345       goto no_ports;
1346
1347     tmp_rtp = addr->port;
1348
1349     g_clear_object (&inetaddr);
1350     inetaddr = g_inet_address_new_from_string (addr->address);
1351
1352     /* If we're supposed to bind to a multicast address, instead bind
1353      * to ANY and let udpsrc later join the relevant multicast group
1354      */
1355     if (g_inet_address_get_is_multicast (inetaddr)) {
1356       g_object_unref (inetaddr);
1357       inetaddr = g_inet_address_new_any (family);
1358     }
1359   } else {
1360     if (tmp_rtp != 0) {
1361       tmp_rtp += 2;
1362       if (++count > 20)
1363         goto no_ports;
1364     }
1365
1366     if (inetaddr == NULL)
1367       inetaddr = g_inet_address_new_any (family);
1368   }
1369
1370   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1371   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1372     g_object_unref (rtp_sockaddr);
1373     goto again;
1374   }
1375   g_object_unref (rtp_sockaddr);
1376
1377   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1378   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1379     g_clear_object (&rtp_sockaddr);
1380     goto socket_error;
1381   }
1382
1383   tmp_rtp =
1384       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1385   g_object_unref (rtp_sockaddr);
1386
1387   /* check if port is even */
1388   if ((tmp_rtp & 1) != 0) {
1389     /* port not even, close and allocate another */
1390     tmp_rtp++;
1391     g_clear_object (&rtp_socket);
1392     goto again;
1393   }
1394
1395   /* set port */
1396   tmp_rtcp = tmp_rtp + 1;
1397
1398   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1399   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1400     g_object_unref (rtcp_sockaddr);
1401     g_clear_object (&rtp_socket);
1402     goto again;
1403   }
1404   g_object_unref (rtcp_sockaddr);
1405
1406   if (addr == NULL)
1407     addr_str = g_inet_address_to_string (inetaddr);
1408   else
1409     addr_str = addr->address;
1410   g_clear_object (&inetaddr);
1411
1412   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1413           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1414           transport)) {
1415     if (addr == NULL)
1416       g_free (addr_str);
1417     goto no_udp_protocol;
1418   }
1419
1420   if (addr == NULL)
1421     g_free (addr_str);
1422
1423   play_udpsources_one_family (stream, udpsrc_out, family);
1424
1425   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1426   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1427
1428   /* this should not happen... */
1429   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1430     goto port_error;
1431
1432   /* set RTP and RTCP sockets */
1433   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1434
1435   server_port_out->min = rtpport;
1436   server_port_out->max = rtcpport;
1437
1438   *server_addr_out = addr;
1439   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1440
1441   g_object_unref (rtp_socket);
1442   g_object_unref (rtcp_socket);
1443
1444   return TRUE;
1445
1446   /* ERRORS */
1447 no_udp_protocol:
1448   {
1449     goto cleanup;
1450   }
1451 no_ports:
1452   {
1453     goto cleanup;
1454   }
1455 port_error:
1456   {
1457     goto cleanup;
1458   }
1459 socket_error:
1460   {
1461     goto cleanup;
1462   }
1463 cleanup:
1464   {
1465     if (inetaddr)
1466       g_object_unref (inetaddr);
1467     g_list_free_full (rejected_addresses,
1468         (GDestroyNotify) gst_rtsp_address_free);
1469     if (addr)
1470       gst_rtsp_address_free (addr);
1471     if (rtp_socket)
1472       g_object_unref (rtp_socket);
1473     if (rtcp_socket)
1474       g_object_unref (rtcp_socket);
1475     return FALSE;
1476   }
1477 }
1478
1479 /**
1480  * gst_rtsp_stream_allocate_udp_sockets:
1481  * @stream: a #GstRTSPStream
1482  * @family: protocol family
1483  * @transport_method: transport method
1484  *
1485  * Allocates RTP and RTCP ports.
1486  *
1487  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1488  */
1489 gboolean
1490 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1491     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1492 {
1493   GstRTSPStreamPrivate *priv;
1494   gboolean result = FALSE;
1495   GstRTSPLowerTrans transport = ct->lower_transport;
1496
1497   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1498   priv = stream->priv;
1499   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
1500
1501   g_mutex_lock (&priv->lock);
1502
1503   if (family == G_SOCKET_FAMILY_IPV4) {
1504     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1505       if (priv->have_ipv4_mcast)
1506         goto done;
1507       priv->have_ipv4_mcast =
1508           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1509           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct,
1510           &priv->mcast_addr_v4, use_client_settings);
1511     } else {
1512       priv->have_ipv4 =
1513           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1514           &priv->server_port_v4, ct, &priv->server_addr_v4,
1515           use_client_settings);
1516     }
1517   } else {
1518     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1519       if (priv->have_ipv6_mcast)
1520         goto done;
1521       priv->have_ipv6_mcast =
1522           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1523           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct,
1524           &priv->mcast_addr_v6, use_client_settings);
1525     } else {
1526       if (priv->have_ipv6)
1527         goto done;
1528       priv->have_ipv6 =
1529           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1530           &priv->server_port_v6, ct, &priv->server_addr_v6,
1531           use_client_settings);
1532     }
1533   }
1534
1535 done:
1536   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1537       priv->have_ipv6_mcast;
1538
1539   g_mutex_unlock (&priv->lock);
1540
1541   return result;
1542 }
1543
1544 /**
1545  * gst_rtsp_stream_set_client_side:
1546  * @stream: a #GstRTSPStream
1547  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1548  * an RTSP connection.
1549  *
1550  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1551  * streams to an RTSP server via RECORD. This has the practical effect
1552  * of changing which UDP port numbers are used when setting up the local
1553  * side of the stream sending to be either the 'server' or 'client' pair
1554  * of a configured UDP transport.
1555  */
1556 void
1557 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1558 {
1559   GstRTSPStreamPrivate *priv;
1560
1561   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1562   priv = stream->priv;
1563   g_mutex_lock (&priv->lock);
1564   priv->client_side = client_side;
1565   g_mutex_unlock (&priv->lock);
1566 }
1567
1568 /**
1569  * gst_rtsp_stream_is_client_side:
1570  * @stream: a #GstRTSPStream
1571  *
1572  * See gst_rtsp_stream_set_client_side()
1573  *
1574  * Returns: TRUE if this #GstRTSPStream is client-side.
1575  */
1576 gboolean
1577 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1578 {
1579   GstRTSPStreamPrivate *priv;
1580   gboolean ret;
1581
1582   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1583
1584   priv = stream->priv;
1585   g_mutex_lock (&priv->lock);
1586   ret = priv->client_side;
1587   g_mutex_unlock (&priv->lock);
1588
1589   return ret;
1590 }
1591
1592 /**
1593  * gst_rtsp_stream_get_server_port:
1594  * @stream: a #GstRTSPStream
1595  * @server_port: (out): result server port
1596  * @family: the port family to get
1597  *
1598  * Fill @server_port with the port pair used by the server. This function can
1599  * only be called when @stream has been joined.
1600  */
1601 void
1602 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1603     GstRTSPRange * server_port, GSocketFamily family)
1604 {
1605   GstRTSPStreamPrivate *priv;
1606
1607   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1608   priv = stream->priv;
1609   g_return_if_fail (priv->joined_bin != NULL);
1610
1611   g_mutex_lock (&priv->lock);
1612   if (family == G_SOCKET_FAMILY_IPV4) {
1613     if (server_port)
1614       *server_port = priv->server_port_v4;
1615   } else {
1616     if (server_port)
1617       *server_port = priv->server_port_v6;
1618   }
1619   g_mutex_unlock (&priv->lock);
1620 }
1621
1622 /**
1623  * gst_rtsp_stream_get_rtpsession:
1624  * @stream: a #GstRTSPStream
1625  *
1626  * Get the RTP session of this stream.
1627  *
1628  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1629  */
1630 GObject *
1631 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1632 {
1633   GstRTSPStreamPrivate *priv;
1634   GObject *session;
1635
1636   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1637
1638   priv = stream->priv;
1639
1640   g_mutex_lock (&priv->lock);
1641   if ((session = priv->session))
1642     g_object_ref (session);
1643   g_mutex_unlock (&priv->lock);
1644
1645   return session;
1646 }
1647
1648 /**
1649  * gst_rtsp_stream_get_encoder:
1650  * @stream: a #GstRTSPStream
1651  *
1652  * Get the SRTP encoder for this stream.
1653  *
1654  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1655  */
1656 GstElement *
1657 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1658 {
1659   GstRTSPStreamPrivate *priv;
1660   GstElement *encoder;
1661
1662   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1663
1664   priv = stream->priv;
1665
1666   g_mutex_lock (&priv->lock);
1667   if ((encoder = priv->srtpenc))
1668     g_object_ref (encoder);
1669   g_mutex_unlock (&priv->lock);
1670
1671   return encoder;
1672 }
1673
1674 /**
1675  * gst_rtsp_stream_get_ssrc:
1676  * @stream: a #GstRTSPStream
1677  * @ssrc: (out): result ssrc
1678  *
1679  * Get the SSRC used by the RTP session of this stream. This function can only
1680  * be called when @stream has been joined.
1681  */
1682 void
1683 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1684 {
1685   GstRTSPStreamPrivate *priv;
1686
1687   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1688   priv = stream->priv;
1689   g_return_if_fail (priv->joined_bin != NULL);
1690
1691   g_mutex_lock (&priv->lock);
1692   if (ssrc && priv->session)
1693     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1694   g_mutex_unlock (&priv->lock);
1695 }
1696
1697 /**
1698  * gst_rtsp_stream_set_retransmission_time:
1699  * @stream: a #GstRTSPStream
1700  * @time: a #GstClockTime
1701  *
1702  * Set the amount of time to store retransmission packets.
1703  */
1704 void
1705 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1706     GstClockTime time)
1707 {
1708   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1709
1710   g_mutex_lock (&stream->priv->lock);
1711   stream->priv->rtx_time = time;
1712   if (stream->priv->rtxsend)
1713     g_object_set (stream->priv->rtxsend, "max-size-time",
1714         GST_TIME_AS_MSECONDS (time), NULL);
1715   g_mutex_unlock (&stream->priv->lock);
1716 }
1717
1718 /**
1719  * gst_rtsp_stream_get_retransmission_time:
1720  * @stream: a #GstRTSPStream
1721  *
1722  * Get the amount of time to store retransmission data.
1723  *
1724  * Returns: the amount of time to store retransmission data.
1725  */
1726 GstClockTime
1727 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1728 {
1729   GstClockTime ret;
1730
1731   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1732
1733   g_mutex_lock (&stream->priv->lock);
1734   ret = stream->priv->rtx_time;
1735   g_mutex_unlock (&stream->priv->lock);
1736
1737   return ret;
1738 }
1739
1740 /**
1741  * gst_rtsp_stream_set_retransmission_pt:
1742  * @stream: a #GstRTSPStream
1743  * @rtx_pt: a #guint
1744  *
1745  * Set the payload type (pt) for retransmission of this stream.
1746  */
1747 void
1748 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1749 {
1750   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1751
1752   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1753
1754   g_mutex_lock (&stream->priv->lock);
1755   stream->priv->rtx_pt = rtx_pt;
1756   if (stream->priv->rtxsend) {
1757     guint pt = gst_rtsp_stream_get_pt (stream);
1758     gchar *pt_s = g_strdup_printf ("%d", pt);
1759     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1760         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1761     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1762     g_free (pt_s);
1763     gst_structure_free (rtx_pt_map);
1764   }
1765   g_mutex_unlock (&stream->priv->lock);
1766 }
1767
1768 /**
1769  * gst_rtsp_stream_get_retransmission_pt:
1770  * @stream: a #GstRTSPStream
1771  *
1772  * Get the payload-type used for retransmission of this stream
1773  *
1774  * Returns: The retransmission PT.
1775  */
1776 guint
1777 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1778 {
1779   guint rtx_pt;
1780
1781   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1782
1783   g_mutex_lock (&stream->priv->lock);
1784   rtx_pt = stream->priv->rtx_pt;
1785   g_mutex_unlock (&stream->priv->lock);
1786
1787   return rtx_pt;
1788 }
1789
1790 /**
1791  * gst_rtsp_stream_set_buffer_size:
1792  * @stream: a #GstRTSPStream
1793  * @size: the buffer size
1794  *
1795  * Set the size of the UDP transmission buffer (in bytes)
1796  * Needs to be set before the stream is joined to a bin.
1797  *
1798  * Since: 1.6
1799  */
1800 void
1801 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1802 {
1803   g_mutex_lock (&stream->priv->lock);
1804   stream->priv->buffer_size = size;
1805   g_mutex_unlock (&stream->priv->lock);
1806 }
1807
1808 /**
1809  * gst_rtsp_stream_get_buffer_size:
1810  * @stream: a #GstRTSPStream
1811  *
1812  * Get the size of the UDP transmission buffer (in bytes)
1813  *
1814  * Returns: the size of the UDP TX buffer
1815  *
1816  * Since: 1.6
1817  */
1818 guint
1819 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1820 {
1821   guint buffer_size;
1822
1823   g_mutex_lock (&stream->priv->lock);
1824   buffer_size = stream->priv->buffer_size;
1825   g_mutex_unlock (&stream->priv->lock);
1826
1827   return buffer_size;
1828 }
1829
1830 /* executed from streaming thread */
1831 static void
1832 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1833 {
1834   GstRTSPStreamPrivate *priv = stream->priv;
1835   GstCaps *newcaps, *oldcaps;
1836
1837   newcaps = gst_pad_get_current_caps (pad);
1838
1839   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1840       newcaps);
1841
1842   g_mutex_lock (&priv->lock);
1843   oldcaps = priv->caps;
1844   priv->caps = newcaps;
1845   g_mutex_unlock (&priv->lock);
1846
1847   if (oldcaps)
1848     gst_caps_unref (oldcaps);
1849 }
1850
1851 static void
1852 dump_structure (const GstStructure * s)
1853 {
1854   gchar *sstr;
1855
1856   sstr = gst_structure_to_string (s);
1857   GST_INFO ("structure: %s", sstr);
1858   g_free (sstr);
1859 }
1860
1861 static GstRTSPStreamTransport *
1862 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1863 {
1864   GstRTSPStreamPrivate *priv = stream->priv;
1865   GList *walk;
1866   GstRTSPStreamTransport *result = NULL;
1867   const gchar *tmp;
1868   gchar *dest;
1869   guint port;
1870
1871   if (rtcp_from == NULL)
1872     return NULL;
1873
1874   tmp = g_strrstr (rtcp_from, ":");
1875   if (tmp == NULL)
1876     return NULL;
1877
1878   port = atoi (tmp + 1);
1879   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1880
1881   g_mutex_lock (&priv->lock);
1882   GST_INFO ("finding %s:%d in %d transports", dest, port,
1883       g_list_length (priv->transports));
1884
1885   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1886     GstRTSPStreamTransport *trans = walk->data;
1887     const GstRTSPTransport *tr;
1888     gint min, max;
1889
1890     tr = gst_rtsp_stream_transport_get_transport (trans);
1891
1892     if (priv->client_side) {
1893       /* In client side mode the 'destination' is the RTSP server, so send
1894        * to those ports */
1895       min = tr->server_port.min;
1896       max = tr->server_port.max;
1897     } else {
1898       min = tr->client_port.min;
1899       max = tr->client_port.max;
1900     }
1901
1902     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1903       result = trans;
1904       break;
1905     }
1906   }
1907   if (result)
1908     g_object_ref (result);
1909   g_mutex_unlock (&priv->lock);
1910
1911   g_free (dest);
1912
1913   return result;
1914 }
1915
1916 static GstRTSPStreamTransport *
1917 check_transport (GObject * source, GstRTSPStream * stream)
1918 {
1919   GstStructure *stats;
1920   GstRTSPStreamTransport *trans;
1921
1922   /* see if we have a stream to match with the origin of the RTCP packet */
1923   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1924   if (trans == NULL) {
1925     g_object_get (source, "stats", &stats, NULL);
1926     if (stats) {
1927       const gchar *rtcp_from;
1928
1929       dump_structure (stats);
1930
1931       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1932       if ((trans = find_transport (stream, rtcp_from))) {
1933         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1934             source);
1935         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1936             g_object_unref);
1937       }
1938       gst_structure_free (stats);
1939     }
1940   }
1941   return trans;
1942 }
1943
1944
1945 static void
1946 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1947 {
1948   GstRTSPStreamTransport *trans;
1949
1950   GST_INFO ("%p: new source %p", stream, source);
1951
1952   trans = check_transport (source, stream);
1953
1954   if (trans)
1955     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1956 }
1957
1958 static void
1959 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1960 {
1961   GST_INFO ("%p: new SDES %p", stream, source);
1962 }
1963
1964 static void
1965 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1966 {
1967   GstRTSPStreamTransport *trans;
1968
1969   trans = check_transport (source, stream);
1970
1971   if (trans) {
1972     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1973     gst_rtsp_stream_transport_keep_alive (trans);
1974   }
1975 #ifdef DUMP_STATS
1976   {
1977     GstStructure *stats;
1978     g_object_get (source, "stats", &stats, NULL);
1979     if (stats) {
1980       dump_structure (stats);
1981       gst_structure_free (stats);
1982     }
1983   }
1984 #endif
1985 }
1986
1987 static void
1988 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1989 {
1990   GST_INFO ("%p: source %p bye", stream, source);
1991 }
1992
1993 static void
1994 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1995 {
1996   GstRTSPStreamTransport *trans;
1997
1998   GST_INFO ("%p: source %p bye timeout", stream, source);
1999
2000   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2001     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2002     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2003   }
2004 }
2005
2006 static void
2007 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2008 {
2009   GstRTSPStreamTransport *trans;
2010
2011   GST_INFO ("%p: source %p timeout", stream, source);
2012
2013   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2014     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2015     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2016   }
2017 }
2018
2019 static void
2020 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2021 {
2022   GST_INFO ("%p: new sender source %p", stream, source);
2023 #ifndef DUMP_STATS
2024   {
2025     GstStructure *stats;
2026     g_object_get (source, "stats", &stats, NULL);
2027     if (stats) {
2028       dump_structure (stats);
2029       gst_structure_free (stats);
2030     }
2031   }
2032 #endif
2033 }
2034
2035 static void
2036 on_sender_ssrc_active (GObject * session, GObject * source,
2037     GstRTSPStream * stream)
2038 {
2039 #ifndef DUMP_STATS
2040   {
2041     GstStructure *stats;
2042     g_object_get (source, "stats", &stats, NULL);
2043     if (stats) {
2044       dump_structure (stats);
2045       gst_structure_free (stats);
2046     }
2047   }
2048 #endif
2049 }
2050
2051 static void
2052 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2053 {
2054   if (is_rtp) {
2055     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2056     g_list_free (priv->tr_cache_rtp);
2057     priv->tr_cache_rtp = NULL;
2058   } else {
2059     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2060     g_list_free (priv->tr_cache_rtcp);
2061     priv->tr_cache_rtcp = NULL;
2062   }
2063 }
2064
2065 static GstFlowReturn
2066 handle_new_sample (GstAppSink * sink, gpointer user_data)
2067 {
2068   GstRTSPStreamPrivate *priv;
2069   GList *walk;
2070   GstSample *sample;
2071   GstBuffer *buffer;
2072   GstRTSPStream *stream;
2073   gboolean is_rtp;
2074
2075   sample = gst_app_sink_pull_sample (sink);
2076   if (!sample)
2077     return GST_FLOW_OK;
2078
2079   stream = (GstRTSPStream *) user_data;
2080   priv = stream->priv;
2081   buffer = gst_sample_get_buffer (sample);
2082
2083   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2084
2085   g_mutex_lock (&priv->lock);
2086   if (is_rtp) {
2087     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2088       clear_tr_cache (priv, is_rtp);
2089       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2090         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2091         priv->tr_cache_rtp =
2092             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2093       }
2094       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2095     }
2096   } else {
2097     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2098       clear_tr_cache (priv, is_rtp);
2099       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2100         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2101         priv->tr_cache_rtcp =
2102             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2103       }
2104       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2105     }
2106   }
2107   g_mutex_unlock (&priv->lock);
2108
2109   if (is_rtp) {
2110     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2111       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2112       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2113     }
2114   } else {
2115     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2116       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2117       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2118     }
2119   }
2120   gst_sample_unref (sample);
2121
2122   return GST_FLOW_OK;
2123 }
2124
2125 static GstAppSinkCallbacks sink_cb = {
2126   NULL,                         /* not interested in EOS */
2127   NULL,                         /* not interested in preroll samples */
2128   handle_new_sample,
2129 };
2130
2131 static GstElement *
2132 get_rtp_encoder (GstRTSPStream * stream, guint session)
2133 {
2134   GstRTSPStreamPrivate *priv = stream->priv;
2135
2136   if (priv->srtpenc == NULL) {
2137     gchar *name;
2138
2139     name = g_strdup_printf ("srtpenc_%u", session);
2140     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2141     g_free (name);
2142
2143     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2144   }
2145   return gst_object_ref (priv->srtpenc);
2146 }
2147
2148 static GstElement *
2149 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2150 {
2151   GstRTSPStreamPrivate *priv = stream->priv;
2152   GstElement *oldenc, *enc;
2153   GstPad *pad;
2154   gchar *name;
2155
2156   if (priv->idx != session)
2157     return NULL;
2158
2159   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2160
2161   oldenc = priv->srtpenc;
2162   enc = get_rtp_encoder (stream, session);
2163   name = g_strdup_printf ("rtp_sink_%d", session);
2164   pad = gst_element_get_request_pad (enc, name);
2165   g_free (name);
2166   gst_object_unref (pad);
2167
2168   if (oldenc == NULL)
2169     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2170         enc);
2171
2172   return enc;
2173 }
2174
2175 static GstElement *
2176 request_rtcp_encoder (GstElement * rtpbin, guint session,
2177     GstRTSPStream * stream)
2178 {
2179   GstRTSPStreamPrivate *priv = stream->priv;
2180   GstElement *oldenc, *enc;
2181   GstPad *pad;
2182   gchar *name;
2183
2184   if (priv->idx != session)
2185     return NULL;
2186
2187   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2188
2189   oldenc = priv->srtpenc;
2190   enc = get_rtp_encoder (stream, session);
2191   name = g_strdup_printf ("rtcp_sink_%d", session);
2192   pad = gst_element_get_request_pad (enc, name);
2193   g_free (name);
2194   gst_object_unref (pad);
2195
2196   if (oldenc == NULL)
2197     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2198         enc);
2199
2200   return enc;
2201 }
2202
2203 static GstCaps *
2204 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2205 {
2206   GstRTSPStreamPrivate *priv = stream->priv;
2207   GstCaps *caps;
2208
2209   GST_DEBUG ("request key %08x", ssrc);
2210
2211   g_mutex_lock (&priv->lock);
2212   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2213     gst_caps_ref (caps);
2214   g_mutex_unlock (&priv->lock);
2215
2216   return caps;
2217 }
2218
2219 static GstElement *
2220 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2221     GstRTSPStream * stream)
2222 {
2223   GstRTSPStreamPrivate *priv = stream->priv;
2224
2225   if (priv->idx != session)
2226     return NULL;
2227
2228   if (priv->srtpdec == NULL) {
2229     gchar *name;
2230
2231     name = g_strdup_printf ("srtpdec_%u", session);
2232     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2233     g_free (name);
2234
2235     g_signal_connect (priv->srtpdec, "request-key",
2236         (GCallback) request_key, stream);
2237   }
2238   return gst_object_ref (priv->srtpdec);
2239 }
2240
2241 /**
2242  * gst_rtsp_stream_request_aux_sender:
2243  * @stream: a #GstRTSPStream
2244  * @sessid: the session id
2245  *
2246  * Creating a rtxsend bin
2247  *
2248  * Returns: (transfer full): a #GstElement.
2249  *
2250  * Since: 1.6
2251  */
2252 GstElement *
2253 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2254 {
2255   GstElement *bin;
2256   GstPad *pad;
2257   GstStructure *pt_map;
2258   gchar *name;
2259   guint pt, rtx_pt;
2260   gchar *pt_s;
2261
2262   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2263
2264   pt = gst_rtsp_stream_get_pt (stream);
2265   pt_s = g_strdup_printf ("%u", pt);
2266   rtx_pt = stream->priv->rtx_pt;
2267
2268   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2269
2270   bin = gst_bin_new (NULL);
2271   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2272   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2273       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2274   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2275       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2276   g_free (pt_s);
2277   gst_structure_free (pt_map);
2278   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2279
2280   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2281   name = g_strdup_printf ("src_%u", sessid);
2282   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2283   g_free (name);
2284   gst_object_unref (pad);
2285
2286   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2287   name = g_strdup_printf ("sink_%u", sessid);
2288   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2289   g_free (name);
2290   gst_object_unref (pad);
2291
2292   return bin;
2293 }
2294
2295 /**
2296  * gst_rtsp_stream_set_pt_map:
2297  * @stream: a #GstRTSPStream
2298  * @pt: the pt
2299  * @caps: a #GstCaps
2300  *
2301  * Configure a pt map between @pt and @caps.
2302  */
2303 void
2304 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2305 {
2306   GstRTSPStreamPrivate *priv = stream->priv;
2307
2308   g_mutex_lock (&priv->lock);
2309   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2310   g_mutex_unlock (&priv->lock);
2311 }
2312
2313 /**
2314  * gst_rtsp_stream_set_publish_clock_mode:
2315  * @stream: a #GstRTSPStream
2316  * @mode: the clock publish mode
2317  *
2318  * Sets if and how the stream clock should be published according to RFC7273.
2319  *
2320  * Since: 1.8
2321  */
2322 void
2323 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2324     GstRTSPPublishClockMode mode)
2325 {
2326   GstRTSPStreamPrivate *priv;
2327
2328   priv = stream->priv;
2329   g_mutex_lock (&priv->lock);
2330   priv->publish_clock_mode = mode;
2331   g_mutex_unlock (&priv->lock);
2332 }
2333
2334 /**
2335  * gst_rtsp_stream_get_publish_clock_mode:
2336  * @factory: a #GstRTSPStream
2337  *
2338  * Gets if and how the stream clock should be published according to RFC7273.
2339  *
2340  * Returns: The GstRTSPPublishClockMode
2341  *
2342  * Since: 1.8
2343  */
2344 GstRTSPPublishClockMode
2345 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2346 {
2347   GstRTSPStreamPrivate *priv;
2348   GstRTSPPublishClockMode ret;
2349
2350   priv = stream->priv;
2351   g_mutex_lock (&priv->lock);
2352   ret = priv->publish_clock_mode;
2353   g_mutex_unlock (&priv->lock);
2354
2355   return ret;
2356 }
2357
2358 static GstCaps *
2359 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2360     GstRTSPStream * stream)
2361 {
2362   GstRTSPStreamPrivate *priv = stream->priv;
2363   GstCaps *caps = NULL;
2364
2365   g_mutex_lock (&priv->lock);
2366
2367   if (priv->idx == session) {
2368     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2369     if (caps) {
2370       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2371       gst_caps_ref (caps);
2372     } else {
2373       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2374     }
2375   }
2376
2377   g_mutex_unlock (&priv->lock);
2378
2379   return caps;
2380 }
2381
2382 static void
2383 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2384 {
2385   GstRTSPStreamPrivate *priv = stream->priv;
2386   gchar *name;
2387   GstPadLinkReturn ret;
2388   guint sessid;
2389
2390   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2391       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2392
2393   name = gst_pad_get_name (pad);
2394   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2395     g_free (name);
2396     return;
2397   }
2398   g_free (name);
2399
2400   if (priv->idx != sessid)
2401     return;
2402
2403   if (gst_pad_is_linked (priv->sinkpad)) {
2404     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2405         GST_DEBUG_PAD_NAME (priv->sinkpad));
2406     return;
2407   }
2408
2409   /* link the RTP pad to the session manager, it should not really fail unless
2410    * this is not really an RTP pad */
2411   ret = gst_pad_link (pad, priv->sinkpad);
2412   if (ret != GST_PAD_LINK_OK)
2413     goto link_failed;
2414   priv->recv_rtp_src = gst_object_ref (pad);
2415
2416   return;
2417
2418 /* ERRORS */
2419 link_failed:
2420   {
2421     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2422         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2423   }
2424 }
2425
2426 static void
2427 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2428     GstRTSPStream * stream)
2429 {
2430   /* TODO: What to do here other than this? */
2431   GST_DEBUG ("Stream %p: Got EOS", stream);
2432   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2433 }
2434
2435 static void
2436 plug_sink (GstBin * bin, GstElement * tee, GstElement * sink,
2437     GstElement ** queue_out)
2438 {
2439   GstPad *pad;
2440   GstPad *teepad;
2441   GstPad *queuepad;
2442
2443   gst_bin_add (bin, sink);
2444
2445   *queue_out = gst_element_factory_make ("queue", NULL);
2446   g_object_set (*queue_out, "max-size-buffers", 1, "max-size-bytes", 0,
2447       "max-size-time", G_GINT64_CONSTANT (0), NULL);
2448   gst_bin_add (bin, *queue_out);
2449
2450   /* link tee to queue */
2451   teepad = gst_element_get_request_pad (tee, "src_%u");
2452   pad = gst_element_get_static_pad (*queue_out, "sink");
2453   gst_pad_link (teepad, pad);
2454   gst_object_unref (pad);
2455   gst_object_unref (teepad);
2456
2457   /* link queue to sink */
2458   queuepad = gst_element_get_static_pad (*queue_out, "src");
2459   pad = gst_element_get_static_pad (sink, "sink");
2460   gst_pad_link (queuepad, pad);
2461   gst_object_unref (queuepad);
2462   gst_object_unref (pad);
2463 }
2464
2465 /* must be called with lock */
2466 static gboolean
2467 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2468 {
2469   GstRTSPStreamPrivate *priv;
2470   GstPad *pad;
2471   gboolean is_tcp = FALSE, is_udp = FALSE;
2472   gint i;
2473
2474   priv = stream->priv;
2475
2476   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2477   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2478       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2479
2480   if (is_udp && !create_and_configure_udpsinks (stream))
2481     goto no_udp_protocol;
2482
2483   for (i = 0; i < 2; i++) {
2484     /* For the sender we create this bit of pipeline for both
2485      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2486      * we need to add a queue before appsink and udpsink to make
2487      * the pipeline not block. For the TCP case, we want to pump
2488      * client as fast as possible anyway. This pipeline is used
2489      * when both TCP and UDP are present.
2490      *
2491      * .--------.      .-----.    .---------.    .---------.
2492      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2493      * |       send->sink   src->sink      src->sink       |
2494      * '--------'      |     |    '---------'    '---------'
2495      *                 |     |    .---------.    .---------.
2496      *                 |     |    |  queue  |    | appsink |
2497      *                 |    src->sink      src->sink       |
2498      *                 '-----'    '---------'    '---------'
2499      *
2500      * When only UDP or only TCP is allowed, we skip the tee and queue
2501      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2502      * the session.
2503      */
2504
2505     /* Only link the RTP send src if we're going to send RTP, link
2506      * the RTCP send src always */
2507     if (!priv->srcpad && i == 0)
2508       continue;
2509
2510     if (is_tcp) {
2511       /* make appsink */
2512       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2513       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2514       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2515           &sink_cb, stream, NULL);
2516     }
2517
2518     if (is_udp && is_tcp) {
2519       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2520
2521       /* make tee for RTP/RTCP */
2522       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2523       gst_bin_add (bin, priv->tee[i]);
2524
2525       /* and link to rtpbin send pad */
2526       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2527       gst_pad_link (priv->send_src[i], pad);
2528       gst_object_unref (pad);
2529
2530       plug_sink (bin, priv->tee[i], priv->udpsink[i], &priv->udpqueue[i]);
2531       plug_sink (bin, priv->tee[i], priv->appsink[i], &priv->appqueue[i]);
2532     } else if (is_tcp) {
2533       /* only appsink needed, link it to the session */
2534       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2535       gst_pad_link (priv->send_src[i], pad);
2536       gst_object_unref (pad);
2537
2538       /* when its only TCP, we need to set sync and preroll to FALSE
2539        * for the sink to avoid deadlock. And this is only needed for
2540        * sink used for RTCP data, not the RTP data. */
2541       if (i == 1)
2542         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2543     } else {
2544       /* else only udpsink needed, link it to the session */
2545       pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2546       gst_pad_link (priv->send_src[i], pad);
2547       gst_object_unref (pad);
2548     }
2549
2550     /* check if we need to set to a special state */
2551     if (state != GST_STATE_NULL) {
2552       if (priv->udpsink[i])
2553         gst_element_set_state (priv->udpsink[i], state);
2554       if (priv->appsink[i])
2555         gst_element_set_state (priv->appsink[i], state);
2556       if (priv->appqueue[i])
2557         gst_element_set_state (priv->appqueue[i], state);
2558       if (priv->udpqueue[i])
2559         gst_element_set_state (priv->udpqueue[i], state);
2560       if (priv->tee[i])
2561         gst_element_set_state (priv->tee[i], state);
2562     }
2563   }
2564
2565   return TRUE;
2566
2567   /* ERRORS */
2568 no_udp_protocol:
2569   {
2570     return FALSE;
2571   }
2572 }
2573
2574 /* must be called with lock */
2575 static void
2576 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2577 {
2578   GstRTSPStreamPrivate *priv;
2579   GstPad *pad, *selpad;
2580   gboolean is_tcp;
2581   gint i;
2582
2583   priv = stream->priv;
2584
2585   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2586
2587   for (i = 0; i < 2; i++) {
2588     /* For the receiver we create this bit of pipeline for both
2589      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2590      * and it is all funneled into the rtpbin receive pad.
2591      *
2592      * .--------.     .--------.    .--------.
2593      * | udpsrc |     | funnel |    | rtpbin |
2594      * |       src->sink      src->sink      |
2595      * '--------'     |        |    '--------'
2596      * .--------.     |        |
2597      * | appsrc |     |        |
2598      * |       src->sink       |
2599      * '--------'     '--------'
2600      */
2601
2602     if (!priv->sinkpad && i == 0) {
2603       /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2604        * RTCP sink always */
2605       continue;
2606     }
2607
2608     /* make funnel for the RTP/RTCP receivers */
2609     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2610     gst_bin_add (bin, priv->funnel[i]);
2611
2612     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2613     gst_pad_link (pad, priv->recv_sink[i]);
2614     gst_object_unref (pad);
2615
2616     if (priv->udpsrc_v4[i]) {
2617       if (priv->srcpad) {
2618         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2619          * values. This is only relevant for PLAY pipelines */
2620         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2621         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2622       }
2623       /* add udpsrc */
2624       gst_bin_add (bin, priv->udpsrc_v4[i]);
2625
2626       /* and link to the funnel v4 */
2627       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2628       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2629       gst_pad_link (pad, selpad);
2630       gst_object_unref (pad);
2631       gst_object_unref (selpad);
2632     }
2633
2634     if (priv->udpsrc_v6[i]) {
2635       if (priv->srcpad) {
2636         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2637         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2638       }
2639       gst_bin_add (bin, priv->udpsrc_v6[i]);
2640
2641       /* and link to the funnel v6 */
2642       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2643       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2644       gst_pad_link (pad, selpad);
2645       gst_object_unref (pad);
2646       gst_object_unref (selpad);
2647     }
2648
2649     if (is_tcp) {
2650       /* make and add appsrc */
2651       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2652       priv->appsrc_base_time[i] = -1;
2653       if (priv->srcpad) {
2654         gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2655         gst_element_set_locked_state (priv->appsrc[i], TRUE);
2656       }
2657       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2658           TRUE, NULL);
2659       gst_bin_add (bin, priv->appsrc[i]);
2660       /* and link to the funnel */
2661       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2662       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2663       gst_pad_link (pad, selpad);
2664       gst_object_unref (pad);
2665       gst_object_unref (selpad);
2666     }
2667
2668     /* check if we need to set to a special state */
2669     if (state != GST_STATE_NULL) {
2670       gst_element_set_state (priv->funnel[i], state);
2671     }
2672   }
2673 }
2674
2675 /**
2676  * gst_rtsp_stream_join_bin:
2677  * @stream: a #GstRTSPStream
2678  * @bin: (transfer none): a #GstBin to join
2679  * @rtpbin: (transfer none): a rtpbin element in @bin
2680  * @state: the target state of the new elements
2681  *
2682  * Join the #GstBin @bin that contains the element @rtpbin.
2683  *
2684  * @stream will link to @rtpbin, which must be inside @bin. The elements
2685  * added to @bin will be set to the state given in @state.
2686  *
2687  * Returns: %TRUE on success.
2688  */
2689 gboolean
2690 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2691     GstElement * rtpbin, GstState state)
2692 {
2693   GstRTSPStreamPrivate *priv;
2694   guint idx;
2695   gchar *name;
2696   GstPadLinkReturn ret;
2697
2698   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2699   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2700   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2701
2702   priv = stream->priv;
2703
2704   g_mutex_lock (&priv->lock);
2705   if (priv->joined_bin != NULL)
2706     goto was_joined;
2707
2708   /* create a session with the same index as the stream */
2709   idx = priv->idx;
2710
2711   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2712
2713   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2714       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2715     /* For SRTP */
2716     g_signal_connect (rtpbin, "request-rtp-encoder",
2717         (GCallback) request_rtp_encoder, stream);
2718     g_signal_connect (rtpbin, "request-rtcp-encoder",
2719         (GCallback) request_rtcp_encoder, stream);
2720     g_signal_connect (rtpbin, "request-rtp-decoder",
2721         (GCallback) request_rtp_rtcp_decoder, stream);
2722     g_signal_connect (rtpbin, "request-rtcp-decoder",
2723         (GCallback) request_rtp_rtcp_decoder, stream);
2724   }
2725
2726   if (priv->sinkpad) {
2727     g_signal_connect (rtpbin, "request-pt-map",
2728         (GCallback) request_pt_map, stream);
2729   }
2730
2731   /* get pads from the RTP session element for sending and receiving
2732    * RTP/RTCP*/
2733   if (priv->srcpad) {
2734     /* get a pad for sending RTP */
2735     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2736     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2737     g_free (name);
2738
2739     /* link the RTP pad to the session manager, it should not really fail unless
2740      * this is not really an RTP pad */
2741     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2742     if (ret != GST_PAD_LINK_OK)
2743       goto link_failed;
2744
2745     name = g_strdup_printf ("send_rtp_src_%u", idx);
2746     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2747     g_free (name);
2748   } else {
2749     /* Need to connect our sinkpad from here */
2750     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2751     /* EOS */
2752     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2753
2754     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2755     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2756     g_free (name);
2757   }
2758
2759   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2760   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2761   g_free (name);
2762   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2763   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2764   g_free (name);
2765
2766   /* get the session */
2767   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2768
2769   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2770       stream);
2771   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2772       stream);
2773   g_signal_connect (priv->session, "on-ssrc-active",
2774       (GCallback) on_ssrc_active, stream);
2775   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2776       stream);
2777   g_signal_connect (priv->session, "on-bye-timeout",
2778       (GCallback) on_bye_timeout, stream);
2779   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2780       stream);
2781
2782   /* signal for sender ssrc */
2783   g_signal_connect (priv->session, "on-new-sender-ssrc",
2784       (GCallback) on_new_sender_ssrc, stream);
2785   g_signal_connect (priv->session, "on-sender-ssrc-active",
2786       (GCallback) on_sender_ssrc_active, stream);
2787
2788   if (!create_sender_part (stream, bin, state))
2789     goto no_udp_protocol;
2790
2791   create_receiver_part (stream, bin, state);
2792
2793   if (priv->srcpad) {
2794     /* be notified of caps changes */
2795     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2796         (GCallback) caps_notify, stream);
2797   }
2798
2799   priv->joined_bin = gst_object_ref (bin);
2800   g_mutex_unlock (&priv->lock);
2801
2802   return TRUE;
2803
2804   /* ERRORS */
2805 was_joined:
2806   {
2807     g_mutex_unlock (&priv->lock);
2808     return TRUE;
2809   }
2810 link_failed:
2811   {
2812     GST_WARNING ("failed to link stream %u", idx);
2813     gst_object_unref (priv->send_rtp_sink);
2814     priv->send_rtp_sink = NULL;
2815     g_mutex_unlock (&priv->lock);
2816     return FALSE;
2817   }
2818 no_udp_protocol:
2819   {
2820     GST_WARNING ("failed to allocate ports %u", idx);
2821     gst_object_unref (priv->send_rtp_sink);
2822     priv->send_rtp_sink = NULL;
2823     gst_object_unref (priv->send_src[0]);
2824     priv->send_src[0] = NULL;
2825     gst_object_unref (priv->send_src[1]);
2826     priv->send_src[1] = NULL;
2827     gst_object_unref (priv->recv_sink[0]);
2828     priv->recv_sink[0] = NULL;
2829     gst_object_unref (priv->recv_sink[1]);
2830     priv->recv_sink[1] = NULL;
2831     if (priv->udpsink[0])
2832       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2833     if (priv->udpsink[1])
2834       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2835     if (priv->udpsrc_v4[0]) {
2836       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2837       gst_object_unref (priv->udpsrc_v4[0]);
2838       priv->udpsrc_v4[0] = NULL;
2839     }
2840     if (priv->udpsrc_v4[1]) {
2841       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2842       gst_object_unref (priv->udpsrc_v4[1]);
2843       priv->udpsrc_v4[1] = NULL;
2844     }
2845     if (priv->udpsrc_mcast_v4[0]) {
2846       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2847       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2848       priv->udpsrc_mcast_v4[0] = NULL;
2849     }
2850     if (priv->udpsrc_mcast_v4[1]) {
2851       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2852       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2853       priv->udpsrc_mcast_v4[1] = NULL;
2854     }
2855     if (priv->udpsrc_v6[0]) {
2856       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2857       gst_object_unref (priv->udpsrc_v6[0]);
2858       priv->udpsrc_v6[0] = NULL;
2859     }
2860     if (priv->udpsrc_v6[1]) {
2861       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2862       gst_object_unref (priv->udpsrc_v6[1]);
2863       priv->udpsrc_v6[1] = NULL;
2864     }
2865     if (priv->udpsrc_mcast_v6[0]) {
2866       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2867       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2868       priv->udpsrc_mcast_v6[0] = NULL;
2869     }
2870     if (priv->udpsrc_mcast_v6[1]) {
2871       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2872       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2873       priv->udpsrc_mcast_v6[1] = NULL;
2874     }
2875     g_mutex_unlock (&priv->lock);
2876     return FALSE;
2877   }
2878 }
2879
2880 static void
2881 clear_element (GstBin * bin, GstElement ** elementptr)
2882 {
2883   if (*elementptr) {
2884     gst_element_set_locked_state (*elementptr, FALSE);
2885     gst_element_set_state (*elementptr, GST_STATE_NULL);
2886     if (GST_ELEMENT_PARENT (*elementptr))
2887       gst_bin_remove (bin, *elementptr);
2888     else
2889       gst_object_unref (*elementptr);
2890     *elementptr = NULL;
2891   }
2892 }
2893
2894 /**
2895  * gst_rtsp_stream_leave_bin:
2896  * @stream: a #GstRTSPStream
2897  * @bin: (transfer none): a #GstBin
2898  * @rtpbin: (transfer none): a rtpbin #GstElement
2899  *
2900  * Remove the elements of @stream from @bin.
2901  *
2902  * Return: %TRUE on success.
2903  */
2904 gboolean
2905 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2906     GstElement * rtpbin)
2907 {
2908   GstRTSPStreamPrivate *priv;
2909   gint i;
2910
2911   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2912   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2913   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2914
2915   priv = stream->priv;
2916
2917   g_mutex_lock (&priv->lock);
2918   if (priv->joined_bin == NULL)
2919     goto was_not_joined;
2920   if (priv->joined_bin != bin)
2921     goto wrong_bin;
2922
2923   priv->joined_bin = NULL;
2924
2925   /* all transports must be removed by now */
2926   if (priv->transports != NULL)
2927     goto transports_not_removed;
2928
2929   clear_tr_cache (priv, TRUE);
2930   clear_tr_cache (priv, FALSE);
2931
2932   GST_INFO ("stream %p leaving bin", stream);
2933
2934   if (priv->srcpad) {
2935     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2936
2937     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2938     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2939     gst_object_unref (priv->send_rtp_sink);
2940     priv->send_rtp_sink = NULL;
2941   } else if (priv->recv_rtp_src) {
2942     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2943     gst_object_unref (priv->recv_rtp_src);
2944     priv->recv_rtp_src = NULL;
2945   }
2946
2947   for (i = 0; i < 2; i++) {
2948     clear_element (bin, &priv->udpsink[i]);
2949     clear_element (bin, &priv->appsink[i]);
2950     clear_element (bin, &priv->appqueue[i]);
2951     clear_element (bin, &priv->udpqueue[i]);
2952     clear_element (bin, &priv->tee[i]);
2953     clear_element (bin, &priv->funnel[i]);
2954     clear_element (bin, &priv->appsrc[i]);
2955     clear_element (bin, &priv->udpsrc_v4[i]);
2956     clear_element (bin, &priv->udpsrc_v6[i]);
2957     clear_element (bin, &priv->udpsrc_mcast_v4[i]);
2958     clear_element (bin, &priv->udpsrc_mcast_v6[i]);
2959
2960     if (priv->sinkpad || i == 1) {
2961       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2962       gst_object_unref (priv->recv_sink[i]);
2963       priv->recv_sink[i] = NULL;
2964     }
2965   }
2966
2967   if (priv->srcpad) {
2968     gst_object_unref (priv->send_src[0]);
2969     priv->send_src[0] = NULL;
2970   }
2971
2972   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2973   gst_object_unref (priv->send_src[1]);
2974   priv->send_src[1] = NULL;
2975
2976   g_object_unref (priv->session);
2977   priv->session = NULL;
2978   if (priv->caps)
2979     gst_caps_unref (priv->caps);
2980   priv->caps = NULL;
2981
2982   if (priv->srtpenc)
2983     gst_object_unref (priv->srtpenc);
2984   if (priv->srtpdec)
2985     gst_object_unref (priv->srtpdec);
2986
2987   g_clear_object (&priv->joined_bin);
2988   g_mutex_unlock (&priv->lock);
2989
2990   return TRUE;
2991
2992 was_not_joined:
2993   {
2994     g_mutex_unlock (&priv->lock);
2995     return TRUE;
2996   }
2997 transports_not_removed:
2998   {
2999     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3000     g_mutex_unlock (&priv->lock);
3001     return FALSE;
3002   }
3003 wrong_bin:
3004   {
3005     GST_ERROR_OBJECT (stream, "leaving the wrong bin");
3006     g_mutex_unlock (&priv->lock);
3007     return FALSE;
3008   }
3009 }
3010
3011 /**
3012  * gst_rtsp_stream_get_joined_bin:
3013  * @stream: a #GstRTSPStream
3014  *
3015  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3016  *
3017  * Return: (transfer full): the joined bin or NULL.
3018  */
3019 GstBin *
3020 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3021 {
3022   GstRTSPStreamPrivate *priv;
3023   GstBin *bin = NULL;
3024
3025   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3026
3027   priv = stream->priv;
3028
3029   g_mutex_lock (&priv->lock);
3030   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3031   g_mutex_unlock (&priv->lock);
3032
3033   return bin;
3034 }
3035
3036 /**
3037  * gst_rtsp_stream_get_rtpinfo:
3038  * @stream: a #GstRTSPStream
3039  * @rtptime: (allow-none): result RTP timestamp
3040  * @seq: (allow-none): result RTP seqnum
3041  * @clock_rate: (allow-none): the clock rate
3042  * @running_time: (allow-none): result running-time
3043  *
3044  * Retrieve the current rtptime, seq and running-time. This is used to
3045  * construct a RTPInfo reply header.
3046  *
3047  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3048  */
3049 gboolean
3050 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3051     guint * rtptime, guint * seq, guint * clock_rate,
3052     GstClockTime * running_time)
3053 {
3054   GstRTSPStreamPrivate *priv;
3055   GstStructure *stats;
3056   GObjectClass *payobjclass;
3057
3058   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3059
3060   priv = stream->priv;
3061
3062   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3063
3064   g_mutex_lock (&priv->lock);
3065
3066   /* First try to extract the information from the last buffer on the sinks.
3067    * This will have a more accurate sequence number and timestamp, as between
3068    * the payloader and the sink there can be some queues
3069    */
3070   if (priv->udpsink[0] || priv->appsink[0]) {
3071     GstSample *last_sample;
3072
3073     if (priv->udpsink[0])
3074       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3075     else
3076       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3077
3078     if (last_sample) {
3079       GstCaps *caps;
3080       GstBuffer *buffer;
3081       GstSegment *segment;
3082       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3083
3084       caps = gst_sample_get_caps (last_sample);
3085       buffer = gst_sample_get_buffer (last_sample);
3086       segment = gst_sample_get_segment (last_sample);
3087
3088       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3089         if (seq) {
3090           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3091         }
3092
3093         if (rtptime) {
3094           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3095         }
3096
3097         gst_rtp_buffer_unmap (&rtp_buffer);
3098
3099         if (running_time) {
3100           *running_time =
3101               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3102               GST_BUFFER_TIMESTAMP (buffer));
3103         }
3104
3105         if (clock_rate) {
3106           GstStructure *s = gst_caps_get_structure (caps, 0);
3107
3108           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3109
3110           if (*clock_rate == 0 && running_time)
3111             *running_time = GST_CLOCK_TIME_NONE;
3112         }
3113         gst_sample_unref (last_sample);
3114
3115         goto done;
3116       } else {
3117         gst_sample_unref (last_sample);
3118       }
3119     }
3120   }
3121
3122   if (g_object_class_find_property (payobjclass, "stats")) {
3123     g_object_get (priv->payloader, "stats", &stats, NULL);
3124     if (stats == NULL)
3125       goto no_stats;
3126
3127     if (seq)
3128       gst_structure_get_uint (stats, "seqnum", seq);
3129
3130     if (rtptime)
3131       gst_structure_get_uint (stats, "timestamp", rtptime);
3132
3133     if (running_time)
3134       gst_structure_get_clock_time (stats, "running-time", running_time);
3135
3136     if (clock_rate) {
3137       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3138       if (*clock_rate == 0 && running_time)
3139         *running_time = GST_CLOCK_TIME_NONE;
3140     }
3141     gst_structure_free (stats);
3142   } else {
3143     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3144         !g_object_class_find_property (payobjclass, "timestamp"))
3145       goto no_stats;
3146
3147     if (seq)
3148       g_object_get (priv->payloader, "seqnum", seq, NULL);
3149
3150     if (rtptime)
3151       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3152
3153     if (running_time)
3154       *running_time = GST_CLOCK_TIME_NONE;
3155   }
3156
3157 done:
3158   g_mutex_unlock (&priv->lock);
3159
3160   return TRUE;
3161
3162   /* ERRORS */
3163 no_stats:
3164   {
3165     GST_WARNING ("Could not get payloader stats");
3166     g_mutex_unlock (&priv->lock);
3167     return FALSE;
3168   }
3169 }
3170
3171 /**
3172  * gst_rtsp_stream_get_caps:
3173  * @stream: a #GstRTSPStream
3174  *
3175  * Retrieve the current caps of @stream.
3176  *
3177  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3178  * after usage.
3179  */
3180 GstCaps *
3181 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3182 {
3183   GstRTSPStreamPrivate *priv;
3184   GstCaps *result;
3185
3186   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3187
3188   priv = stream->priv;
3189
3190   g_mutex_lock (&priv->lock);
3191   if ((result = priv->caps))
3192     gst_caps_ref (result);
3193   g_mutex_unlock (&priv->lock);
3194
3195   return result;
3196 }
3197
3198 /**
3199  * gst_rtsp_stream_recv_rtp:
3200  * @stream: a #GstRTSPStream
3201  * @buffer: (transfer full): a #GstBuffer
3202  *
3203  * Handle an RTP buffer for the stream. This method is usually called when a
3204  * message has been received from a client using the TCP transport.
3205  *
3206  * This function takes ownership of @buffer.
3207  *
3208  * Returns: a GstFlowReturn.
3209  */
3210 GstFlowReturn
3211 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3212 {
3213   GstRTSPStreamPrivate *priv;
3214   GstFlowReturn ret;
3215   GstElement *element;
3216
3217   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3218   priv = stream->priv;
3219   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3220   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3221
3222   g_mutex_lock (&priv->lock);
3223   if (priv->appsrc[0])
3224     element = gst_object_ref (priv->appsrc[0]);
3225   else
3226     element = NULL;
3227   g_mutex_unlock (&priv->lock);
3228
3229   if (element) {
3230     if (priv->appsrc_base_time[0] == -1) {
3231       /* Take current running_time. This timestamp will be put on
3232        * the first buffer of each stream because we are a live source and so we
3233        * timestamp with the running_time. When we are dealing with TCP, we also
3234        * only timestamp the first buffer (using the DISCONT flag) because a server
3235        * typically bursts data, for which we don't want to compensate by speeding
3236        * up the media. The other timestamps will be interpollated from this one
3237        * using the RTP timestamps. */
3238       GST_OBJECT_LOCK (element);
3239       if (GST_ELEMENT_CLOCK (element)) {
3240         GstClockTime now;
3241         GstClockTime base_time;
3242
3243         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3244         base_time = GST_ELEMENT_CAST (element)->base_time;
3245
3246         priv->appsrc_base_time[0] = now - base_time;
3247         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3248         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3249             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3250             GST_TIME_ARGS (base_time));
3251       }
3252       GST_OBJECT_UNLOCK (element);
3253     }
3254
3255     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3256     gst_object_unref (element);
3257   } else {
3258     ret = GST_FLOW_OK;
3259   }
3260   return ret;
3261 }
3262
3263 /**
3264  * gst_rtsp_stream_recv_rtcp:
3265  * @stream: a #GstRTSPStream
3266  * @buffer: (transfer full): a #GstBuffer
3267  *
3268  * Handle an RTCP buffer for the stream. This method is usually called when a
3269  * message has been received from a client using the TCP transport.
3270  *
3271  * This function takes ownership of @buffer.
3272  *
3273  * Returns: a GstFlowReturn.
3274  */
3275 GstFlowReturn
3276 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3277 {
3278   GstRTSPStreamPrivate *priv;
3279   GstFlowReturn ret;
3280   GstElement *element;
3281
3282   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3283   priv = stream->priv;
3284   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3285
3286   if (priv->joined_bin == NULL) {
3287     gst_buffer_unref (buffer);
3288     return GST_FLOW_NOT_LINKED;
3289   }
3290   g_mutex_lock (&priv->lock);
3291   if (priv->appsrc[1])
3292     element = gst_object_ref (priv->appsrc[1]);
3293   else
3294     element = NULL;
3295   g_mutex_unlock (&priv->lock);
3296
3297   if (element) {
3298     if (priv->appsrc_base_time[1] == -1) {
3299       /* Take current running_time. This timestamp will be put on
3300        * the first buffer of each stream because we are a live source and so we
3301        * timestamp with the running_time. When we are dealing with TCP, we also
3302        * only timestamp the first buffer (using the DISCONT flag) because a server
3303        * typically bursts data, for which we don't want to compensate by speeding
3304        * up the media. The other timestamps will be interpollated from this one
3305        * using the RTP timestamps. */
3306       GST_OBJECT_LOCK (element);
3307       if (GST_ELEMENT_CLOCK (element)) {
3308         GstClockTime now;
3309         GstClockTime base_time;
3310
3311         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3312         base_time = GST_ELEMENT_CAST (element)->base_time;
3313
3314         priv->appsrc_base_time[1] = now - base_time;
3315         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3316         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3317             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3318             GST_TIME_ARGS (base_time));
3319       }
3320       GST_OBJECT_UNLOCK (element);
3321     }
3322
3323     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3324     gst_object_unref (element);
3325   } else {
3326     ret = GST_FLOW_OK;
3327     gst_buffer_unref (buffer);
3328   }
3329   return ret;
3330 }
3331
3332 /* must be called with lock */
3333 static gboolean
3334 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3335     gboolean add)
3336 {
3337   GstRTSPStreamPrivate *priv = stream->priv;
3338   const GstRTSPTransport *tr;
3339
3340   tr = gst_rtsp_stream_transport_get_transport (trans);
3341
3342   switch (tr->lower_transport) {
3343     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3344     case GST_RTSP_LOWER_TRANS_UDP:
3345     {
3346       gchar *dest;
3347       gint min, max;
3348       guint ttl = 0;
3349
3350       dest = tr->destination;
3351       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3352         min = tr->port.min;
3353         max = tr->port.max;
3354         ttl = tr->ttl;
3355       } else if (priv->client_side) {
3356         /* In client side mode the 'destination' is the RTSP server, so send
3357          * to those ports */
3358         min = tr->server_port.min;
3359         max = tr->server_port.max;
3360       } else {
3361         min = tr->client_port.min;
3362         max = tr->client_port.max;
3363       }
3364
3365       if (add) {
3366         if (ttl > 0) {
3367           GST_INFO ("setting ttl-mc %d", ttl);
3368           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3369           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3370         }
3371         GST_INFO ("adding %s:%d-%d", dest, min, max);
3372         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3373         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3374         priv->transports = g_list_prepend (priv->transports, trans);
3375       } else {
3376         GST_INFO ("removing %s:%d-%d", dest, min, max);
3377         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3378         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3379         priv->transports = g_list_remove (priv->transports, trans);
3380       }
3381       priv->transports_cookie++;
3382       break;
3383     }
3384     case GST_RTSP_LOWER_TRANS_TCP:
3385       if (add) {
3386         GST_INFO ("adding TCP %s", tr->destination);
3387         priv->transports = g_list_prepend (priv->transports, trans);
3388       } else {
3389         GST_INFO ("removing TCP %s", tr->destination);
3390         priv->transports = g_list_remove (priv->transports, trans);
3391       }
3392       priv->transports_cookie++;
3393       break;
3394     default:
3395       goto unknown_transport;
3396   }
3397   return TRUE;
3398
3399   /* ERRORS */
3400 unknown_transport:
3401   {
3402     GST_INFO ("Unknown transport %d", tr->lower_transport);
3403     return FALSE;
3404   }
3405 }
3406
3407
3408 /**
3409  * gst_rtsp_stream_add_transport:
3410  * @stream: a #GstRTSPStream
3411  * @trans: (transfer none): a #GstRTSPStreamTransport
3412  *
3413  * Add the transport in @trans to @stream. The media of @stream will
3414  * then also be send to the values configured in @trans.
3415  *
3416  * @stream must be joined to a bin.
3417  *
3418  * @trans must contain a valid #GstRTSPTransport.
3419  *
3420  * Returns: %TRUE if @trans was added
3421  */
3422 gboolean
3423 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3424     GstRTSPStreamTransport * trans)
3425 {
3426   GstRTSPStreamPrivate *priv;
3427   gboolean res;
3428
3429   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3430   priv = stream->priv;
3431   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3432   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3433
3434   g_mutex_lock (&priv->lock);
3435   res = update_transport (stream, trans, TRUE);
3436   g_mutex_unlock (&priv->lock);
3437
3438   return res;
3439 }
3440
3441 /**
3442  * gst_rtsp_stream_remove_transport:
3443  * @stream: a #GstRTSPStream
3444  * @trans: (transfer none): a #GstRTSPStreamTransport
3445  *
3446  * Remove the transport in @trans from @stream. The media of @stream will
3447  * not be sent to the values configured in @trans.
3448  *
3449  * @stream must be joined to a bin.
3450  *
3451  * @trans must contain a valid #GstRTSPTransport.
3452  *
3453  * Returns: %TRUE if @trans was removed
3454  */
3455 gboolean
3456 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3457     GstRTSPStreamTransport * trans)
3458 {
3459   GstRTSPStreamPrivate *priv;
3460   gboolean res;
3461
3462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3463   priv = stream->priv;
3464   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3465   g_return_val_if_fail (priv->joined_bin != NULL, FALSE);
3466
3467   g_mutex_lock (&priv->lock);
3468   res = update_transport (stream, trans, FALSE);
3469   g_mutex_unlock (&priv->lock);
3470
3471   return res;
3472 }
3473
3474 /**
3475  * gst_rtsp_stream_update_crypto:
3476  * @stream: a #GstRTSPStream
3477  * @ssrc: the SSRC
3478  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3479  *
3480  * Update the new crypto information for @ssrc in @stream. If information
3481  * for @ssrc did not exist, it will be added. If information
3482  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3483  * be removed from @stream.
3484  *
3485  * Returns: %TRUE if @crypto could be updated
3486  */
3487 gboolean
3488 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3489     guint ssrc, GstCaps * crypto)
3490 {
3491   GstRTSPStreamPrivate *priv;
3492
3493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3494   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3495
3496   priv = stream->priv;
3497
3498   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3499
3500   g_mutex_lock (&priv->lock);
3501   if (crypto)
3502     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3503         gst_caps_ref (crypto));
3504   else
3505     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3506   g_mutex_unlock (&priv->lock);
3507
3508   return TRUE;
3509 }
3510
3511 /**
3512  * gst_rtsp_stream_get_rtp_socket:
3513  * @stream: a #GstRTSPStream
3514  * @family: the socket family
3515  *
3516  * Get the RTP socket from @stream for a @family.
3517  *
3518  * @stream must be joined to a bin.
3519  *
3520  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3521  * socket could be allocated for @family. Unref after usage
3522  */
3523 GSocket *
3524 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3525 {
3526   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3527   GSocket *socket;
3528   const gchar *name;
3529
3530   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3531   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3532       family == G_SOCKET_FAMILY_IPV6, NULL);
3533   g_return_val_if_fail (priv->udpsink[0], NULL);
3534
3535   if (family == G_SOCKET_FAMILY_IPV6)
3536     name = "socket-v6";
3537   else
3538     name = "socket";
3539
3540   g_object_get (priv->udpsink[0], name, &socket, NULL);
3541
3542   return socket;
3543 }
3544
3545 /**
3546  * gst_rtsp_stream_get_rtcp_socket:
3547  * @stream: a #GstRTSPStream
3548  * @family: the socket family
3549  *
3550  * Get the RTCP socket from @stream for a @family.
3551  *
3552  * @stream must be joined to a bin.
3553  *
3554  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3555  * socket could be allocated for @family. Unref after usage
3556  */
3557 GSocket *
3558 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3559 {
3560   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3561   GSocket *socket;
3562   const gchar *name;
3563
3564   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3565   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3566       family == G_SOCKET_FAMILY_IPV6, NULL);
3567   g_return_val_if_fail (priv->udpsink[1], NULL);
3568
3569   if (family == G_SOCKET_FAMILY_IPV6)
3570     name = "socket-v6";
3571   else
3572     name = "socket";
3573
3574   g_object_get (priv->udpsink[1], name, &socket, NULL);
3575
3576   return socket;
3577 }
3578
3579 /**
3580  * gst_rtsp_stream_set_seqnum:
3581  * @stream: a #GstRTSPStream
3582  * @seqnum: a new sequence number
3583  *
3584  * Configure the sequence number in the payloader of @stream to @seqnum.
3585  */
3586 void
3587 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3588 {
3589   GstRTSPStreamPrivate *priv;
3590
3591   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3592
3593   priv = stream->priv;
3594
3595   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3596 }
3597
3598 /**
3599  * gst_rtsp_stream_get_seqnum:
3600  * @stream: a #GstRTSPStream
3601  *
3602  * Get the configured sequence number in the payloader of @stream.
3603  *
3604  * Returns: the sequence number of the payloader.
3605  */
3606 guint16
3607 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3608 {
3609   GstRTSPStreamPrivate *priv;
3610   guint seqnum;
3611
3612   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3613
3614   priv = stream->priv;
3615
3616   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3617
3618   return seqnum;
3619 }
3620
3621 /**
3622  * gst_rtsp_stream_transport_filter:
3623  * @stream: a #GstRTSPStream
3624  * @func: (scope call) (allow-none): a callback
3625  * @user_data: (closure): user data passed to @func
3626  *
3627  * Call @func for each transport managed by @stream. The result value of @func
3628  * determines what happens to the transport. @func will be called with @stream
3629  * locked so no further actions on @stream can be performed from @func.
3630  *
3631  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3632  * @stream.
3633  *
3634  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3635  *
3636  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3637  * will also be added with an additional ref to the result #GList of this
3638  * function..
3639  *
3640  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3641  *
3642  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3643  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3644  * element in the #GList should be unreffed before the list is freed.
3645  */
3646 GList *
3647 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3648     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3649 {
3650   GstRTSPStreamPrivate *priv;
3651   GList *result, *walk, *next;
3652   GHashTable *visited = NULL;
3653   guint cookie;
3654
3655   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3656
3657   priv = stream->priv;
3658
3659   result = NULL;
3660   if (func)
3661     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3662
3663   g_mutex_lock (&priv->lock);
3664 restart:
3665   cookie = priv->transports_cookie;
3666   for (walk = priv->transports; walk; walk = next) {
3667     GstRTSPStreamTransport *trans = walk->data;
3668     GstRTSPFilterResult res;
3669     gboolean changed;
3670
3671     next = g_list_next (walk);
3672
3673     if (func) {
3674       /* only visit each transport once */
3675       if (g_hash_table_contains (visited, trans))
3676         continue;
3677
3678       g_hash_table_add (visited, g_object_ref (trans));
3679       g_mutex_unlock (&priv->lock);
3680
3681       res = func (stream, trans, user_data);
3682
3683       g_mutex_lock (&priv->lock);
3684     } else
3685       res = GST_RTSP_FILTER_REF;
3686
3687     changed = (cookie != priv->transports_cookie);
3688
3689     switch (res) {
3690       case GST_RTSP_FILTER_REMOVE:
3691         update_transport (stream, trans, FALSE);
3692         break;
3693       case GST_RTSP_FILTER_REF:
3694         result = g_list_prepend (result, g_object_ref (trans));
3695         break;
3696       case GST_RTSP_FILTER_KEEP:
3697       default:
3698         break;
3699     }
3700     if (changed)
3701       goto restart;
3702   }
3703   g_mutex_unlock (&priv->lock);
3704
3705   if (func)
3706     g_hash_table_unref (visited);
3707
3708   return result;
3709 }
3710
3711 static GstPadProbeReturn
3712 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3713 {
3714   GstRTSPStreamPrivate *priv;
3715   GstRTSPStream *stream;
3716
3717   stream = user_data;
3718   priv = stream->priv;
3719
3720   GST_DEBUG_OBJECT (pad, "now blocking");
3721
3722   g_mutex_lock (&priv->lock);
3723   priv->blocking = TRUE;
3724   g_mutex_unlock (&priv->lock);
3725
3726   gst_element_post_message (priv->payloader,
3727       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3728           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3729
3730   return GST_PAD_PROBE_OK;
3731 }
3732
3733 /**
3734  * gst_rtsp_stream_set_blocked:
3735  * @stream: a #GstRTSPStream
3736  * @blocked: boolean indicating we should block or unblock
3737  *
3738  * Blocks or unblocks the dataflow on @stream.
3739  *
3740  * Returns: %TRUE on success
3741  */
3742 gboolean
3743 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3744 {
3745   GstRTSPStreamPrivate *priv;
3746
3747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3748
3749   priv = stream->priv;
3750
3751   g_mutex_lock (&priv->lock);
3752   if (blocked) {
3753     priv->blocking = FALSE;
3754     if (priv->blocked_id == 0) {
3755       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3756           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3757           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3758           g_object_ref (stream), g_object_unref);
3759     }
3760   } else {
3761     if (priv->blocked_id != 0) {
3762       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3763       priv->blocked_id = 0;
3764       priv->blocking = FALSE;
3765     }
3766   }
3767   g_mutex_unlock (&priv->lock);
3768
3769   return TRUE;
3770 }
3771
3772 /**
3773  * gst_rtsp_stream_is_blocking:
3774  * @stream: a #GstRTSPStream
3775  *
3776  * Check if @stream is blocking on a #GstBuffer.
3777  *
3778  * Returns: %TRUE if @stream is blocking
3779  */
3780 gboolean
3781 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3782 {
3783   GstRTSPStreamPrivate *priv;
3784   gboolean result;
3785
3786   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3787
3788   priv = stream->priv;
3789
3790   g_mutex_lock (&priv->lock);
3791   result = priv->blocking;
3792   g_mutex_unlock (&priv->lock);
3793
3794   return result;
3795 }
3796
3797 /**
3798  * gst_rtsp_stream_query_position:
3799  * @stream: a #GstRTSPStream
3800  *
3801  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3802  * the RTP parts of the pipeline and not the RTCP parts.
3803  *
3804  * Returns: %TRUE if the position could be queried
3805  */
3806 gboolean
3807 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3808 {
3809   GstRTSPStreamPrivate *priv;
3810   GstElement *sink;
3811   gboolean ret;
3812
3813   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3814
3815   priv = stream->priv;
3816
3817   g_mutex_lock (&priv->lock);
3818   /* depending on the transport type, it should query corresponding sink */
3819   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3820       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3821     sink = priv->udpsink[0];
3822   else
3823     sink = priv->appsink[0];
3824
3825   if (sink)
3826     gst_object_ref (sink);
3827   g_mutex_unlock (&priv->lock);
3828
3829   if (!sink)
3830     return FALSE;
3831
3832   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3833   gst_object_unref (sink);
3834
3835   return ret;
3836 }
3837
3838 /**
3839  * gst_rtsp_stream_query_stop:
3840  * @stream: a #GstRTSPStream
3841  *
3842  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3843  * the RTP parts of the pipeline and not the RTCP parts.
3844  *
3845  * Returns: %TRUE if the stop could be queried
3846  */
3847 gboolean
3848 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3849 {
3850   GstRTSPStreamPrivate *priv;
3851   GstElement *sink;
3852   GstQuery *query;
3853   gboolean ret;
3854
3855   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3856
3857   priv = stream->priv;
3858
3859   g_mutex_lock (&priv->lock);
3860   /* depending on the transport type, it should query corresponding sink */
3861   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3862       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3863     sink = priv->udpsink[0];
3864   else
3865     sink = priv->appsink[0];
3866
3867   if (sink)
3868     gst_object_ref (sink);
3869   g_mutex_unlock (&priv->lock);
3870
3871   if (!sink)
3872     return FALSE;
3873
3874   query = gst_query_new_segment (GST_FORMAT_TIME);
3875   if ((ret = gst_element_query (sink, query))) {
3876     GstFormat format;
3877
3878     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3879     if (format != GST_FORMAT_TIME)
3880       *stop = -1;
3881   }
3882   gst_query_unref (query);
3883   gst_object_unref (sink);
3884
3885   return ret;
3886
3887 }