rtsp-stream: postpone the creation of the UDP sources
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static void
1025 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1026     GSocket * rtcp_socket, GSocketFamily family)
1027 {
1028   GstRTSPStreamPrivate *priv = stream->priv;
1029   const gchar *multisink_socket;
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     multisink_socket = "socket-v6";
1033   else
1034     multisink_socket = "socket";
1035
1036   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1037       NULL);
1038   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1039       NULL);
1040 }
1041
1042 /* must be called with lock */
1043 static gboolean
1044 create_and_configure_udpsinks (GstRTSPStream * stream)
1045 {
1046   GstRTSPStreamPrivate *priv = stream->priv;
1047   GstElement *udpsink0, *udpsink1;
1048
1049   udpsink0 = NULL;
1050   udpsink1 = NULL;
1051
1052   if (priv->udpsink[0])
1053     udpsink0 = priv->udpsink[0];
1054   else
1055     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1056
1057   if (!udpsink0)
1058     goto no_udp_protocol;
1059
1060   if (priv->udpsink[1])
1061     udpsink1 = priv->udpsink[1];
1062   else
1063     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1064
1065   if (!udpsink1)
1066     goto no_udp_protocol;
1067
1068   /* configure sinks */
1069
1070   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1071   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1072
1073   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1074   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1075
1076   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1077
1078   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1079   /* Needs to be async for RECORD streams, otherwise we will never go to
1080    * PLAYING because the sinks will wait for data while the udpsrc can't
1081    * provide data with timestamps in PAUSED. */
1082   if (priv->sinkpad)
1083     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1085
1086   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1087   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1088
1089   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1091
1092   /* update the dscp qos field in the sinks */
1093   update_dscp_qos (stream);
1094
1095   priv->udpsink[0] = udpsink0;
1096   priv->udpsink[1] = udpsink1;
1097
1098   return TRUE;
1099
1100   /* ERRORS */
1101 no_udp_protocol:
1102   {
1103     return FALSE;
1104   }
1105 }
1106
1107 /* must be called with lock */
1108 static void
1109 play_udpsources_one_family (GstRTSPStream * stream, GSocketFamily family)
1110 {
1111   GstRTSPStreamPrivate *priv;
1112   GstPad *pad, *selpad;
1113   guint i;
1114   GstBin *bin;
1115
1116   priv = stream->priv;
1117   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1118
1119   for (i = 0; i < 2; i++) {
1120     if (priv->sinkpad || i == 1) {
1121       if (family == G_SOCKET_FAMILY_IPV4 && priv->udpsrc_v4[i]) {
1122         if (priv->srcpad) {
1123           /* we set and keep these to playing so that they don't cause NO_PREROLL return
1124            * values. This is only relevant for PLAY pipelines */
1125           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1126           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1127         }
1128         /* add udpsrc */
1129         gst_bin_add (bin, priv->udpsrc_v4[i]);
1130
1131         /* and link to the funnel v4 */
1132         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1133         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1134         gst_pad_link (pad, selpad);
1135         gst_object_unref (pad);
1136         gst_object_unref (selpad);
1137       }
1138
1139       if (family == G_SOCKET_FAMILY_IPV6 && priv->udpsrc_v6[i]) {
1140         if (priv->srcpad) {
1141           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1142           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1143         }
1144         gst_bin_add (bin, priv->udpsrc_v6[i]);
1145
1146         /* and link to the funnel v6 */
1147         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1148         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1149         gst_pad_link (pad, selpad);
1150         gst_object_unref (pad);
1151         gst_object_unref (selpad);
1152       }
1153     }
1154   }
1155
1156   gst_object_unref (bin);
1157 }
1158
1159 /* must be called with lock */
1160 static gboolean
1161 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1162     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family)
1163 {
1164   GstStateChangeReturn ret;
1165
1166   /* we keep these elements, we will further configure them when the
1167    * client told us to really use the UDP ports. */
1168   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1169   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1170
1171   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1172     goto error;
1173
1174   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1175   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1176
1177   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1178   if (ret == GST_STATE_CHANGE_FAILURE)
1179     goto error;
1180   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1181   if (ret == GST_STATE_CHANGE_FAILURE)
1182     goto error;
1183
1184   return TRUE;
1185   return TRUE;
1186
1187   /* ERRORS */
1188 error:
1189   {
1190     if (udpsrc_out[0])
1191       gst_object_unref (udpsrc_out[0]);
1192     if (udpsrc_out[1])
1193       gst_object_unref (udpsrc_out[1]);
1194     return FALSE;
1195   }
1196 }
1197
1198 static gboolean
1199 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1200     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1201     GstRTSPAddress ** server_addr_out)
1202 {
1203   GstRTSPStreamPrivate *priv = stream->priv;
1204   GSocket *rtp_socket = NULL;
1205   GSocket *rtcp_socket;
1206   gint tmp_rtp, tmp_rtcp;
1207   guint count;
1208   gint rtpport, rtcpport;
1209   GList *rejected_addresses = NULL;
1210   GstRTSPAddress *addr = NULL;
1211   GInetAddress *inetaddr = NULL;
1212   GSocketAddress *rtp_sockaddr = NULL;
1213   GSocketAddress *rtcp_sockaddr = NULL;
1214   GstRTSPAddressPool * pool;
1215
1216   pool = priv->pool;
1217   count = 0;
1218
1219   /* Start with random port */
1220   tmp_rtp = 0;
1221
1222   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1223       G_SOCKET_PROTOCOL_UDP, NULL);
1224   if (!rtcp_socket)
1225     goto no_udp_protocol;
1226
1227   if (*server_addr_out)
1228     gst_rtsp_address_free (*server_addr_out);
1229
1230   /* try to allocate 2 UDP ports, the RTP port should be an even
1231    * number and the RTCP port should be the next (uneven) port */
1232 again:
1233
1234   if (rtp_socket == NULL) {
1235     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1236         G_SOCKET_PROTOCOL_UDP, NULL);
1237     if (!rtp_socket)
1238       goto no_udp_protocol;
1239   }
1240
1241   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1242     GstRTSPAddressFlags flags;
1243
1244     if (addr)
1245       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1246
1247     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1248     if (family == G_SOCKET_FAMILY_IPV6)
1249       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1250     else
1251       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1252
1253     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1254
1255     if (addr == NULL)
1256       goto no_ports;
1257
1258     tmp_rtp = addr->port;
1259
1260     g_clear_object (&inetaddr);
1261     inetaddr = g_inet_address_new_from_string (addr->address);
1262   } else {
1263     if (tmp_rtp != 0) {
1264       tmp_rtp += 2;
1265       if (++count > 20)
1266         goto no_ports;
1267     }
1268
1269     if (inetaddr == NULL)
1270       inetaddr = g_inet_address_new_any (family);
1271   }
1272
1273   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1274   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1275     g_object_unref (rtp_sockaddr);
1276     goto again;
1277   }
1278   g_object_unref (rtp_sockaddr);
1279
1280   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1281   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1282     g_clear_object (&rtp_sockaddr);
1283     goto socket_error;
1284   }
1285
1286   tmp_rtp =
1287       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1288   g_object_unref (rtp_sockaddr);
1289
1290   /* check if port is even */
1291   if ((tmp_rtp & 1) != 0) {
1292     /* port not even, close and allocate another */
1293     tmp_rtp++;
1294     g_clear_object (&rtp_socket);
1295     goto again;
1296   }
1297
1298   /* set port */
1299   tmp_rtcp = tmp_rtp + 1;
1300
1301   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1302   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1303     g_object_unref (rtcp_sockaddr);
1304     g_clear_object (&rtp_socket);
1305     goto again;
1306   }
1307   g_object_unref (rtcp_sockaddr);
1308
1309   g_clear_object (&inetaddr);
1310
1311   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1312         rtcp_socket, family))
1313     goto no_udp_protocol;
1314   play_udpsources_one_family (stream, family);
1315
1316   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1317   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1318
1319   /* this should not happen... */
1320   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1321     goto port_error;
1322
1323
1324   /* set RTP and RTCP sockets */
1325   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1326
1327   server_port_out->min = rtpport;
1328   server_port_out->max = rtcpport;
1329
1330   *server_addr_out = addr;
1331   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1332
1333   g_object_unref (rtp_socket);
1334   g_object_unref (rtcp_socket);
1335
1336   return TRUE;
1337
1338   /* ERRORS */
1339 no_udp_protocol:
1340   {
1341     goto cleanup;
1342   }
1343 no_ports:
1344   {
1345     goto cleanup;
1346   }
1347 port_error:
1348   {
1349     goto cleanup;
1350   }
1351 socket_error:
1352   {
1353     goto cleanup;
1354   }
1355 cleanup:
1356   {
1357     if (inetaddr)
1358       g_object_unref (inetaddr);
1359     g_list_free_full (rejected_addresses,
1360         (GDestroyNotify) gst_rtsp_address_free);
1361     if (addr)
1362       gst_rtsp_address_free (addr);
1363     if (rtp_socket)
1364       g_object_unref (rtp_socket);
1365     if (rtcp_socket)
1366       g_object_unref (rtcp_socket);
1367     return FALSE;
1368   }
1369 }
1370
1371 /* must be called with lock */
1372 static gboolean
1373 alloc_ports (GstRTSPStream * stream)
1374 {
1375   GstRTSPStreamPrivate *priv = stream->priv;
1376
1377   priv->have_ipv4 =
1378       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1379           &priv->server_port_v4, &priv->server_addr_v4);
1380
1381   priv->have_ipv6 =
1382       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1383           &priv->server_port_v6, &priv->server_addr_v6);
1384
1385   return priv->have_ipv4 || priv->have_ipv6;
1386 }
1387
1388 /**
1389  * gst_rtsp_stream_set_client_side:
1390  * @stream: a #GstRTSPStream
1391  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1392  * an RTSP connection.
1393  *
1394  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1395  * streams to an RTSP server via RECORD. This has the practical effect
1396  * of changing which UDP port numbers are used when setting up the local
1397  * side of the stream sending to be either the 'server' or 'client' pair
1398  * of a configured UDP transport.
1399  */
1400 void
1401 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1402 {
1403   GstRTSPStreamPrivate *priv;
1404
1405   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1406   priv = stream->priv;
1407   g_mutex_lock (&priv->lock);
1408   priv->client_side = client_side;
1409   g_mutex_unlock (&priv->lock);
1410 }
1411
1412 /**
1413  * gst_rtsp_stream_set_client_side:
1414  * @stream: a #GstRTSPStream
1415  *
1416  * See gst_rtsp_stream_set_client_side()
1417  *
1418  * Returns: TRUE if this #GstRTSPStream is client-side.
1419  */
1420 gboolean
1421 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1422 {
1423   GstRTSPStreamPrivate *priv;
1424   gboolean ret;
1425
1426   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1427
1428   priv = stream->priv;
1429   g_mutex_lock (&priv->lock);
1430   ret = priv->client_side;
1431   g_mutex_unlock (&priv->lock);
1432
1433   return ret;
1434 }
1435
1436 /**
1437  * gst_rtsp_stream_get_server_port:
1438  * @stream: a #GstRTSPStream
1439  * @server_port: (out): result server port
1440  * @family: the port family to get
1441  *
1442  * Fill @server_port with the port pair used by the server. This function can
1443  * only be called when @stream has been joined.
1444  */
1445 void
1446 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1447     GstRTSPRange * server_port, GSocketFamily family)
1448 {
1449   GstRTSPStreamPrivate *priv;
1450
1451   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1452   priv = stream->priv;
1453   g_return_if_fail (priv->is_joined);
1454
1455   g_mutex_lock (&priv->lock);
1456   if (family == G_SOCKET_FAMILY_IPV4) {
1457     if (server_port)
1458       *server_port = priv->server_port_v4;
1459   } else {
1460     if (server_port)
1461       *server_port = priv->server_port_v6;
1462   }
1463   g_mutex_unlock (&priv->lock);
1464 }
1465
1466 /**
1467  * gst_rtsp_stream_get_rtpsession:
1468  * @stream: a #GstRTSPStream
1469  *
1470  * Get the RTP session of this stream.
1471  *
1472  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1473  */
1474 GObject *
1475 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1476 {
1477   GstRTSPStreamPrivate *priv;
1478   GObject *session;
1479
1480   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1481
1482   priv = stream->priv;
1483
1484   g_mutex_lock (&priv->lock);
1485   if ((session = priv->session))
1486     g_object_ref (session);
1487   g_mutex_unlock (&priv->lock);
1488
1489   return session;
1490 }
1491
1492 /**
1493  * gst_rtsp_stream_get_ssrc:
1494  * @stream: a #GstRTSPStream
1495  * @ssrc: (out): result ssrc
1496  *
1497  * Get the SSRC used by the RTP session of this stream. This function can only
1498  * be called when @stream has been joined.
1499  */
1500 void
1501 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1502 {
1503   GstRTSPStreamPrivate *priv;
1504
1505   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1506   priv = stream->priv;
1507   g_return_if_fail (priv->is_joined);
1508
1509   g_mutex_lock (&priv->lock);
1510   if (ssrc && priv->session)
1511     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1512   g_mutex_unlock (&priv->lock);
1513 }
1514
1515 /**
1516  * gst_rtsp_stream_set_retransmission_time:
1517  * @stream: a #GstRTSPStream
1518  * @time: a #GstClockTime
1519  *
1520  * Set the amount of time to store retransmission packets.
1521  */
1522 void
1523 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1524     GstClockTime time)
1525 {
1526   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1527
1528   g_mutex_lock (&stream->priv->lock);
1529   stream->priv->rtx_time = time;
1530   if (stream->priv->rtxsend)
1531     g_object_set (stream->priv->rtxsend, "max-size-time",
1532         GST_TIME_AS_MSECONDS (time), NULL);
1533   g_mutex_unlock (&stream->priv->lock);
1534 }
1535
1536 /**
1537  * gst_rtsp_stream_get_retransmission_time:
1538  * @stream: a #GstRTSPStream
1539  *
1540  * Get the amount of time to store retransmission data.
1541  *
1542  * Returns: the amount of time to store retransmission data.
1543  */
1544 GstClockTime
1545 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1546 {
1547   GstClockTime ret;
1548
1549   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1550
1551   g_mutex_lock (&stream->priv->lock);
1552   ret = stream->priv->rtx_time;
1553   g_mutex_unlock (&stream->priv->lock);
1554
1555   return ret;
1556 }
1557
1558 /**
1559  * gst_rtsp_stream_set_retransmission_pt:
1560  * @stream: a #GstRTSPStream
1561  * @rtx_pt: a #guint
1562  *
1563  * Set the payload type (pt) for retransmission of this stream.
1564  */
1565 void
1566 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1567 {
1568   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1569
1570   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1571
1572   g_mutex_lock (&stream->priv->lock);
1573   stream->priv->rtx_pt = rtx_pt;
1574   if (stream->priv->rtxsend) {
1575     guint pt = gst_rtsp_stream_get_pt (stream);
1576     gchar *pt_s = g_strdup_printf ("%d", pt);
1577     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1578         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1579     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1580     g_free (pt_s);
1581     gst_structure_free (rtx_pt_map);
1582   }
1583   g_mutex_unlock (&stream->priv->lock);
1584 }
1585
1586 /**
1587  * gst_rtsp_stream_get_retransmission_pt:
1588  * @stream: a #GstRTSPStream
1589  *
1590  * Get the payload-type used for retransmission of this stream
1591  *
1592  * Returns: The retransmission PT.
1593  */
1594 guint
1595 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1596 {
1597   guint rtx_pt;
1598
1599   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1600
1601   g_mutex_lock (&stream->priv->lock);
1602   rtx_pt = stream->priv->rtx_pt;
1603   g_mutex_unlock (&stream->priv->lock);
1604
1605   return rtx_pt;
1606 }
1607
1608 /**
1609  * gst_rtsp_stream_set_buffer_size:
1610  * @stream: a #GstRTSPStream
1611  * @size: the buffer size
1612  *
1613  * Set the size of the UDP transmission buffer (in bytes)
1614  * Needs to be set before the stream is joined to a bin.
1615  *
1616  * Since: 1.6
1617  */
1618 void
1619 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1620 {
1621   g_mutex_lock (&stream->priv->lock);
1622   stream->priv->buffer_size = size;
1623   g_mutex_unlock (&stream->priv->lock);
1624 }
1625
1626 /**
1627  * gst_rtsp_stream_get_buffer_size:
1628  * @stream: a #GstRTSPStream
1629  *
1630  * Get the size of the UDP transmission buffer (in bytes)
1631  *
1632  * Returns: the size of the UDP TX buffer
1633  *
1634  * Since: 1.6
1635  */
1636 guint
1637 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1638 {
1639   guint buffer_size;
1640
1641   g_mutex_lock (&stream->priv->lock);
1642   buffer_size = stream->priv->buffer_size;
1643   g_mutex_unlock (&stream->priv->lock);
1644
1645   return buffer_size;
1646 }
1647
1648 /* executed from streaming thread */
1649 static void
1650 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1651 {
1652   GstRTSPStreamPrivate *priv = stream->priv;
1653   GstCaps *newcaps, *oldcaps;
1654
1655   newcaps = gst_pad_get_current_caps (pad);
1656
1657   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1658       newcaps);
1659
1660   g_mutex_lock (&priv->lock);
1661   oldcaps = priv->caps;
1662   priv->caps = newcaps;
1663   g_mutex_unlock (&priv->lock);
1664
1665   if (oldcaps)
1666     gst_caps_unref (oldcaps);
1667 }
1668
1669 static void
1670 dump_structure (const GstStructure * s)
1671 {
1672   gchar *sstr;
1673
1674   sstr = gst_structure_to_string (s);
1675   GST_INFO ("structure: %s", sstr);
1676   g_free (sstr);
1677 }
1678
1679 static GstRTSPStreamTransport *
1680 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1681 {
1682   GstRTSPStreamPrivate *priv = stream->priv;
1683   GList *walk;
1684   GstRTSPStreamTransport *result = NULL;
1685   const gchar *tmp;
1686   gchar *dest;
1687   guint port;
1688
1689   if (rtcp_from == NULL)
1690     return NULL;
1691
1692   tmp = g_strrstr (rtcp_from, ":");
1693   if (tmp == NULL)
1694     return NULL;
1695
1696   port = atoi (tmp + 1);
1697   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1698
1699   g_mutex_lock (&priv->lock);
1700   GST_INFO ("finding %s:%d in %d transports", dest, port,
1701       g_list_length (priv->transports));
1702
1703   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1704     GstRTSPStreamTransport *trans = walk->data;
1705     const GstRTSPTransport *tr;
1706     gint min, max;
1707
1708     tr = gst_rtsp_stream_transport_get_transport (trans);
1709
1710     if (priv->client_side) {
1711       /* In client side mode the 'destination' is the RTSP server, so send
1712        * to those ports */
1713       min = tr->server_port.min;
1714       max = tr->server_port.max;
1715     } else {
1716       min = tr->client_port.min;
1717       max = tr->client_port.max;
1718     }
1719
1720     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1721       result = trans;
1722       break;
1723     }
1724   }
1725   if (result)
1726     g_object_ref (result);
1727   g_mutex_unlock (&priv->lock);
1728
1729   g_free (dest);
1730
1731   return result;
1732 }
1733
1734 static GstRTSPStreamTransport *
1735 check_transport (GObject * source, GstRTSPStream * stream)
1736 {
1737   GstStructure *stats;
1738   GstRTSPStreamTransport *trans;
1739
1740   /* see if we have a stream to match with the origin of the RTCP packet */
1741   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1742   if (trans == NULL) {
1743     g_object_get (source, "stats", &stats, NULL);
1744     if (stats) {
1745       const gchar *rtcp_from;
1746
1747       dump_structure (stats);
1748
1749       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1750       if ((trans = find_transport (stream, rtcp_from))) {
1751         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1752             source);
1753         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1754             g_object_unref);
1755       }
1756       gst_structure_free (stats);
1757     }
1758   }
1759   return trans;
1760 }
1761
1762
1763 static void
1764 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1765 {
1766   GstRTSPStreamTransport *trans;
1767
1768   GST_INFO ("%p: new source %p", stream, source);
1769
1770   trans = check_transport (source, stream);
1771
1772   if (trans)
1773     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1774 }
1775
1776 static void
1777 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1778 {
1779   GST_INFO ("%p: new SDES %p", stream, source);
1780 }
1781
1782 static void
1783 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1784 {
1785   GstRTSPStreamTransport *trans;
1786
1787   trans = check_transport (source, stream);
1788
1789   if (trans) {
1790     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1791     gst_rtsp_stream_transport_keep_alive (trans);
1792   }
1793 #ifdef DUMP_STATS
1794   {
1795     GstStructure *stats;
1796     g_object_get (source, "stats", &stats, NULL);
1797     if (stats) {
1798       dump_structure (stats);
1799       gst_structure_free (stats);
1800     }
1801   }
1802 #endif
1803 }
1804
1805 static void
1806 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1807 {
1808   GST_INFO ("%p: source %p bye", stream, source);
1809 }
1810
1811 static void
1812 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1813 {
1814   GstRTSPStreamTransport *trans;
1815
1816   GST_INFO ("%p: source %p bye timeout", stream, source);
1817
1818   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1819     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1820     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1821   }
1822 }
1823
1824 static void
1825 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1826 {
1827   GstRTSPStreamTransport *trans;
1828
1829   GST_INFO ("%p: source %p timeout", stream, source);
1830
1831   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1832     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1833     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1834   }
1835 }
1836
1837 static void
1838 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1839 {
1840   GST_INFO ("%p: new sender source %p", stream, source);
1841 #ifndef DUMP_STATS
1842   {
1843     GstStructure *stats;
1844     g_object_get (source, "stats", &stats, NULL);
1845     if (stats) {
1846       dump_structure (stats);
1847       gst_structure_free (stats);
1848     }
1849   }
1850 #endif
1851 }
1852
1853 static void
1854 on_sender_ssrc_active (GObject * session, GObject * source,
1855     GstRTSPStream * stream)
1856 {
1857 #ifndef DUMP_STATS
1858   {
1859     GstStructure *stats;
1860     g_object_get (source, "stats", &stats, NULL);
1861     if (stats) {
1862       dump_structure (stats);
1863       gst_structure_free (stats);
1864     }
1865   }
1866 #endif
1867 }
1868
1869 static void
1870 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1871 {
1872   if (is_rtp) {
1873     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1874     g_list_free (priv->tr_cache_rtp);
1875     priv->tr_cache_rtp = NULL;
1876   } else {
1877     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1878     g_list_free (priv->tr_cache_rtcp);
1879     priv->tr_cache_rtcp = NULL;
1880   }
1881 }
1882
1883 static GstFlowReturn
1884 handle_new_sample (GstAppSink * sink, gpointer user_data)
1885 {
1886   GstRTSPStreamPrivate *priv;
1887   GList *walk;
1888   GstSample *sample;
1889   GstBuffer *buffer;
1890   GstRTSPStream *stream;
1891   gboolean is_rtp;
1892
1893   sample = gst_app_sink_pull_sample (sink);
1894   if (!sample)
1895     return GST_FLOW_OK;
1896
1897   stream = (GstRTSPStream *) user_data;
1898   priv = stream->priv;
1899   buffer = gst_sample_get_buffer (sample);
1900
1901   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1902
1903   g_mutex_lock (&priv->lock);
1904   if (is_rtp) {
1905     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1906       clear_tr_cache (priv, is_rtp);
1907       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1908         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1909         priv->tr_cache_rtp =
1910             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1911       }
1912       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1913     }
1914   } else {
1915     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1916       clear_tr_cache (priv, is_rtp);
1917       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1918         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1919         priv->tr_cache_rtcp =
1920             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1921       }
1922       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1923     }
1924   }
1925   g_mutex_unlock (&priv->lock);
1926
1927   if (is_rtp) {
1928     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1929       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1930       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1931     }
1932   } else {
1933     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1934       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1935       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1936     }
1937   }
1938   gst_sample_unref (sample);
1939
1940   return GST_FLOW_OK;
1941 }
1942
1943 static GstAppSinkCallbacks sink_cb = {
1944   NULL,                         /* not interested in EOS */
1945   NULL,                         /* not interested in preroll samples */
1946   handle_new_sample,
1947 };
1948
1949 static GstElement *
1950 get_rtp_encoder (GstRTSPStream * stream, guint session)
1951 {
1952   GstRTSPStreamPrivate *priv = stream->priv;
1953
1954   if (priv->srtpenc == NULL) {
1955     gchar *name;
1956
1957     name = g_strdup_printf ("srtpenc_%u", session);
1958     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1959     g_free (name);
1960
1961     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1962   }
1963   return gst_object_ref (priv->srtpenc);
1964 }
1965
1966 static GstElement *
1967 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1968 {
1969   GstRTSPStreamPrivate *priv = stream->priv;
1970   GstElement *oldenc, *enc;
1971   GstPad *pad;
1972   gchar *name;
1973
1974   if (priv->idx != session)
1975     return NULL;
1976
1977   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1978
1979   oldenc = priv->srtpenc;
1980   enc = get_rtp_encoder (stream, session);
1981   name = g_strdup_printf ("rtp_sink_%d", session);
1982   pad = gst_element_get_request_pad (enc, name);
1983   g_free (name);
1984   gst_object_unref (pad);
1985
1986   if (oldenc == NULL)
1987     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1988         enc);
1989
1990   return enc;
1991 }
1992
1993 static GstElement *
1994 request_rtcp_encoder (GstElement * rtpbin, guint session,
1995     GstRTSPStream * stream)
1996 {
1997   GstRTSPStreamPrivate *priv = stream->priv;
1998   GstElement *oldenc, *enc;
1999   GstPad *pad;
2000   gchar *name;
2001
2002   if (priv->idx != session)
2003     return NULL;
2004
2005   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2006
2007   oldenc = priv->srtpenc;
2008   enc = get_rtp_encoder (stream, session);
2009   name = g_strdup_printf ("rtcp_sink_%d", session);
2010   pad = gst_element_get_request_pad (enc, name);
2011   g_free (name);
2012   gst_object_unref (pad);
2013
2014   if (oldenc == NULL)
2015     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2016         enc);
2017
2018   return enc;
2019 }
2020
2021 static GstCaps *
2022 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2023 {
2024   GstRTSPStreamPrivate *priv = stream->priv;
2025   GstCaps *caps;
2026
2027   GST_DEBUG ("request key %08x", ssrc);
2028
2029   g_mutex_lock (&priv->lock);
2030   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2031     gst_caps_ref (caps);
2032   g_mutex_unlock (&priv->lock);
2033
2034   return caps;
2035 }
2036
2037 static GstElement *
2038 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2039     GstRTSPStream * stream)
2040 {
2041   GstRTSPStreamPrivate *priv = stream->priv;
2042
2043   if (priv->idx != session)
2044     return NULL;
2045
2046   if (priv->srtpdec == NULL) {
2047     gchar *name;
2048
2049     name = g_strdup_printf ("srtpdec_%u", session);
2050     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2051     g_free (name);
2052
2053     g_signal_connect (priv->srtpdec, "request-key",
2054         (GCallback) request_key, stream);
2055   }
2056   return gst_object_ref (priv->srtpdec);
2057 }
2058
2059 /**
2060  * gst_rtsp_stream_request_aux_sender:
2061  * @stream: a #GstRTSPStream
2062  * @sessid: the session id
2063  *
2064  * Creating a rtxsend bin
2065  *
2066  * Returns: (transfer full): a #GstElement.
2067  *
2068  * Since: 1.6
2069  */
2070 GstElement *
2071 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2072 {
2073   GstElement *bin;
2074   GstPad *pad;
2075   GstStructure *pt_map;
2076   gchar *name;
2077   guint pt, rtx_pt;
2078   gchar *pt_s;
2079
2080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2081
2082   pt = gst_rtsp_stream_get_pt (stream);
2083   pt_s = g_strdup_printf ("%u", pt);
2084   rtx_pt = stream->priv->rtx_pt;
2085
2086   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2087
2088   bin = gst_bin_new (NULL);
2089   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2090   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2091       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2092   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2093       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2094   g_free (pt_s);
2095   gst_structure_free (pt_map);
2096   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2097
2098   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2099   name = g_strdup_printf ("src_%u", sessid);
2100   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2101   g_free (name);
2102   gst_object_unref (pad);
2103
2104   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2105   name = g_strdup_printf ("sink_%u", sessid);
2106   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2107   g_free (name);
2108   gst_object_unref (pad);
2109
2110   return bin;
2111 }
2112
2113 /**
2114  * gst_rtsp_stream_set_pt_map:
2115  * @stream: a #GstRTSPStream
2116  * @pt: the pt
2117  * @caps: a #GstCaps
2118  *
2119  * Configure a pt map between @pt and @caps.
2120  */
2121 void
2122 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2123 {
2124   GstRTSPStreamPrivate *priv = stream->priv;
2125
2126   g_mutex_lock (&priv->lock);
2127   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2128   g_mutex_unlock (&priv->lock);
2129 }
2130
2131 static GstCaps *
2132 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2133     GstRTSPStream * stream)
2134 {
2135   GstRTSPStreamPrivate *priv = stream->priv;
2136   GstCaps *caps = NULL;
2137
2138   g_mutex_lock (&priv->lock);
2139
2140   if (priv->idx == session) {
2141     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2142     if (caps) {
2143       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2144       gst_caps_ref (caps);
2145     } else {
2146       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2147     }
2148   }
2149
2150   g_mutex_unlock (&priv->lock);
2151
2152   return caps;
2153 }
2154
2155 static void
2156 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2157 {
2158   GstRTSPStreamPrivate *priv = stream->priv;
2159   gchar *name;
2160   GstPadLinkReturn ret;
2161   guint sessid;
2162
2163   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2164       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2165
2166   name = gst_pad_get_name (pad);
2167   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2168     g_free (name);
2169     return;
2170   }
2171   g_free (name);
2172
2173   if (priv->idx != sessid)
2174     return;
2175
2176   if (gst_pad_is_linked (priv->sinkpad)) {
2177     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2178         GST_DEBUG_PAD_NAME (priv->sinkpad));
2179     return;
2180   }
2181
2182   /* link the RTP pad to the session manager, it should not really fail unless
2183    * this is not really an RTP pad */
2184   ret = gst_pad_link (pad, priv->sinkpad);
2185   if (ret != GST_PAD_LINK_OK)
2186     goto link_failed;
2187   priv->recv_rtp_src = gst_object_ref (pad);
2188
2189   return;
2190
2191 /* ERRORS */
2192 link_failed:
2193   {
2194     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2195         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2196   }
2197 }
2198
2199 static void
2200 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2201     GstRTSPStream * stream)
2202 {
2203   /* TODO: What to do here other than this? */
2204   GST_DEBUG ("Stream %p: Got EOS", stream);
2205   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2206 }
2207
2208 /* must be called with lock */
2209 static gboolean
2210 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2211     GstState state)
2212 {
2213   GstRTSPStreamPrivate *priv;
2214   GstPad *pad, *sinkpad = NULL;
2215   gboolean is_tcp = FALSE, is_udp = FALSE;
2216   gint i;
2217
2218   priv = stream->priv;
2219
2220   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2221   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2222       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2223
2224   if (is_udp && !create_and_configure_udpsinks (stream))
2225     goto no_udp_protocol;
2226
2227   for (i = 0; i < 2; i++) {
2228     GstPad *teepad, *queuepad;
2229     /* For the sender we create this bit of pipeline for both
2230      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2231      * we need to add a queue before appsink and udpsink to make
2232      * the pipeline not block. For the TCP case, we want to pump
2233      * client as fast as possible anyway. This pipeline is used
2234      * when both TCP and UDP are present.
2235      *
2236      * .--------.      .-----.    .---------.    .---------.
2237      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2238      * |       send->sink   src->sink      src->sink       |
2239      * '--------'      |     |    '---------'    '---------'
2240      *                 |     |    .---------.    .---------.
2241      *                 |     |    |  queue  |    | appsink |
2242      *                 |    src->sink      src->sink       |
2243      *                 '-----'    '---------'    '---------'
2244      *
2245      * When only UDP or only TCP is allowed, we skip the tee and queue
2246      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2247      * the session.
2248      */
2249     /* Only link the RTP send src if we're going to send RTP, link
2250      * the RTCP send src always */
2251     if (priv->srcpad || i == 1) {
2252       if (is_udp) {
2253         /* add udpsink */
2254         gst_bin_add (bin, priv->udpsink[i]);
2255         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2256       }
2257
2258       if (is_tcp) {
2259         /* make appsink */
2260         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2261         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2262         gst_bin_add (bin, priv->appsink[i]);
2263         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2264             &sink_cb, stream, NULL);
2265       }
2266
2267       if (is_udp && is_tcp) {
2268         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2269
2270         /* make tee for RTP/RTCP */
2271         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2272         gst_bin_add (bin, priv->tee[i]);
2273
2274         /* and link to rtpbin send pad */
2275         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2276         gst_pad_link (priv->send_src[i], pad);
2277         gst_object_unref (pad);
2278
2279         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2280         g_object_set (priv->udpqueue[i], "max-size-buffers",
2281             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2282             NULL);
2283         gst_bin_add (bin, priv->udpqueue[i]);
2284         /* link tee to udpqueue */
2285         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2286         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2287         gst_pad_link (teepad, pad);
2288         gst_object_unref (pad);
2289         gst_object_unref (teepad);
2290
2291         /* link udpqueue to udpsink */
2292         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2293         gst_pad_link (queuepad, sinkpad);
2294         gst_object_unref (queuepad);
2295         gst_object_unref (sinkpad);
2296
2297         /* make appqueue */
2298         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2299         g_object_set (priv->appqueue[i], "max-size-buffers",
2300             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2301             NULL);
2302         gst_bin_add (bin, priv->appqueue[i]);
2303         /* and link tee to appqueue */
2304         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2305         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2306         gst_pad_link (teepad, pad);
2307         gst_object_unref (pad);
2308         gst_object_unref (teepad);
2309
2310         /* and link appqueue to appsink */
2311         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2312         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2313         gst_pad_link (queuepad, pad);
2314         gst_object_unref (pad);
2315         gst_object_unref (queuepad);
2316       } else if (is_tcp) {
2317         /* only appsink needed, link it to the session */
2318         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2319         gst_pad_link (priv->send_src[i], pad);
2320         gst_object_unref (pad);
2321
2322         /* when its only TCP, we need to set sync and preroll to FALSE
2323          * for the sink to avoid deadlock. And this is only needed for
2324          * sink used for RTCP data, not the RTP data. */
2325         if (i == 1)
2326           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2327       } else {
2328         /* else only udpsink needed, link it to the session */
2329         gst_pad_link (priv->send_src[i], sinkpad);
2330         gst_object_unref (sinkpad);
2331       }
2332     }
2333
2334     /* check if we need to set to a special state */
2335     if (state != GST_STATE_NULL) {
2336       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2337         gst_element_set_state (priv->udpsink[i], state);
2338       if (priv->appsink[i] && (priv->srcpad || i == 1))
2339         gst_element_set_state (priv->appsink[i], state);
2340       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2341         gst_element_set_state (priv->appqueue[i], state);
2342       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2343         gst_element_set_state (priv->udpqueue[i], state);
2344       if (priv->tee[i] && (priv->srcpad || i == 1))
2345         gst_element_set_state (priv->tee[i], state);
2346     }
2347   }
2348
2349   return TRUE;
2350
2351   /* ERRORS */
2352 no_udp_protocol:
2353   {
2354     return FALSE;
2355   }
2356 }
2357
2358 /* must be called with lock */
2359 static void
2360 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2361     GstState state)
2362 {
2363   GstRTSPStreamPrivate *priv;
2364   GstPad *pad, *selpad;
2365   gboolean is_tcp;
2366   gint i;
2367
2368   priv = stream->priv;
2369
2370   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2371
2372   for (i = 0; i < 2; i++) {
2373     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2374      * RTCP sink always */
2375     if (priv->sinkpad || i == 1) {
2376       /* For the receiver we create this bit of pipeline for both
2377        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2378        * and it is all funneled into the rtpbin receive pad.
2379        *
2380        * .--------.     .--------.    .--------.
2381        * | udpsrc |     | funnel |    | rtpbin |
2382        * |       src->sink      src->sink      |
2383        * '--------'     |        |    '--------'
2384        * .--------.     |        |
2385        * | appsrc |     |        |
2386        * |       src->sink       |
2387        * '--------'     '--------'
2388        */
2389       /* make funnel for the RTP/RTCP receivers */
2390       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2391       gst_bin_add (bin, priv->funnel[i]);
2392
2393       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2394       gst_pad_link (pad, priv->recv_sink[i]);
2395       gst_object_unref (pad);
2396
2397       if (priv->udpsrc_v4[i]) {
2398         if (priv->srcpad) {
2399           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2400            * values. This is only relevant for PLAY pipelines */
2401           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2402           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2403         }
2404         /* add udpsrc */
2405         gst_bin_add (bin, priv->udpsrc_v4[i]);
2406
2407         /* and link to the funnel v4 */
2408         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2409         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2410         gst_pad_link (pad, selpad);
2411         gst_object_unref (pad);
2412         gst_object_unref (selpad);
2413       }
2414
2415       if (priv->udpsrc_v6[i]) {
2416         if (priv->srcpad) {
2417           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2418           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2419         }
2420         gst_bin_add (bin, priv->udpsrc_v6[i]);
2421
2422         /* and link to the funnel v6 */
2423         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2424         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2425         gst_pad_link (pad, selpad);
2426         gst_object_unref (pad);
2427         gst_object_unref (selpad);
2428       }
2429
2430       if (is_tcp) {
2431         /* make and add appsrc */
2432         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2433         priv->appsrc_base_time[i] = -1;
2434         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2435         gst_bin_add (bin, priv->appsrc[i]);
2436         /* and link to the funnel */
2437         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2438         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2439         gst_pad_link (pad, selpad);
2440         gst_object_unref (pad);
2441         gst_object_unref (selpad);
2442       }
2443     }
2444
2445     /* check if we need to set to a special state */
2446     if (state != GST_STATE_NULL) {
2447       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2448         gst_element_set_state (priv->funnel[i], state);
2449       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2450         gst_element_set_state (priv->appsrc[i], state);
2451     }
2452   }
2453 }
2454
2455 /**
2456  * gst_rtsp_stream_join_bin:
2457  * @stream: a #GstRTSPStream
2458  * @bin: (transfer none): a #GstBin to join
2459  * @rtpbin: (transfer none): a rtpbin element in @bin
2460  * @state: the target state of the new elements
2461  *
2462  * Join the #GstBin @bin that contains the element @rtpbin.
2463  *
2464  * @stream will link to @rtpbin, which must be inside @bin. The elements
2465  * added to @bin will be set to the state given in @state.
2466  *
2467  * Returns: %TRUE on success.
2468  */
2469 gboolean
2470 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2471     GstElement * rtpbin, GstState state)
2472 {
2473   GstRTSPStreamPrivate *priv;
2474   guint idx;
2475   gchar *name;
2476   GstPadLinkReturn ret;
2477   gboolean is_udp;
2478
2479   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2480   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2481   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2482
2483   priv = stream->priv;
2484
2485   g_mutex_lock (&priv->lock);
2486   if (priv->is_joined)
2487     goto was_joined;
2488
2489   /* create a session with the same index as the stream */
2490   idx = priv->idx;
2491
2492   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2493
2494   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2495       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2496     /* For SRTP */
2497     g_signal_connect (rtpbin, "request-rtp-encoder",
2498         (GCallback) request_rtp_encoder, stream);
2499     g_signal_connect (rtpbin, "request-rtcp-encoder",
2500         (GCallback) request_rtcp_encoder, stream);
2501     g_signal_connect (rtpbin, "request-rtp-decoder",
2502         (GCallback) request_rtp_rtcp_decoder, stream);
2503     g_signal_connect (rtpbin, "request-rtcp-decoder",
2504         (GCallback) request_rtp_rtcp_decoder, stream);
2505   }
2506
2507   if (priv->sinkpad) {
2508     g_signal_connect (rtpbin, "request-pt-map",
2509         (GCallback) request_pt_map, stream);
2510   }
2511
2512   /* get pads from the RTP session element for sending and receiving
2513    * RTP/RTCP*/
2514   if (priv->srcpad) {
2515     /* get a pad for sending RTP */
2516     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2517     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2518     g_free (name);
2519
2520     /* link the RTP pad to the session manager, it should not really fail unless
2521      * this is not really an RTP pad */
2522     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2523     if (ret != GST_PAD_LINK_OK)
2524       goto link_failed;
2525
2526     name = g_strdup_printf ("send_rtp_src_%u", idx);
2527     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2528     g_free (name);
2529   } else {
2530     /* Need to connect our sinkpad from here */
2531     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2532     /* EOS */
2533     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2534
2535     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2536     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2537     g_free (name);
2538   }
2539
2540   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2541   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2542   g_free (name);
2543   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2544   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2545   g_free (name);
2546
2547   /* get the session */
2548   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2549
2550   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2551       stream);
2552   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2553       stream);
2554   g_signal_connect (priv->session, "on-ssrc-active",
2555       (GCallback) on_ssrc_active, stream);
2556   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2557       stream);
2558   g_signal_connect (priv->session, "on-bye-timeout",
2559       (GCallback) on_bye_timeout, stream);
2560   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2561       stream);
2562
2563   /* signal for sender ssrc */
2564   g_signal_connect (priv->session, "on-new-sender-ssrc",
2565       (GCallback) on_new_sender_ssrc, stream);
2566   g_signal_connect (priv->session, "on-sender-ssrc-active",
2567       (GCallback) on_sender_ssrc_active, stream);
2568
2569   if (!create_sender_part (stream, bin, state))
2570     goto no_udp_protocol;
2571
2572   create_receiver_part (stream, bin, state);
2573
2574   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2575       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2576
2577   if (is_udp && !alloc_ports (stream))
2578     goto no_udp_protocol;
2579
2580   if (priv->srcpad) {
2581     /* be notified of caps changes */
2582     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2583         (GCallback) caps_notify, stream);
2584   }
2585
2586   priv->is_joined = TRUE;
2587   g_mutex_unlock (&priv->lock);
2588
2589   return TRUE;
2590
2591   /* ERRORS */
2592 was_joined:
2593   {
2594     g_mutex_unlock (&priv->lock);
2595     return TRUE;
2596   }
2597 link_failed:
2598   {
2599     GST_WARNING ("failed to link stream %u", idx);
2600     gst_object_unref (priv->send_rtp_sink);
2601     priv->send_rtp_sink = NULL;
2602     g_mutex_unlock (&priv->lock);
2603     return FALSE;
2604   }
2605 no_udp_protocol:
2606   {
2607     GST_WARNING ("failed to allocate ports %u", idx);
2608     gst_object_unref (priv->send_rtp_sink);
2609     priv->send_rtp_sink = NULL;
2610     gst_object_unref (priv->send_src[0]);
2611     priv->send_src[0] = NULL;
2612     gst_object_unref (priv->send_src[1]);
2613     priv->send_src[1] = NULL;
2614     gst_object_unref (priv->recv_sink[0]);
2615     priv->recv_sink[0] = NULL;
2616     gst_object_unref (priv->recv_sink[1]);
2617     priv->recv_sink[1] = NULL;
2618     if (priv->udpsink[0])
2619       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2620     if (priv->udpsink[1])
2621       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2622     if (priv->udpsrc_v4[0]) {
2623       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2624       gst_object_unref (priv->udpsrc_v4[0]);
2625       priv->udpsrc_v4[0] = NULL;
2626     }
2627     if (priv->udpsrc_v4[1]) {
2628       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2629       gst_object_unref (priv->udpsrc_v4[1]);
2630       priv->udpsrc_v4[1] = NULL;
2631     }
2632     if (priv->udpsrc_v6[0]) {
2633       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2634       gst_object_unref (priv->udpsrc_v6[0]);
2635       priv->udpsrc_v6[0] = NULL;
2636     }
2637     if (priv->udpsrc_v6[1]) {
2638       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2639       gst_object_unref (priv->udpsrc_v6[1]);
2640       priv->udpsrc_v6[1] = NULL;
2641     }
2642     g_mutex_unlock (&priv->lock);
2643     return FALSE;
2644   }
2645 }
2646
2647 /**
2648  * gst_rtsp_stream_leave_bin:
2649  * @stream: a #GstRTSPStream
2650  * @bin: (transfer none): a #GstBin
2651  * @rtpbin: (transfer none): a rtpbin #GstElement
2652  *
2653  * Remove the elements of @stream from @bin.
2654  *
2655  * Return: %TRUE on success.
2656  */
2657 gboolean
2658 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2659     GstElement * rtpbin)
2660 {
2661   GstRTSPStreamPrivate *priv;
2662   gint i;
2663   GList *l;
2664   gboolean is_tcp, is_udp;
2665
2666   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2667   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2668   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2669
2670   priv = stream->priv;
2671
2672   g_mutex_lock (&priv->lock);
2673   if (!priv->is_joined)
2674     goto was_not_joined;
2675
2676   /* all transports must be removed by now */
2677   if (priv->transports != NULL)
2678     goto transports_not_removed;
2679
2680   clear_tr_cache (priv, TRUE);
2681   clear_tr_cache (priv, FALSE);
2682
2683   GST_INFO ("stream %p leaving bin", stream);
2684
2685   if (priv->srcpad) {
2686     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2687
2688     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2689     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2690     gst_object_unref (priv->send_rtp_sink);
2691     priv->send_rtp_sink = NULL;
2692   } else if (priv->recv_rtp_src) {
2693     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2694     gst_object_unref (priv->recv_rtp_src);
2695     priv->recv_rtp_src = NULL;
2696   }
2697
2698   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2699
2700   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2701       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2702
2703
2704   for (i = 0; i < 2; i++) {
2705     if (priv->udpsink[i])
2706       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2707     if (priv->appsink[i])
2708       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2709     if (priv->appqueue[i])
2710       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2711     if (priv->udpqueue[i])
2712       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2713     if (priv->tee[i])
2714       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2715     if (priv->funnel[i])
2716       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2717     if (priv->appsrc[i])
2718       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2719
2720     if (priv->udpsrc_v4[i]) {
2721       if (priv->sinkpad || i == 1) {
2722         /* and set udpsrc to NULL now before removing */
2723         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2724         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2725         /* removing them should also nicely release the request
2726          * pads when they finalize */
2727         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2728       } else {
2729         /* we need to set the state to NULL before unref */
2730         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2731         gst_object_unref (priv->udpsrc_v4[i]);
2732       }
2733     }
2734
2735     if (priv->udpsrc_v6[i]) {
2736       if (priv->sinkpad || i == 1) {
2737         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2738         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2739         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2740       } else {
2741         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2742         gst_object_unref (priv->udpsrc_v6[i]);
2743       }
2744     }
2745
2746     for (l = priv->transport_sources; l; l = l->next) {
2747       GstRTSPMulticastTransportSource *s = l->data;
2748
2749       if (!s->udpsrc[i])
2750         continue;
2751
2752       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2753       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2754       gst_bin_remove (bin, s->udpsrc[i]);
2755     }
2756
2757     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2758       gst_bin_remove (bin, priv->udpsink[i]);
2759     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2760       gst_bin_remove (bin, priv->appsrc[i]);
2761     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2762       gst_bin_remove (bin, priv->appsink[i]);
2763     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2764       gst_bin_remove (bin, priv->appqueue[i]);
2765     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2766       gst_bin_remove (bin, priv->udpqueue[i]);
2767     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2768       gst_bin_remove (bin, priv->tee[i]);
2769     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2770       gst_bin_remove (bin, priv->funnel[i]);
2771
2772     if (priv->sinkpad || i == 1) {
2773       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2774       gst_object_unref (priv->recv_sink[i]);
2775       priv->recv_sink[i] = NULL;
2776     }
2777
2778     priv->udpsrc_v4[i] = NULL;
2779     priv->udpsrc_v6[i] = NULL;
2780     priv->udpsink[i] = NULL;
2781     priv->appsrc[i] = NULL;
2782     priv->appsink[i] = NULL;
2783     priv->appqueue[i] = NULL;
2784     priv->udpqueue[i] = NULL;
2785     priv->tee[i] = NULL;
2786     priv->funnel[i] = NULL;
2787   }
2788
2789   for (l = priv->transport_sources; l; l = l->next) {
2790     GstRTSPMulticastTransportSource *s = l->data;
2791     g_slice_free (GstRTSPMulticastTransportSource, s);
2792   }
2793   g_list_free (priv->transport_sources);
2794   priv->transport_sources = NULL;
2795
2796   if (priv->srcpad) {
2797     gst_object_unref (priv->send_src[0]);
2798     priv->send_src[0] = NULL;
2799   }
2800
2801   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2802   gst_object_unref (priv->send_src[1]);
2803   priv->send_src[1] = NULL;
2804
2805   g_object_unref (priv->session);
2806   priv->session = NULL;
2807   if (priv->caps)
2808     gst_caps_unref (priv->caps);
2809   priv->caps = NULL;
2810
2811   if (priv->srtpenc)
2812     gst_object_unref (priv->srtpenc);
2813   if (priv->srtpdec)
2814     gst_object_unref (priv->srtpdec);
2815
2816   priv->is_joined = FALSE;
2817   g_mutex_unlock (&priv->lock);
2818
2819   return TRUE;
2820
2821 was_not_joined:
2822   {
2823     g_mutex_unlock (&priv->lock);
2824     return TRUE;
2825   }
2826 transports_not_removed:
2827   {
2828     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2829     g_mutex_unlock (&priv->lock);
2830     return FALSE;
2831   }
2832 }
2833
2834 /**
2835  * gst_rtsp_stream_get_rtpinfo:
2836  * @stream: a #GstRTSPStream
2837  * @rtptime: (allow-none): result RTP timestamp
2838  * @seq: (allow-none): result RTP seqnum
2839  * @clock_rate: (allow-none): the clock rate
2840  * @running_time: (allow-none): result running-time
2841  *
2842  * Retrieve the current rtptime, seq and running-time. This is used to
2843  * construct a RTPInfo reply header.
2844  *
2845  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2846  */
2847 gboolean
2848 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2849     guint * rtptime, guint * seq, guint * clock_rate,
2850     GstClockTime * running_time)
2851 {
2852   GstRTSPStreamPrivate *priv;
2853   GstStructure *stats;
2854   GObjectClass *payobjclass;
2855
2856   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2857
2858   priv = stream->priv;
2859
2860   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2861
2862   g_mutex_lock (&priv->lock);
2863
2864   /* First try to extract the information from the last buffer on the sinks.
2865    * This will have a more accurate sequence number and timestamp, as between
2866    * the payloader and the sink there can be some queues
2867    */
2868   if (priv->udpsink[0] || priv->appsink[0]) {
2869     GstSample *last_sample;
2870
2871     if (priv->udpsink[0])
2872       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2873     else
2874       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2875
2876     if (last_sample) {
2877       GstCaps *caps;
2878       GstBuffer *buffer;
2879       GstSegment *segment;
2880       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2881
2882       caps = gst_sample_get_caps (last_sample);
2883       buffer = gst_sample_get_buffer (last_sample);
2884       segment = gst_sample_get_segment (last_sample);
2885
2886       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2887         if (seq) {
2888           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2889         }
2890
2891         if (rtptime) {
2892           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2893         }
2894
2895         gst_rtp_buffer_unmap (&rtp_buffer);
2896
2897         if (running_time) {
2898           *running_time =
2899               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2900               GST_BUFFER_TIMESTAMP (buffer));
2901         }
2902
2903         if (clock_rate) {
2904           GstStructure *s = gst_caps_get_structure (caps, 0);
2905
2906           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2907
2908           if (*clock_rate == 0 && running_time)
2909             *running_time = GST_CLOCK_TIME_NONE;
2910         }
2911         gst_sample_unref (last_sample);
2912
2913         goto done;
2914       } else {
2915         gst_sample_unref (last_sample);
2916       }
2917     }
2918   }
2919
2920   if (g_object_class_find_property (payobjclass, "stats")) {
2921     g_object_get (priv->payloader, "stats", &stats, NULL);
2922     if (stats == NULL)
2923       goto no_stats;
2924
2925     if (seq)
2926       gst_structure_get_uint (stats, "seqnum", seq);
2927
2928     if (rtptime)
2929       gst_structure_get_uint (stats, "timestamp", rtptime);
2930
2931     if (running_time)
2932       gst_structure_get_clock_time (stats, "running-time", running_time);
2933
2934     if (clock_rate) {
2935       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2936       if (*clock_rate == 0 && running_time)
2937         *running_time = GST_CLOCK_TIME_NONE;
2938     }
2939     gst_structure_free (stats);
2940   } else {
2941     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2942         !g_object_class_find_property (payobjclass, "timestamp"))
2943       goto no_stats;
2944
2945     if (seq)
2946       g_object_get (priv->payloader, "seqnum", seq, NULL);
2947
2948     if (rtptime)
2949       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2950
2951     if (running_time)
2952       *running_time = GST_CLOCK_TIME_NONE;
2953   }
2954
2955 done:
2956   g_mutex_unlock (&priv->lock);
2957
2958   return TRUE;
2959
2960   /* ERRORS */
2961 no_stats:
2962   {
2963     GST_WARNING ("Could not get payloader stats");
2964     g_mutex_unlock (&priv->lock);
2965     return FALSE;
2966   }
2967 }
2968
2969 /**
2970  * gst_rtsp_stream_get_caps:
2971  * @stream: a #GstRTSPStream
2972  *
2973  * Retrieve the current caps of @stream.
2974  *
2975  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2976  * after usage.
2977  */
2978 GstCaps *
2979 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2980 {
2981   GstRTSPStreamPrivate *priv;
2982   GstCaps *result;
2983
2984   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2985
2986   priv = stream->priv;
2987
2988   g_mutex_lock (&priv->lock);
2989   if ((result = priv->caps))
2990     gst_caps_ref (result);
2991   g_mutex_unlock (&priv->lock);
2992
2993   return result;
2994 }
2995
2996 /**
2997  * gst_rtsp_stream_recv_rtp:
2998  * @stream: a #GstRTSPStream
2999  * @buffer: (transfer full): a #GstBuffer
3000  *
3001  * Handle an RTP buffer for the stream. This method is usually called when a
3002  * message has been received from a client using the TCP transport.
3003  *
3004  * This function takes ownership of @buffer.
3005  *
3006  * Returns: a GstFlowReturn.
3007  */
3008 GstFlowReturn
3009 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3010 {
3011   GstRTSPStreamPrivate *priv;
3012   GstFlowReturn ret;
3013   GstElement *element;
3014
3015   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3016   priv = stream->priv;
3017   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3018   g_return_val_if_fail (priv->is_joined, FALSE);
3019
3020   g_mutex_lock (&priv->lock);
3021   if (priv->appsrc[0])
3022     element = gst_object_ref (priv->appsrc[0]);
3023   else
3024     element = NULL;
3025   g_mutex_unlock (&priv->lock);
3026
3027   if (element) {
3028     if (priv->appsrc_base_time[0] == -1) {
3029       /* Take current running_time. This timestamp will be put on
3030        * the first buffer of each stream because we are a live source and so we
3031        * timestamp with the running_time. When we are dealing with TCP, we also
3032        * only timestamp the first buffer (using the DISCONT flag) because a server
3033        * typically bursts data, for which we don't want to compensate by speeding
3034        * up the media. The other timestamps will be interpollated from this one
3035        * using the RTP timestamps. */
3036       GST_OBJECT_LOCK (element);
3037       if (GST_ELEMENT_CLOCK (element)) {
3038         GstClockTime now;
3039         GstClockTime base_time;
3040
3041         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3042         base_time = GST_ELEMENT_CAST (element)->base_time;
3043
3044         priv->appsrc_base_time[0] = now - base_time;
3045         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3046         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3047             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3048             GST_TIME_ARGS (base_time));
3049       }
3050       GST_OBJECT_UNLOCK (element);
3051     }
3052
3053     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3054     gst_object_unref (element);
3055   } else {
3056     ret = GST_FLOW_OK;
3057   }
3058   return ret;
3059 }
3060
3061 /**
3062  * gst_rtsp_stream_recv_rtcp:
3063  * @stream: a #GstRTSPStream
3064  * @buffer: (transfer full): a #GstBuffer
3065  *
3066  * Handle an RTCP buffer for the stream. This method is usually called when a
3067  * message has been received from a client using the TCP transport.
3068  *
3069  * This function takes ownership of @buffer.
3070  *
3071  * Returns: a GstFlowReturn.
3072  */
3073 GstFlowReturn
3074 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3075 {
3076   GstRTSPStreamPrivate *priv;
3077   GstFlowReturn ret;
3078   GstElement *element;
3079
3080   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3081   priv = stream->priv;
3082   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3083
3084   if (!priv->is_joined) {
3085     gst_buffer_unref (buffer);
3086     return GST_FLOW_NOT_LINKED;
3087   }
3088   g_mutex_lock (&priv->lock);
3089   if (priv->appsrc[1])
3090     element = gst_object_ref (priv->appsrc[1]);
3091   else
3092     element = NULL;
3093   g_mutex_unlock (&priv->lock);
3094
3095   if (element) {
3096     if (priv->appsrc_base_time[1] == -1) {
3097       /* Take current running_time. This timestamp will be put on
3098        * the first buffer of each stream because we are a live source and so we
3099        * timestamp with the running_time. When we are dealing with TCP, we also
3100        * only timestamp the first buffer (using the DISCONT flag) because a server
3101        * typically bursts data, for which we don't want to compensate by speeding
3102        * up the media. The other timestamps will be interpollated from this one
3103        * using the RTP timestamps. */
3104       GST_OBJECT_LOCK (element);
3105       if (GST_ELEMENT_CLOCK (element)) {
3106         GstClockTime now;
3107         GstClockTime base_time;
3108
3109         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3110         base_time = GST_ELEMENT_CAST (element)->base_time;
3111
3112         priv->appsrc_base_time[1] = now - base_time;
3113         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3114         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3115             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3116             GST_TIME_ARGS (base_time));
3117       }
3118       GST_OBJECT_UNLOCK (element);
3119     }
3120
3121     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3122     gst_object_unref (element);
3123   } else {
3124     ret = GST_FLOW_OK;
3125     gst_buffer_unref (buffer);
3126   }
3127   return ret;
3128 }
3129
3130 /* must be called with lock */
3131 static gboolean
3132 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3133     gboolean add)
3134 {
3135   GstRTSPStreamPrivate *priv = stream->priv;
3136   const GstRTSPTransport *tr;
3137
3138   tr = gst_rtsp_stream_transport_get_transport (trans);
3139
3140   switch (tr->lower_transport) {
3141     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3142     {
3143       GstRTSPMulticastTransportSource *source;
3144       GstBin *bin;
3145
3146       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
3147
3148       if (add) {
3149         gchar *host;
3150         gint i;
3151         GstPad *selpad, *pad;
3152
3153         source = g_slice_new0 (GstRTSPMulticastTransportSource);
3154         source->transport = trans;
3155
3156         for (i = 0; i < 2; i++) {
3157           host =
3158               g_strdup_printf ("udp://%s:%d", tr->destination,
3159               (i == 0) ? tr->port.min : tr->port.max);
3160           source->udpsrc[i] =
3161               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
3162           g_free (host);
3163           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
3164
3165           if (priv->srcpad) {
3166             /* we set and keep these to playing so that they don't cause NO_PREROLL return
3167              * values. This is only relevant for PLAY pipelines */
3168             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
3169             gst_element_set_locked_state (source->udpsrc[i], TRUE);
3170           }
3171           /* add udpsrc */
3172           gst_bin_add (bin, source->udpsrc[i]);
3173
3174           /* and link to the funnel v4 */
3175           if (priv->sinkpad || i == 1) {
3176             source->selpad[i] = selpad =
3177                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3178             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3179             gst_pad_link (pad, selpad);
3180             gst_object_unref (pad);
3181             gst_object_unref (selpad);
3182           }
3183         }
3184
3185         priv->transport_sources =
3186             g_list_prepend (priv->transport_sources, source);
3187       } else {
3188         GList *l;
3189
3190         for (l = priv->transport_sources; l; l = l->next) {
3191           source = l->data;
3192
3193           if (source->transport == trans) {
3194             priv->transport_sources =
3195                 g_list_delete_link (priv->transport_sources, l);
3196             break;
3197           }
3198         }
3199
3200         if (l != NULL) {
3201           gint i;
3202
3203           for (i = 0; i < 2; i++) {
3204             /* Will automatically unlink everything */
3205             gst_bin_remove (bin,
3206                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3207
3208             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3209             gst_object_unref (source->udpsrc[i]);
3210
3211             if (priv->sinkpad || i == 1) {
3212               gst_element_release_request_pad (priv->funnel[i],
3213                   source->selpad[i]);
3214             }
3215           }
3216
3217           g_slice_free (GstRTSPMulticastTransportSource, source);
3218         }
3219       }
3220
3221       gst_object_unref (bin);
3222
3223       /* fall through for the generic case */
3224     }
3225     case GST_RTSP_LOWER_TRANS_UDP:
3226     {
3227       gchar *dest;
3228       gint min, max;
3229       guint ttl = 0;
3230
3231       dest = tr->destination;
3232       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3233         min = tr->port.min;
3234         max = tr->port.max;
3235         ttl = tr->ttl;
3236       } else if (priv->client_side) {
3237         /* In client side mode the 'destination' is the RTSP server, so send
3238          * to those ports */
3239         min = tr->server_port.min;
3240         max = tr->server_port.max;
3241       } else {
3242         min = tr->client_port.min;
3243         max = tr->client_port.max;
3244       }
3245
3246       if (add) {
3247         if (ttl > 0) {
3248           GST_INFO ("setting ttl-mc %d", ttl);
3249           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3250           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3251         }
3252         GST_INFO ("adding %s:%d-%d", dest, min, max);
3253         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3254         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3255         priv->transports = g_list_prepend (priv->transports, trans);
3256       } else {
3257         GST_INFO ("removing %s:%d-%d", dest, min, max);
3258         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3259         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3260         priv->transports = g_list_remove (priv->transports, trans);
3261       }
3262       priv->transports_cookie++;
3263       break;
3264     }
3265     case GST_RTSP_LOWER_TRANS_TCP:
3266       if (add) {
3267         GST_INFO ("adding TCP %s", tr->destination);
3268         priv->transports = g_list_prepend (priv->transports, trans);
3269       } else {
3270         GST_INFO ("removing TCP %s", tr->destination);
3271         priv->transports = g_list_remove (priv->transports, trans);
3272       }
3273       priv->transports_cookie++;
3274       break;
3275     default:
3276       goto unknown_transport;
3277   }
3278   return TRUE;
3279
3280   /* ERRORS */
3281 unknown_transport:
3282   {
3283     GST_INFO ("Unknown transport %d", tr->lower_transport);
3284     return FALSE;
3285   }
3286 }
3287
3288
3289 /**
3290  * gst_rtsp_stream_add_transport:
3291  * @stream: a #GstRTSPStream
3292  * @trans: (transfer none): a #GstRTSPStreamTransport
3293  *
3294  * Add the transport in @trans to @stream. The media of @stream will
3295  * then also be send to the values configured in @trans.
3296  *
3297  * @stream must be joined to a bin.
3298  *
3299  * @trans must contain a valid #GstRTSPTransport.
3300  *
3301  * Returns: %TRUE if @trans was added
3302  */
3303 gboolean
3304 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3305     GstRTSPStreamTransport * trans)
3306 {
3307   GstRTSPStreamPrivate *priv;
3308   gboolean res;
3309
3310   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3311   priv = stream->priv;
3312   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3313   g_return_val_if_fail (priv->is_joined, FALSE);
3314
3315   g_mutex_lock (&priv->lock);
3316   res = update_transport (stream, trans, TRUE);
3317   g_mutex_unlock (&priv->lock);
3318
3319   return res;
3320 }
3321
3322 /**
3323  * gst_rtsp_stream_remove_transport:
3324  * @stream: a #GstRTSPStream
3325  * @trans: (transfer none): a #GstRTSPStreamTransport
3326  *
3327  * Remove the transport in @trans from @stream. The media of @stream will
3328  * not be sent to the values configured in @trans.
3329  *
3330  * @stream must be joined to a bin.
3331  *
3332  * @trans must contain a valid #GstRTSPTransport.
3333  *
3334  * Returns: %TRUE if @trans was removed
3335  */
3336 gboolean
3337 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3338     GstRTSPStreamTransport * trans)
3339 {
3340   GstRTSPStreamPrivate *priv;
3341   gboolean res;
3342
3343   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3344   priv = stream->priv;
3345   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3346   g_return_val_if_fail (priv->is_joined, FALSE);
3347
3348   g_mutex_lock (&priv->lock);
3349   res = update_transport (stream, trans, FALSE);
3350   g_mutex_unlock (&priv->lock);
3351
3352   return res;
3353 }
3354
3355 /**
3356  * gst_rtsp_stream_update_crypto:
3357  * @stream: a #GstRTSPStream
3358  * @ssrc: the SSRC
3359  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3360  *
3361  * Update the new crypto information for @ssrc in @stream. If information
3362  * for @ssrc did not exist, it will be added. If information
3363  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3364  * be removed from @stream.
3365  *
3366  * Returns: %TRUE if @crypto could be updated
3367  */
3368 gboolean
3369 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3370     guint ssrc, GstCaps * crypto)
3371 {
3372   GstRTSPStreamPrivate *priv;
3373
3374   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3375   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3376
3377   priv = stream->priv;
3378
3379   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3380
3381   g_mutex_lock (&priv->lock);
3382   if (crypto)
3383     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3384         gst_caps_ref (crypto));
3385   else
3386     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3387   g_mutex_unlock (&priv->lock);
3388
3389   return TRUE;
3390 }
3391
3392 /**
3393  * gst_rtsp_stream_get_rtp_socket:
3394  * @stream: a #GstRTSPStream
3395  * @family: the socket family
3396  *
3397  * Get the RTP socket from @stream for a @family.
3398  *
3399  * @stream must be joined to a bin.
3400  *
3401  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3402  * socket could be allocated for @family. Unref after usage
3403  */
3404 GSocket *
3405 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3406 {
3407   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3408   GSocket *socket;
3409   const gchar *name;
3410
3411   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3412   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3413       family == G_SOCKET_FAMILY_IPV6, NULL);
3414   g_return_val_if_fail (priv->udpsink[0], NULL);
3415
3416   if (family == G_SOCKET_FAMILY_IPV6)
3417     name = "socket-v6";
3418   else
3419     name = "socket";
3420
3421   g_object_get (priv->udpsink[0], name, &socket, NULL);
3422
3423   return socket;
3424 }
3425
3426 /**
3427  * gst_rtsp_stream_get_rtcp_socket:
3428  * @stream: a #GstRTSPStream
3429  * @family: the socket family
3430  *
3431  * Get the RTCP socket from @stream for a @family.
3432  *
3433  * @stream must be joined to a bin.
3434  *
3435  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3436  * socket could be allocated for @family. Unref after usage
3437  */
3438 GSocket *
3439 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3440 {
3441   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3442   GSocket *socket;
3443   const gchar *name;
3444
3445   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3446   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3447       family == G_SOCKET_FAMILY_IPV6, NULL);
3448   g_return_val_if_fail (priv->udpsink[1], NULL);
3449
3450   if (family == G_SOCKET_FAMILY_IPV6)
3451     name = "socket-v6";
3452   else
3453     name = "socket";
3454
3455   g_object_get (priv->udpsink[1], name, &socket, NULL);
3456
3457   return socket;
3458 }
3459
3460 /**
3461  * gst_rtsp_stream_set_seqnum:
3462  * @stream: a #GstRTSPStream
3463  * @seqnum: a new sequence number
3464  *
3465  * Configure the sequence number in the payloader of @stream to @seqnum.
3466  */
3467 void
3468 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3469 {
3470   GstRTSPStreamPrivate *priv;
3471
3472   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3473
3474   priv = stream->priv;
3475
3476   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3477 }
3478
3479 /**
3480  * gst_rtsp_stream_get_seqnum:
3481  * @stream: a #GstRTSPStream
3482  *
3483  * Get the configured sequence number in the payloader of @stream.
3484  *
3485  * Returns: the sequence number of the payloader.
3486  */
3487 guint16
3488 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3489 {
3490   GstRTSPStreamPrivate *priv;
3491   guint seqnum;
3492
3493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3494
3495   priv = stream->priv;
3496
3497   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3498
3499   return seqnum;
3500 }
3501
3502 /**
3503  * gst_rtsp_stream_transport_filter:
3504  * @stream: a #GstRTSPStream
3505  * @func: (scope call) (allow-none): a callback
3506  * @user_data: (closure): user data passed to @func
3507  *
3508  * Call @func for each transport managed by @stream. The result value of @func
3509  * determines what happens to the transport. @func will be called with @stream
3510  * locked so no further actions on @stream can be performed from @func.
3511  *
3512  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3513  * @stream.
3514  *
3515  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3516  *
3517  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3518  * will also be added with an additional ref to the result #GList of this
3519  * function..
3520  *
3521  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3522  *
3523  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3524  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3525  * element in the #GList should be unreffed before the list is freed.
3526  */
3527 GList *
3528 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3529     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3530 {
3531   GstRTSPStreamPrivate *priv;
3532   GList *result, *walk, *next;
3533   GHashTable *visited = NULL;
3534   guint cookie;
3535
3536   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3537
3538   priv = stream->priv;
3539
3540   result = NULL;
3541   if (func)
3542     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3543
3544   g_mutex_lock (&priv->lock);
3545 restart:
3546   cookie = priv->transports_cookie;
3547   for (walk = priv->transports; walk; walk = next) {
3548     GstRTSPStreamTransport *trans = walk->data;
3549     GstRTSPFilterResult res;
3550     gboolean changed;
3551
3552     next = g_list_next (walk);
3553
3554     if (func) {
3555       /* only visit each transport once */
3556       if (g_hash_table_contains (visited, trans))
3557         continue;
3558
3559       g_hash_table_add (visited, g_object_ref (trans));
3560       g_mutex_unlock (&priv->lock);
3561
3562       res = func (stream, trans, user_data);
3563
3564       g_mutex_lock (&priv->lock);
3565     } else
3566       res = GST_RTSP_FILTER_REF;
3567
3568     changed = (cookie != priv->transports_cookie);
3569
3570     switch (res) {
3571       case GST_RTSP_FILTER_REMOVE:
3572         update_transport (stream, trans, FALSE);
3573         break;
3574       case GST_RTSP_FILTER_REF:
3575         result = g_list_prepend (result, g_object_ref (trans));
3576         break;
3577       case GST_RTSP_FILTER_KEEP:
3578       default:
3579         break;
3580     }
3581     if (changed)
3582       goto restart;
3583   }
3584   g_mutex_unlock (&priv->lock);
3585
3586   if (func)
3587     g_hash_table_unref (visited);
3588
3589   return result;
3590 }
3591
3592 static GstPadProbeReturn
3593 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3594 {
3595   GstRTSPStreamPrivate *priv;
3596   GstRTSPStream *stream;
3597
3598   stream = user_data;
3599   priv = stream->priv;
3600
3601   GST_DEBUG_OBJECT (pad, "now blocking");
3602
3603   g_mutex_lock (&priv->lock);
3604   priv->blocking = TRUE;
3605   g_mutex_unlock (&priv->lock);
3606
3607   gst_element_post_message (priv->payloader,
3608       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3609           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3610
3611   return GST_PAD_PROBE_OK;
3612 }
3613
3614 /**
3615  * gst_rtsp_stream_set_blocked:
3616  * @stream: a #GstRTSPStream
3617  * @blocked: boolean indicating we should block or unblock
3618  *
3619  * Blocks or unblocks the dataflow on @stream.
3620  *
3621  * Returns: %TRUE on success
3622  */
3623 gboolean
3624 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3625 {
3626   GstRTSPStreamPrivate *priv;
3627
3628   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3629
3630   priv = stream->priv;
3631
3632   g_mutex_lock (&priv->lock);
3633   if (blocked) {
3634     priv->blocking = FALSE;
3635     if (priv->blocked_id == 0) {
3636       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3637           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3638           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3639           g_object_ref (stream), g_object_unref);
3640     }
3641   } else {
3642     if (priv->blocked_id != 0) {
3643       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3644       priv->blocked_id = 0;
3645       priv->blocking = FALSE;
3646     }
3647   }
3648   g_mutex_unlock (&priv->lock);
3649
3650   return TRUE;
3651 }
3652
3653 /**
3654  * gst_rtsp_stream_is_blocking:
3655  * @stream: a #GstRTSPStream
3656  *
3657  * Check if @stream is blocking on a #GstBuffer.
3658  *
3659  * Returns: %TRUE if @stream is blocking
3660  */
3661 gboolean
3662 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3663 {
3664   GstRTSPStreamPrivate *priv;
3665   gboolean result;
3666
3667   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3668
3669   priv = stream->priv;
3670
3671   g_mutex_lock (&priv->lock);
3672   result = priv->blocking;
3673   g_mutex_unlock (&priv->lock);
3674
3675   return result;
3676 }
3677
3678 /**
3679  * gst_rtsp_stream_query_position:
3680  * @stream: a #GstRTSPStream
3681  *
3682  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3683  * the RTP parts of the pipeline and not the RTCP parts.
3684  *
3685  * Returns: %TRUE if the position could be queried
3686  */
3687 gboolean
3688 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3689 {
3690   GstRTSPStreamPrivate *priv;
3691   GstElement *sink;
3692   gboolean ret;
3693
3694   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3695
3696   priv = stream->priv;
3697
3698   g_mutex_lock (&priv->lock);
3699   /* depending on the transport type, it should query corresponding sink */
3700   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3701       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3702     sink = priv->udpsink[0];
3703   else
3704     sink = priv->appsink[0];
3705
3706   if (sink)
3707     gst_object_ref (sink);
3708   g_mutex_unlock (&priv->lock);
3709
3710   if (!sink)
3711     return FALSE;
3712
3713   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3714   gst_object_unref (sink);
3715
3716   return ret;
3717 }
3718
3719 /**
3720  * gst_rtsp_stream_query_stop:
3721  * @stream: a #GstRTSPStream
3722  *
3723  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3724  * the RTP parts of the pipeline and not the RTCP parts.
3725  *
3726  * Returns: %TRUE if the stop could be queried
3727  */
3728 gboolean
3729 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3730 {
3731   GstRTSPStreamPrivate *priv;
3732   GstElement *sink;
3733   GstQuery *query;
3734   gboolean ret;
3735
3736   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3737
3738   priv = stream->priv;
3739
3740   g_mutex_lock (&priv->lock);
3741   /* depending on the transport type, it should query corresponding sink */
3742   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3743       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3744     sink = priv->udpsink[0];
3745   else
3746     sink = priv->appsink[0];
3747
3748   if (sink)
3749     gst_object_ref (sink);
3750   g_mutex_unlock (&priv->lock);
3751
3752   if (!sink)
3753     return FALSE;
3754
3755   query = gst_query_new_segment (GST_FORMAT_TIME);
3756   if ((ret = gst_element_query (sink, query))) {
3757     GstFormat format;
3758
3759     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3760     if (format != GST_FORMAT_TIME)
3761       *stop = -1;
3762   }
3763   gst_query_unref (query);
3764   gst_object_unref (sink);
3765
3766   return ret;
3767
3768 }