rtsp-stream: added function for creating and configuring UDP sources
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static void
1025 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1026     GSocket * rtcp_socket, GSocketFamily family)
1027 {
1028   GstRTSPStreamPrivate *priv = stream->priv;
1029   const gchar *multisink_socket;
1030
1031   if (family == G_SOCKET_FAMILY_IPV6)
1032     multisink_socket = "socket-v6";
1033   else
1034     multisink_socket = "socket";
1035
1036   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1037       NULL);
1038   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1039       NULL);
1040 }
1041
1042 /* must be called with lock */
1043 static gboolean
1044 create_and_configure_udpsinks (GstRTSPStream * stream)
1045 {
1046   GstRTSPStreamPrivate *priv = stream->priv;
1047   GstElement *udpsink0, *udpsink1;
1048
1049   udpsink0 = NULL;
1050   udpsink1 = NULL;
1051
1052   if (priv->udpsink[0])
1053     udpsink0 = priv->udpsink[0];
1054   else
1055     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1056
1057   if (!udpsink0)
1058     goto no_udp_protocol;
1059
1060   if (priv->udpsink[1])
1061     udpsink1 = priv->udpsink[1];
1062   else
1063     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1064
1065   if (!udpsink1)
1066     goto no_udp_protocol;
1067
1068   /* configure sinks */
1069
1070   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1071   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1072
1073   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1074   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1075
1076   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1077
1078   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1079   /* Needs to be async for RECORD streams, otherwise we will never go to
1080    * PLAYING because the sinks will wait for data while the udpsrc can't
1081    * provide data with timestamps in PAUSED. */
1082   if (priv->sinkpad)
1083     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1084   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1085
1086   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1087   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1088
1089   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1090   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1091
1092   /* update the dscp qos field in the sinks */
1093   update_dscp_qos (stream);
1094
1095   priv->udpsink[0] = udpsink0;
1096   priv->udpsink[1] = udpsink1;
1097
1098   return TRUE;
1099
1100   /* ERRORS */
1101 no_udp_protocol:
1102   {
1103     return FALSE;
1104   }
1105 }
1106
1107 /* must be called with lock */
1108 static gboolean
1109 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1110     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family)
1111 {
1112   GstStateChangeReturn ret;
1113
1114   /* we keep these elements, we will further configure them when the
1115    * client told us to really use the UDP ports. */
1116   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1117   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1118
1119   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1120     goto error;
1121
1122   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1123   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1124
1125   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1126   if (ret == GST_STATE_CHANGE_FAILURE)
1127     goto error;
1128   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1129   if (ret == GST_STATE_CHANGE_FAILURE)
1130     goto error;
1131
1132   return TRUE;
1133   return TRUE;
1134
1135   /* ERRORS */
1136 error:
1137   {
1138     if (udpsrc_out[0])
1139       gst_object_unref (udpsrc_out[0]);
1140     if (udpsrc_out[1])
1141       gst_object_unref (udpsrc_out[1]);
1142     return FALSE;
1143   }
1144 }
1145
1146 static gboolean
1147 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1148     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1149     GstRTSPAddress ** server_addr_out)
1150 {
1151   GstRTSPStreamPrivate *priv = stream->priv;
1152   GSocket *rtp_socket = NULL;
1153   GSocket *rtcp_socket;
1154   gint tmp_rtp, tmp_rtcp;
1155   guint count;
1156   gint rtpport, rtcpport;
1157   GList *rejected_addresses = NULL;
1158   GstRTSPAddress *addr = NULL;
1159   GInetAddress *inetaddr = NULL;
1160   GSocketAddress *rtp_sockaddr = NULL;
1161   GSocketAddress *rtcp_sockaddr = NULL;
1162   GstRTSPAddressPool * pool;
1163
1164   pool = priv->pool;
1165   count = 0;
1166
1167   /* Start with random port */
1168   tmp_rtp = 0;
1169
1170   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1171       G_SOCKET_PROTOCOL_UDP, NULL);
1172   if (!rtcp_socket)
1173     goto no_udp_protocol;
1174
1175   if (*server_addr_out)
1176     gst_rtsp_address_free (*server_addr_out);
1177
1178   /* try to allocate 2 UDP ports, the RTP port should be an even
1179    * number and the RTCP port should be the next (uneven) port */
1180 again:
1181
1182   if (rtp_socket == NULL) {
1183     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1184         G_SOCKET_PROTOCOL_UDP, NULL);
1185     if (!rtp_socket)
1186       goto no_udp_protocol;
1187   }
1188
1189   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1190     GstRTSPAddressFlags flags;
1191
1192     if (addr)
1193       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1194
1195     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1196     if (family == G_SOCKET_FAMILY_IPV6)
1197       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1198     else
1199       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1200
1201     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1202
1203     if (addr == NULL)
1204       goto no_ports;
1205
1206     tmp_rtp = addr->port;
1207
1208     g_clear_object (&inetaddr);
1209     inetaddr = g_inet_address_new_from_string (addr->address);
1210   } else {
1211     if (tmp_rtp != 0) {
1212       tmp_rtp += 2;
1213       if (++count > 20)
1214         goto no_ports;
1215     }
1216
1217     if (inetaddr == NULL)
1218       inetaddr = g_inet_address_new_any (family);
1219   }
1220
1221   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1222   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1223     g_object_unref (rtp_sockaddr);
1224     goto again;
1225   }
1226   g_object_unref (rtp_sockaddr);
1227
1228   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1229   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1230     g_clear_object (&rtp_sockaddr);
1231     goto socket_error;
1232   }
1233
1234   tmp_rtp =
1235       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1236   g_object_unref (rtp_sockaddr);
1237
1238   /* check if port is even */
1239   if ((tmp_rtp & 1) != 0) {
1240     /* port not even, close and allocate another */
1241     tmp_rtp++;
1242     g_clear_object (&rtp_socket);
1243     goto again;
1244   }
1245
1246   /* set port */
1247   tmp_rtcp = tmp_rtp + 1;
1248
1249   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1250   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1251     g_object_unref (rtcp_sockaddr);
1252     g_clear_object (&rtp_socket);
1253     goto again;
1254   }
1255   g_object_unref (rtcp_sockaddr);
1256
1257   g_clear_object (&inetaddr);
1258
1259   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1260         rtcp_socket, family))
1261     goto no_udp_protocol;
1262
1263   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1264   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1265
1266   /* this should not happen... */
1267   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1268     goto port_error;
1269
1270   if (!create_and_configure_udpsinks (stream))
1271     goto no_udp_protocol;
1272
1273   /* set RTP and RTCP sockets */
1274   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1275
1276   server_port_out->min = rtpport;
1277   server_port_out->max = rtcpport;
1278
1279   *server_addr_out = addr;
1280   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1281
1282   g_object_unref (rtp_socket);
1283   g_object_unref (rtcp_socket);
1284
1285   return TRUE;
1286
1287   /* ERRORS */
1288 no_udp_protocol:
1289   {
1290     goto cleanup;
1291   }
1292 no_ports:
1293   {
1294     goto cleanup;
1295   }
1296 port_error:
1297   {
1298     goto cleanup;
1299   }
1300 socket_error:
1301   {
1302     goto cleanup;
1303   }
1304 cleanup:
1305   {
1306     if (inetaddr)
1307       g_object_unref (inetaddr);
1308     g_list_free_full (rejected_addresses,
1309         (GDestroyNotify) gst_rtsp_address_free);
1310     if (addr)
1311       gst_rtsp_address_free (addr);
1312     if (rtp_socket)
1313       g_object_unref (rtp_socket);
1314     if (rtcp_socket)
1315       g_object_unref (rtcp_socket);
1316     return FALSE;
1317   }
1318 }
1319
1320 /* must be called with lock */
1321 static gboolean
1322 alloc_ports (GstRTSPStream * stream)
1323 {
1324   GstRTSPStreamPrivate *priv = stream->priv;
1325
1326   priv->have_ipv4 =
1327       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1328           &priv->server_port_v4, &priv->server_addr_v4);
1329
1330   priv->have_ipv6 =
1331       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1332           &priv->server_port_v6, &priv->server_addr_v6);
1333
1334   return priv->have_ipv4 || priv->have_ipv6;
1335 }
1336
1337 /**
1338  * gst_rtsp_stream_set_client_side:
1339  * @stream: a #GstRTSPStream
1340  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1341  * an RTSP connection.
1342  *
1343  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1344  * streams to an RTSP server via RECORD. This has the practical effect
1345  * of changing which UDP port numbers are used when setting up the local
1346  * side of the stream sending to be either the 'server' or 'client' pair
1347  * of a configured UDP transport.
1348  */
1349 void
1350 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1351 {
1352   GstRTSPStreamPrivate *priv;
1353
1354   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1355   priv = stream->priv;
1356   g_mutex_lock (&priv->lock);
1357   priv->client_side = client_side;
1358   g_mutex_unlock (&priv->lock);
1359 }
1360
1361 /**
1362  * gst_rtsp_stream_set_client_side:
1363  * @stream: a #GstRTSPStream
1364  *
1365  * See gst_rtsp_stream_set_client_side()
1366  *
1367  * Returns: TRUE if this #GstRTSPStream is client-side.
1368  */
1369 gboolean
1370 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1371 {
1372   GstRTSPStreamPrivate *priv;
1373   gboolean ret;
1374
1375   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1376
1377   priv = stream->priv;
1378   g_mutex_lock (&priv->lock);
1379   ret = priv->client_side;
1380   g_mutex_unlock (&priv->lock);
1381
1382   return ret;
1383 }
1384
1385 /**
1386  * gst_rtsp_stream_get_server_port:
1387  * @stream: a #GstRTSPStream
1388  * @server_port: (out): result server port
1389  * @family: the port family to get
1390  *
1391  * Fill @server_port with the port pair used by the server. This function can
1392  * only be called when @stream has been joined.
1393  */
1394 void
1395 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1396     GstRTSPRange * server_port, GSocketFamily family)
1397 {
1398   GstRTSPStreamPrivate *priv;
1399
1400   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1401   priv = stream->priv;
1402   g_return_if_fail (priv->is_joined);
1403
1404   g_mutex_lock (&priv->lock);
1405   if (family == G_SOCKET_FAMILY_IPV4) {
1406     if (server_port)
1407       *server_port = priv->server_port_v4;
1408   } else {
1409     if (server_port)
1410       *server_port = priv->server_port_v6;
1411   }
1412   g_mutex_unlock (&priv->lock);
1413 }
1414
1415 /**
1416  * gst_rtsp_stream_get_rtpsession:
1417  * @stream: a #GstRTSPStream
1418  *
1419  * Get the RTP session of this stream.
1420  *
1421  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1422  */
1423 GObject *
1424 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1425 {
1426   GstRTSPStreamPrivate *priv;
1427   GObject *session;
1428
1429   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1430
1431   priv = stream->priv;
1432
1433   g_mutex_lock (&priv->lock);
1434   if ((session = priv->session))
1435     g_object_ref (session);
1436   g_mutex_unlock (&priv->lock);
1437
1438   return session;
1439 }
1440
1441 /**
1442  * gst_rtsp_stream_get_ssrc:
1443  * @stream: a #GstRTSPStream
1444  * @ssrc: (out): result ssrc
1445  *
1446  * Get the SSRC used by the RTP session of this stream. This function can only
1447  * be called when @stream has been joined.
1448  */
1449 void
1450 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1451 {
1452   GstRTSPStreamPrivate *priv;
1453
1454   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1455   priv = stream->priv;
1456   g_return_if_fail (priv->is_joined);
1457
1458   g_mutex_lock (&priv->lock);
1459   if (ssrc && priv->session)
1460     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1461   g_mutex_unlock (&priv->lock);
1462 }
1463
1464 /**
1465  * gst_rtsp_stream_set_retransmission_time:
1466  * @stream: a #GstRTSPStream
1467  * @time: a #GstClockTime
1468  *
1469  * Set the amount of time to store retransmission packets.
1470  */
1471 void
1472 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1473     GstClockTime time)
1474 {
1475   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1476
1477   g_mutex_lock (&stream->priv->lock);
1478   stream->priv->rtx_time = time;
1479   if (stream->priv->rtxsend)
1480     g_object_set (stream->priv->rtxsend, "max-size-time",
1481         GST_TIME_AS_MSECONDS (time), NULL);
1482   g_mutex_unlock (&stream->priv->lock);
1483 }
1484
1485 /**
1486  * gst_rtsp_stream_get_retransmission_time:
1487  * @stream: a #GstRTSPStream
1488  *
1489  * Get the amount of time to store retransmission data.
1490  *
1491  * Returns: the amount of time to store retransmission data.
1492  */
1493 GstClockTime
1494 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1495 {
1496   GstClockTime ret;
1497
1498   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1499
1500   g_mutex_lock (&stream->priv->lock);
1501   ret = stream->priv->rtx_time;
1502   g_mutex_unlock (&stream->priv->lock);
1503
1504   return ret;
1505 }
1506
1507 /**
1508  * gst_rtsp_stream_set_retransmission_pt:
1509  * @stream: a #GstRTSPStream
1510  * @rtx_pt: a #guint
1511  *
1512  * Set the payload type (pt) for retransmission of this stream.
1513  */
1514 void
1515 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1516 {
1517   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1518
1519   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1520
1521   g_mutex_lock (&stream->priv->lock);
1522   stream->priv->rtx_pt = rtx_pt;
1523   if (stream->priv->rtxsend) {
1524     guint pt = gst_rtsp_stream_get_pt (stream);
1525     gchar *pt_s = g_strdup_printf ("%d", pt);
1526     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1527         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1528     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1529     g_free (pt_s);
1530     gst_structure_free (rtx_pt_map);
1531   }
1532   g_mutex_unlock (&stream->priv->lock);
1533 }
1534
1535 /**
1536  * gst_rtsp_stream_get_retransmission_pt:
1537  * @stream: a #GstRTSPStream
1538  *
1539  * Get the payload-type used for retransmission of this stream
1540  *
1541  * Returns: The retransmission PT.
1542  */
1543 guint
1544 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1545 {
1546   guint rtx_pt;
1547
1548   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1549
1550   g_mutex_lock (&stream->priv->lock);
1551   rtx_pt = stream->priv->rtx_pt;
1552   g_mutex_unlock (&stream->priv->lock);
1553
1554   return rtx_pt;
1555 }
1556
1557 /**
1558  * gst_rtsp_stream_set_buffer_size:
1559  * @stream: a #GstRTSPStream
1560  * @size: the buffer size
1561  *
1562  * Set the size of the UDP transmission buffer (in bytes)
1563  * Needs to be set before the stream is joined to a bin.
1564  *
1565  * Since: 1.6
1566  */
1567 void
1568 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1569 {
1570   g_mutex_lock (&stream->priv->lock);
1571   stream->priv->buffer_size = size;
1572   g_mutex_unlock (&stream->priv->lock);
1573 }
1574
1575 /**
1576  * gst_rtsp_stream_get_buffer_size:
1577  * @stream: a #GstRTSPStream
1578  *
1579  * Get the size of the UDP transmission buffer (in bytes)
1580  *
1581  * Returns: the size of the UDP TX buffer
1582  *
1583  * Since: 1.6
1584  */
1585 guint
1586 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1587 {
1588   guint buffer_size;
1589
1590   g_mutex_lock (&stream->priv->lock);
1591   buffer_size = stream->priv->buffer_size;
1592   g_mutex_unlock (&stream->priv->lock);
1593
1594   return buffer_size;
1595 }
1596
1597 /* executed from streaming thread */
1598 static void
1599 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1600 {
1601   GstRTSPStreamPrivate *priv = stream->priv;
1602   GstCaps *newcaps, *oldcaps;
1603
1604   newcaps = gst_pad_get_current_caps (pad);
1605
1606   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1607       newcaps);
1608
1609   g_mutex_lock (&priv->lock);
1610   oldcaps = priv->caps;
1611   priv->caps = newcaps;
1612   g_mutex_unlock (&priv->lock);
1613
1614   if (oldcaps)
1615     gst_caps_unref (oldcaps);
1616 }
1617
1618 static void
1619 dump_structure (const GstStructure * s)
1620 {
1621   gchar *sstr;
1622
1623   sstr = gst_structure_to_string (s);
1624   GST_INFO ("structure: %s", sstr);
1625   g_free (sstr);
1626 }
1627
1628 static GstRTSPStreamTransport *
1629 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1630 {
1631   GstRTSPStreamPrivate *priv = stream->priv;
1632   GList *walk;
1633   GstRTSPStreamTransport *result = NULL;
1634   const gchar *tmp;
1635   gchar *dest;
1636   guint port;
1637
1638   if (rtcp_from == NULL)
1639     return NULL;
1640
1641   tmp = g_strrstr (rtcp_from, ":");
1642   if (tmp == NULL)
1643     return NULL;
1644
1645   port = atoi (tmp + 1);
1646   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1647
1648   g_mutex_lock (&priv->lock);
1649   GST_INFO ("finding %s:%d in %d transports", dest, port,
1650       g_list_length (priv->transports));
1651
1652   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1653     GstRTSPStreamTransport *trans = walk->data;
1654     const GstRTSPTransport *tr;
1655     gint min, max;
1656
1657     tr = gst_rtsp_stream_transport_get_transport (trans);
1658
1659     if (priv->client_side) {
1660       /* In client side mode the 'destination' is the RTSP server, so send
1661        * to those ports */
1662       min = tr->server_port.min;
1663       max = tr->server_port.max;
1664     } else {
1665       min = tr->client_port.min;
1666       max = tr->client_port.max;
1667     }
1668
1669     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1670       result = trans;
1671       break;
1672     }
1673   }
1674   if (result)
1675     g_object_ref (result);
1676   g_mutex_unlock (&priv->lock);
1677
1678   g_free (dest);
1679
1680   return result;
1681 }
1682
1683 static GstRTSPStreamTransport *
1684 check_transport (GObject * source, GstRTSPStream * stream)
1685 {
1686   GstStructure *stats;
1687   GstRTSPStreamTransport *trans;
1688
1689   /* see if we have a stream to match with the origin of the RTCP packet */
1690   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1691   if (trans == NULL) {
1692     g_object_get (source, "stats", &stats, NULL);
1693     if (stats) {
1694       const gchar *rtcp_from;
1695
1696       dump_structure (stats);
1697
1698       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1699       if ((trans = find_transport (stream, rtcp_from))) {
1700         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1701             source);
1702         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1703             g_object_unref);
1704       }
1705       gst_structure_free (stats);
1706     }
1707   }
1708   return trans;
1709 }
1710
1711
1712 static void
1713 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1714 {
1715   GstRTSPStreamTransport *trans;
1716
1717   GST_INFO ("%p: new source %p", stream, source);
1718
1719   trans = check_transport (source, stream);
1720
1721   if (trans)
1722     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1723 }
1724
1725 static void
1726 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1727 {
1728   GST_INFO ("%p: new SDES %p", stream, source);
1729 }
1730
1731 static void
1732 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1733 {
1734   GstRTSPStreamTransport *trans;
1735
1736   trans = check_transport (source, stream);
1737
1738   if (trans) {
1739     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1740     gst_rtsp_stream_transport_keep_alive (trans);
1741   }
1742 #ifdef DUMP_STATS
1743   {
1744     GstStructure *stats;
1745     g_object_get (source, "stats", &stats, NULL);
1746     if (stats) {
1747       dump_structure (stats);
1748       gst_structure_free (stats);
1749     }
1750   }
1751 #endif
1752 }
1753
1754 static void
1755 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1756 {
1757   GST_INFO ("%p: source %p bye", stream, source);
1758 }
1759
1760 static void
1761 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1762 {
1763   GstRTSPStreamTransport *trans;
1764
1765   GST_INFO ("%p: source %p bye timeout", stream, source);
1766
1767   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1768     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1769     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1770   }
1771 }
1772
1773 static void
1774 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1775 {
1776   GstRTSPStreamTransport *trans;
1777
1778   GST_INFO ("%p: source %p timeout", stream, source);
1779
1780   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1781     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1782     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1783   }
1784 }
1785
1786 static void
1787 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1788 {
1789   GST_INFO ("%p: new sender source %p", stream, source);
1790 #ifndef DUMP_STATS
1791   {
1792     GstStructure *stats;
1793     g_object_get (source, "stats", &stats, NULL);
1794     if (stats) {
1795       dump_structure (stats);
1796       gst_structure_free (stats);
1797     }
1798   }
1799 #endif
1800 }
1801
1802 static void
1803 on_sender_ssrc_active (GObject * session, GObject * source,
1804     GstRTSPStream * stream)
1805 {
1806 #ifndef DUMP_STATS
1807   {
1808     GstStructure *stats;
1809     g_object_get (source, "stats", &stats, NULL);
1810     if (stats) {
1811       dump_structure (stats);
1812       gst_structure_free (stats);
1813     }
1814   }
1815 #endif
1816 }
1817
1818 static void
1819 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1820 {
1821   if (is_rtp) {
1822     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1823     g_list_free (priv->tr_cache_rtp);
1824     priv->tr_cache_rtp = NULL;
1825   } else {
1826     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1827     g_list_free (priv->tr_cache_rtcp);
1828     priv->tr_cache_rtcp = NULL;
1829   }
1830 }
1831
1832 static GstFlowReturn
1833 handle_new_sample (GstAppSink * sink, gpointer user_data)
1834 {
1835   GstRTSPStreamPrivate *priv;
1836   GList *walk;
1837   GstSample *sample;
1838   GstBuffer *buffer;
1839   GstRTSPStream *stream;
1840   gboolean is_rtp;
1841
1842   sample = gst_app_sink_pull_sample (sink);
1843   if (!sample)
1844     return GST_FLOW_OK;
1845
1846   stream = (GstRTSPStream *) user_data;
1847   priv = stream->priv;
1848   buffer = gst_sample_get_buffer (sample);
1849
1850   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1851
1852   g_mutex_lock (&priv->lock);
1853   if (is_rtp) {
1854     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1855       clear_tr_cache (priv, is_rtp);
1856       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1857         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1858         priv->tr_cache_rtp =
1859             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1860       }
1861       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1862     }
1863   } else {
1864     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1865       clear_tr_cache (priv, is_rtp);
1866       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1867         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1868         priv->tr_cache_rtcp =
1869             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1870       }
1871       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1872     }
1873   }
1874   g_mutex_unlock (&priv->lock);
1875
1876   if (is_rtp) {
1877     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1878       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1879       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1880     }
1881   } else {
1882     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1883       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1884       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1885     }
1886   }
1887   gst_sample_unref (sample);
1888
1889   return GST_FLOW_OK;
1890 }
1891
1892 static GstAppSinkCallbacks sink_cb = {
1893   NULL,                         /* not interested in EOS */
1894   NULL,                         /* not interested in preroll samples */
1895   handle_new_sample,
1896 };
1897
1898 static GstElement *
1899 get_rtp_encoder (GstRTSPStream * stream, guint session)
1900 {
1901   GstRTSPStreamPrivate *priv = stream->priv;
1902
1903   if (priv->srtpenc == NULL) {
1904     gchar *name;
1905
1906     name = g_strdup_printf ("srtpenc_%u", session);
1907     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1908     g_free (name);
1909
1910     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1911   }
1912   return gst_object_ref (priv->srtpenc);
1913 }
1914
1915 static GstElement *
1916 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1917 {
1918   GstRTSPStreamPrivate *priv = stream->priv;
1919   GstElement *oldenc, *enc;
1920   GstPad *pad;
1921   gchar *name;
1922
1923   if (priv->idx != session)
1924     return NULL;
1925
1926   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1927
1928   oldenc = priv->srtpenc;
1929   enc = get_rtp_encoder (stream, session);
1930   name = g_strdup_printf ("rtp_sink_%d", session);
1931   pad = gst_element_get_request_pad (enc, name);
1932   g_free (name);
1933   gst_object_unref (pad);
1934
1935   if (oldenc == NULL)
1936     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1937         enc);
1938
1939   return enc;
1940 }
1941
1942 static GstElement *
1943 request_rtcp_encoder (GstElement * rtpbin, guint session,
1944     GstRTSPStream * stream)
1945 {
1946   GstRTSPStreamPrivate *priv = stream->priv;
1947   GstElement *oldenc, *enc;
1948   GstPad *pad;
1949   gchar *name;
1950
1951   if (priv->idx != session)
1952     return NULL;
1953
1954   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1955
1956   oldenc = priv->srtpenc;
1957   enc = get_rtp_encoder (stream, session);
1958   name = g_strdup_printf ("rtcp_sink_%d", session);
1959   pad = gst_element_get_request_pad (enc, name);
1960   g_free (name);
1961   gst_object_unref (pad);
1962
1963   if (oldenc == NULL)
1964     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1965         enc);
1966
1967   return enc;
1968 }
1969
1970 static GstCaps *
1971 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1972 {
1973   GstRTSPStreamPrivate *priv = stream->priv;
1974   GstCaps *caps;
1975
1976   GST_DEBUG ("request key %08x", ssrc);
1977
1978   g_mutex_lock (&priv->lock);
1979   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1980     gst_caps_ref (caps);
1981   g_mutex_unlock (&priv->lock);
1982
1983   return caps;
1984 }
1985
1986 static GstElement *
1987 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1988     GstRTSPStream * stream)
1989 {
1990   GstRTSPStreamPrivate *priv = stream->priv;
1991
1992   if (priv->idx != session)
1993     return NULL;
1994
1995   if (priv->srtpdec == NULL) {
1996     gchar *name;
1997
1998     name = g_strdup_printf ("srtpdec_%u", session);
1999     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2000     g_free (name);
2001
2002     g_signal_connect (priv->srtpdec, "request-key",
2003         (GCallback) request_key, stream);
2004   }
2005   return gst_object_ref (priv->srtpdec);
2006 }
2007
2008 /**
2009  * gst_rtsp_stream_request_aux_sender:
2010  * @stream: a #GstRTSPStream
2011  * @sessid: the session id
2012  *
2013  * Creating a rtxsend bin
2014  *
2015  * Returns: (transfer full): a #GstElement.
2016  *
2017  * Since: 1.6
2018  */
2019 GstElement *
2020 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2021 {
2022   GstElement *bin;
2023   GstPad *pad;
2024   GstStructure *pt_map;
2025   gchar *name;
2026   guint pt, rtx_pt;
2027   gchar *pt_s;
2028
2029   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2030
2031   pt = gst_rtsp_stream_get_pt (stream);
2032   pt_s = g_strdup_printf ("%u", pt);
2033   rtx_pt = stream->priv->rtx_pt;
2034
2035   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2036
2037   bin = gst_bin_new (NULL);
2038   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2039   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2040       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2041   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2042       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2043   g_free (pt_s);
2044   gst_structure_free (pt_map);
2045   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2046
2047   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2048   name = g_strdup_printf ("src_%u", sessid);
2049   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2050   g_free (name);
2051   gst_object_unref (pad);
2052
2053   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2054   name = g_strdup_printf ("sink_%u", sessid);
2055   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2056   g_free (name);
2057   gst_object_unref (pad);
2058
2059   return bin;
2060 }
2061
2062 /**
2063  * gst_rtsp_stream_set_pt_map:
2064  * @stream: a #GstRTSPStream
2065  * @pt: the pt
2066  * @caps: a #GstCaps
2067  *
2068  * Configure a pt map between @pt and @caps.
2069  */
2070 void
2071 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2072 {
2073   GstRTSPStreamPrivate *priv = stream->priv;
2074
2075   g_mutex_lock (&priv->lock);
2076   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2077   g_mutex_unlock (&priv->lock);
2078 }
2079
2080 static GstCaps *
2081 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2082     GstRTSPStream * stream)
2083 {
2084   GstRTSPStreamPrivate *priv = stream->priv;
2085   GstCaps *caps = NULL;
2086
2087   g_mutex_lock (&priv->lock);
2088
2089   if (priv->idx == session) {
2090     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2091     if (caps) {
2092       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2093       gst_caps_ref (caps);
2094     } else {
2095       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2096     }
2097   }
2098
2099   g_mutex_unlock (&priv->lock);
2100
2101   return caps;
2102 }
2103
2104 static void
2105 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2106 {
2107   GstRTSPStreamPrivate *priv = stream->priv;
2108   gchar *name;
2109   GstPadLinkReturn ret;
2110   guint sessid;
2111
2112   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2113       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2114
2115   name = gst_pad_get_name (pad);
2116   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2117     g_free (name);
2118     return;
2119   }
2120   g_free (name);
2121
2122   if (priv->idx != sessid)
2123     return;
2124
2125   if (gst_pad_is_linked (priv->sinkpad)) {
2126     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2127         GST_DEBUG_PAD_NAME (priv->sinkpad));
2128     return;
2129   }
2130
2131   /* link the RTP pad to the session manager, it should not really fail unless
2132    * this is not really an RTP pad */
2133   ret = gst_pad_link (pad, priv->sinkpad);
2134   if (ret != GST_PAD_LINK_OK)
2135     goto link_failed;
2136   priv->recv_rtp_src = gst_object_ref (pad);
2137
2138   return;
2139
2140 /* ERRORS */
2141 link_failed:
2142   {
2143     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2144         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2145   }
2146 }
2147
2148 static void
2149 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2150     GstRTSPStream * stream)
2151 {
2152   /* TODO: What to do here other than this? */
2153   GST_DEBUG ("Stream %p: Got EOS", stream);
2154   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2155 }
2156
2157 /* must be called with lock */
2158 static void
2159 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2160     GstState state)
2161 {
2162   GstRTSPStreamPrivate *priv;
2163   GstPad *pad, *sinkpad = NULL;
2164   gboolean is_tcp = FALSE, is_udp = FALSE;
2165   gint i;
2166
2167   priv = stream->priv;
2168
2169   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2170   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2171       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2172
2173   for (i = 0; i < 2; i++) {
2174     GstPad *teepad, *queuepad;
2175     /* For the sender we create this bit of pipeline for both
2176      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2177      * we need to add a queue before appsink and udpsink to make
2178      * the pipeline not block. For the TCP case, we want to pump
2179      * client as fast as possible anyway. This pipeline is used
2180      * when both TCP and UDP are present.
2181      *
2182      * .--------.      .-----.    .---------.    .---------.
2183      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2184      * |       send->sink   src->sink      src->sink       |
2185      * '--------'      |     |    '---------'    '---------'
2186      *                 |     |    .---------.    .---------.
2187      *                 |     |    |  queue  |    | appsink |
2188      *                 |    src->sink      src->sink       |
2189      *                 '-----'    '---------'    '---------'
2190      *
2191      * When only UDP or only TCP is allowed, we skip the tee and queue
2192      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2193      * the session.
2194      */
2195     /* Only link the RTP send src if we're going to send RTP, link
2196      * the RTCP send src always */
2197     if (priv->srcpad || i == 1) {
2198       if (is_udp) {
2199         /* add udpsink */
2200         gst_bin_add (bin, priv->udpsink[i]);
2201         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2202       }
2203
2204       if (is_tcp) {
2205         /* make appsink */
2206         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2207         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2208         gst_bin_add (bin, priv->appsink[i]);
2209         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2210             &sink_cb, stream, NULL);
2211       }
2212
2213       if (is_udp && is_tcp) {
2214         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2215
2216         /* make tee for RTP/RTCP */
2217         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2218         gst_bin_add (bin, priv->tee[i]);
2219
2220         /* and link to rtpbin send pad */
2221         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2222         gst_pad_link (priv->send_src[i], pad);
2223         gst_object_unref (pad);
2224
2225         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2226         g_object_set (priv->udpqueue[i], "max-size-buffers",
2227             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2228             NULL);
2229         gst_bin_add (bin, priv->udpqueue[i]);
2230         /* link tee to udpqueue */
2231         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2232         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2233         gst_pad_link (teepad, pad);
2234         gst_object_unref (pad);
2235         gst_object_unref (teepad);
2236
2237         /* link udpqueue to udpsink */
2238         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2239         gst_pad_link (queuepad, sinkpad);
2240         gst_object_unref (queuepad);
2241         gst_object_unref (sinkpad);
2242
2243         /* make appqueue */
2244         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2245         g_object_set (priv->appqueue[i], "max-size-buffers",
2246             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2247             NULL);
2248         gst_bin_add (bin, priv->appqueue[i]);
2249         /* and link tee to appqueue */
2250         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2251         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2252         gst_pad_link (teepad, pad);
2253         gst_object_unref (pad);
2254         gst_object_unref (teepad);
2255
2256         /* and link appqueue to appsink */
2257         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2258         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2259         gst_pad_link (queuepad, pad);
2260         gst_object_unref (pad);
2261         gst_object_unref (queuepad);
2262       } else if (is_tcp) {
2263         /* only appsink needed, link it to the session */
2264         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2265         gst_pad_link (priv->send_src[i], pad);
2266         gst_object_unref (pad);
2267
2268         /* when its only TCP, we need to set sync and preroll to FALSE
2269          * for the sink to avoid deadlock. And this is only needed for
2270          * sink used for RTCP data, not the RTP data. */
2271         if (i == 1)
2272           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2273       } else {
2274         /* else only udpsink needed, link it to the session */
2275         gst_pad_link (priv->send_src[i], sinkpad);
2276         gst_object_unref (sinkpad);
2277       }
2278     }
2279
2280     /* check if we need to set to a special state */
2281     if (state != GST_STATE_NULL) {
2282       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2283         gst_element_set_state (priv->udpsink[i], state);
2284       if (priv->appsink[i] && (priv->srcpad || i == 1))
2285         gst_element_set_state (priv->appsink[i], state);
2286       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2287         gst_element_set_state (priv->appqueue[i], state);
2288       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2289         gst_element_set_state (priv->udpqueue[i], state);
2290       if (priv->tee[i] && (priv->srcpad || i == 1))
2291         gst_element_set_state (priv->tee[i], state);
2292     }
2293   }
2294 }
2295
2296 /* must be called with lock */
2297 static void
2298 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2299     GstState state)
2300 {
2301   GstRTSPStreamPrivate *priv;
2302   GstPad *pad, *selpad;
2303   gboolean is_tcp;
2304   gint i;
2305
2306   priv = stream->priv;
2307
2308   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2309
2310   for (i = 0; i < 2; i++) {
2311     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2312      * RTCP sink always */
2313     if (priv->sinkpad || i == 1) {
2314       /* For the receiver we create this bit of pipeline for both
2315        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2316        * and it is all funneled into the rtpbin receive pad.
2317        *
2318        * .--------.     .--------.    .--------.
2319        * | udpsrc |     | funnel |    | rtpbin |
2320        * |       src->sink      src->sink      |
2321        * '--------'     |        |    '--------'
2322        * .--------.     |        |
2323        * | appsrc |     |        |
2324        * |       src->sink       |
2325        * '--------'     '--------'
2326        */
2327       /* make funnel for the RTP/RTCP receivers */
2328       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2329       gst_bin_add (bin, priv->funnel[i]);
2330
2331       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2332       gst_pad_link (pad, priv->recv_sink[i]);
2333       gst_object_unref (pad);
2334
2335       if (priv->udpsrc_v4[i]) {
2336         if (priv->srcpad) {
2337           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2338            * values. This is only relevant for PLAY pipelines */
2339           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2340           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2341         }
2342         /* add udpsrc */
2343         gst_bin_add (bin, priv->udpsrc_v4[i]);
2344
2345         /* and link to the funnel v4 */
2346         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2347         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2348         gst_pad_link (pad, selpad);
2349         gst_object_unref (pad);
2350         gst_object_unref (selpad);
2351       }
2352
2353       if (priv->udpsrc_v6[i]) {
2354         if (priv->srcpad) {
2355           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2356           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2357         }
2358         gst_bin_add (bin, priv->udpsrc_v6[i]);
2359
2360         /* and link to the funnel v6 */
2361         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2362         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2363         gst_pad_link (pad, selpad);
2364         gst_object_unref (pad);
2365         gst_object_unref (selpad);
2366       }
2367
2368       if (is_tcp) {
2369         /* make and add appsrc */
2370         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2371         priv->appsrc_base_time[i] = -1;
2372         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2373         gst_bin_add (bin, priv->appsrc[i]);
2374         /* and link to the funnel */
2375         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2376         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2377         gst_pad_link (pad, selpad);
2378         gst_object_unref (pad);
2379         gst_object_unref (selpad);
2380       }
2381     }
2382
2383     /* check if we need to set to a special state */
2384     if (state != GST_STATE_NULL) {
2385       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2386         gst_element_set_state (priv->funnel[i], state);
2387       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2388         gst_element_set_state (priv->appsrc[i], state);
2389     }
2390   }
2391 }
2392
2393 /**
2394  * gst_rtsp_stream_join_bin:
2395  * @stream: a #GstRTSPStream
2396  * @bin: (transfer none): a #GstBin to join
2397  * @rtpbin: (transfer none): a rtpbin element in @bin
2398  * @state: the target state of the new elements
2399  *
2400  * Join the #GstBin @bin that contains the element @rtpbin.
2401  *
2402  * @stream will link to @rtpbin, which must be inside @bin. The elements
2403  * added to @bin will be set to the state given in @state.
2404  *
2405  * Returns: %TRUE on success.
2406  */
2407 gboolean
2408 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2409     GstElement * rtpbin, GstState state)
2410 {
2411   GstRTSPStreamPrivate *priv;
2412   guint idx;
2413   gchar *name;
2414   GstPadLinkReturn ret;
2415   gboolean is_udp;
2416
2417   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2418   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2419   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2420
2421   priv = stream->priv;
2422
2423   g_mutex_lock (&priv->lock);
2424   if (priv->is_joined)
2425     goto was_joined;
2426
2427   /* create a session with the same index as the stream */
2428   idx = priv->idx;
2429
2430   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2431
2432   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2433       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2434
2435   if (is_udp && !alloc_ports (stream))
2436     goto no_ports;
2437
2438   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2439       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2440     /* For SRTP */
2441     g_signal_connect (rtpbin, "request-rtp-encoder",
2442         (GCallback) request_rtp_encoder, stream);
2443     g_signal_connect (rtpbin, "request-rtcp-encoder",
2444         (GCallback) request_rtcp_encoder, stream);
2445     g_signal_connect (rtpbin, "request-rtp-decoder",
2446         (GCallback) request_rtp_rtcp_decoder, stream);
2447     g_signal_connect (rtpbin, "request-rtcp-decoder",
2448         (GCallback) request_rtp_rtcp_decoder, stream);
2449   }
2450
2451   if (priv->sinkpad) {
2452     g_signal_connect (rtpbin, "request-pt-map",
2453         (GCallback) request_pt_map, stream);
2454   }
2455
2456   /* get pads from the RTP session element for sending and receiving
2457    * RTP/RTCP*/
2458   if (priv->srcpad) {
2459     /* get a pad for sending RTP */
2460     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2461     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2462     g_free (name);
2463
2464     /* link the RTP pad to the session manager, it should not really fail unless
2465      * this is not really an RTP pad */
2466     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2467     if (ret != GST_PAD_LINK_OK)
2468       goto link_failed;
2469
2470     name = g_strdup_printf ("send_rtp_src_%u", idx);
2471     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2472     g_free (name);
2473   } else {
2474     /* Need to connect our sinkpad from here */
2475     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2476     /* EOS */
2477     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2478
2479     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2480     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2481     g_free (name);
2482   }
2483
2484   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2485   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2486   g_free (name);
2487   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2488   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2489   g_free (name);
2490
2491   /* get the session */
2492   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2493
2494   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2495       stream);
2496   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2497       stream);
2498   g_signal_connect (priv->session, "on-ssrc-active",
2499       (GCallback) on_ssrc_active, stream);
2500   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2501       stream);
2502   g_signal_connect (priv->session, "on-bye-timeout",
2503       (GCallback) on_bye_timeout, stream);
2504   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2505       stream);
2506
2507   /* signal for sender ssrc */
2508   g_signal_connect (priv->session, "on-new-sender-ssrc",
2509       (GCallback) on_new_sender_ssrc, stream);
2510   g_signal_connect (priv->session, "on-sender-ssrc-active",
2511       (GCallback) on_sender_ssrc_active, stream);
2512
2513   create_sender_part (stream, bin, state);
2514
2515   create_receiver_part (stream, bin, state);
2516
2517   if (priv->srcpad) {
2518     /* be notified of caps changes */
2519     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2520         (GCallback) caps_notify, stream);
2521   }
2522
2523   priv->is_joined = TRUE;
2524   g_mutex_unlock (&priv->lock);
2525
2526   return TRUE;
2527
2528   /* ERRORS */
2529 was_joined:
2530   {
2531     g_mutex_unlock (&priv->lock);
2532     return TRUE;
2533   }
2534 no_ports:
2535   {
2536     g_mutex_unlock (&priv->lock);
2537     GST_WARNING ("failed to allocate ports %u", idx);
2538     return FALSE;
2539   }
2540 link_failed:
2541   {
2542     GST_WARNING ("failed to link stream %u", idx);
2543     gst_object_unref (priv->send_rtp_sink);
2544     priv->send_rtp_sink = NULL;
2545     g_mutex_unlock (&priv->lock);
2546     return FALSE;
2547   }
2548 }
2549
2550 /**
2551  * gst_rtsp_stream_leave_bin:
2552  * @stream: a #GstRTSPStream
2553  * @bin: (transfer none): a #GstBin
2554  * @rtpbin: (transfer none): a rtpbin #GstElement
2555  *
2556  * Remove the elements of @stream from @bin.
2557  *
2558  * Return: %TRUE on success.
2559  */
2560 gboolean
2561 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2562     GstElement * rtpbin)
2563 {
2564   GstRTSPStreamPrivate *priv;
2565   gint i;
2566   GList *l;
2567   gboolean is_tcp, is_udp;
2568
2569   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2570   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2571   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2572
2573   priv = stream->priv;
2574
2575   g_mutex_lock (&priv->lock);
2576   if (!priv->is_joined)
2577     goto was_not_joined;
2578
2579   /* all transports must be removed by now */
2580   if (priv->transports != NULL)
2581     goto transports_not_removed;
2582
2583   clear_tr_cache (priv, TRUE);
2584   clear_tr_cache (priv, FALSE);
2585
2586   GST_INFO ("stream %p leaving bin", stream);
2587
2588   if (priv->srcpad) {
2589     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2590
2591     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2592     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2593     gst_object_unref (priv->send_rtp_sink);
2594     priv->send_rtp_sink = NULL;
2595   } else if (priv->recv_rtp_src) {
2596     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2597     gst_object_unref (priv->recv_rtp_src);
2598     priv->recv_rtp_src = NULL;
2599   }
2600
2601   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2602
2603   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2604       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2605
2606
2607   for (i = 0; i < 2; i++) {
2608     if (priv->udpsink[i])
2609       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2610     if (priv->appsink[i])
2611       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2612     if (priv->appqueue[i])
2613       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2614     if (priv->udpqueue[i])
2615       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2616     if (priv->tee[i])
2617       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2618     if (priv->funnel[i])
2619       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2620     if (priv->appsrc[i])
2621       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2622
2623     if (priv->udpsrc_v4[i]) {
2624       if (priv->sinkpad || i == 1) {
2625         /* and set udpsrc to NULL now before removing */
2626         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2627         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2628         /* removing them should also nicely release the request
2629          * pads when they finalize */
2630         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2631       } else {
2632         /* we need to set the state to NULL before unref */
2633         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2634         gst_object_unref (priv->udpsrc_v4[i]);
2635       }
2636     }
2637
2638     if (priv->udpsrc_v6[i]) {
2639       if (priv->sinkpad || i == 1) {
2640         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2641         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2642         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2643       } else {
2644         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2645         gst_object_unref (priv->udpsrc_v6[i]);
2646       }
2647     }
2648
2649     for (l = priv->transport_sources; l; l = l->next) {
2650       GstRTSPMulticastTransportSource *s = l->data;
2651
2652       if (!s->udpsrc[i])
2653         continue;
2654
2655       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2656       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2657       gst_bin_remove (bin, s->udpsrc[i]);
2658     }
2659
2660     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2661       gst_bin_remove (bin, priv->udpsink[i]);
2662     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2663       gst_bin_remove (bin, priv->appsrc[i]);
2664     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2665       gst_bin_remove (bin, priv->appsink[i]);
2666     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2667       gst_bin_remove (bin, priv->appqueue[i]);
2668     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2669       gst_bin_remove (bin, priv->udpqueue[i]);
2670     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2671       gst_bin_remove (bin, priv->tee[i]);
2672     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2673       gst_bin_remove (bin, priv->funnel[i]);
2674
2675     if (priv->sinkpad || i == 1) {
2676       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2677       gst_object_unref (priv->recv_sink[i]);
2678       priv->recv_sink[i] = NULL;
2679     }
2680
2681     priv->udpsrc_v4[i] = NULL;
2682     priv->udpsrc_v6[i] = NULL;
2683     priv->udpsink[i] = NULL;
2684     priv->appsrc[i] = NULL;
2685     priv->appsink[i] = NULL;
2686     priv->appqueue[i] = NULL;
2687     priv->udpqueue[i] = NULL;
2688     priv->tee[i] = NULL;
2689     priv->funnel[i] = NULL;
2690   }
2691
2692   for (l = priv->transport_sources; l; l = l->next) {
2693     GstRTSPMulticastTransportSource *s = l->data;
2694     g_slice_free (GstRTSPMulticastTransportSource, s);
2695   }
2696   g_list_free (priv->transport_sources);
2697   priv->transport_sources = NULL;
2698
2699   if (priv->srcpad) {
2700     gst_object_unref (priv->send_src[0]);
2701     priv->send_src[0] = NULL;
2702   }
2703
2704   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2705   gst_object_unref (priv->send_src[1]);
2706   priv->send_src[1] = NULL;
2707
2708   g_object_unref (priv->session);
2709   priv->session = NULL;
2710   if (priv->caps)
2711     gst_caps_unref (priv->caps);
2712   priv->caps = NULL;
2713
2714   if (priv->srtpenc)
2715     gst_object_unref (priv->srtpenc);
2716   if (priv->srtpdec)
2717     gst_object_unref (priv->srtpdec);
2718
2719   priv->is_joined = FALSE;
2720   g_mutex_unlock (&priv->lock);
2721
2722   return TRUE;
2723
2724 was_not_joined:
2725   {
2726     g_mutex_unlock (&priv->lock);
2727     return TRUE;
2728   }
2729 transports_not_removed:
2730   {
2731     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2732     g_mutex_unlock (&priv->lock);
2733     return FALSE;
2734   }
2735 }
2736
2737 /**
2738  * gst_rtsp_stream_get_rtpinfo:
2739  * @stream: a #GstRTSPStream
2740  * @rtptime: (allow-none): result RTP timestamp
2741  * @seq: (allow-none): result RTP seqnum
2742  * @clock_rate: (allow-none): the clock rate
2743  * @running_time: (allow-none): result running-time
2744  *
2745  * Retrieve the current rtptime, seq and running-time. This is used to
2746  * construct a RTPInfo reply header.
2747  *
2748  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2749  */
2750 gboolean
2751 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2752     guint * rtptime, guint * seq, guint * clock_rate,
2753     GstClockTime * running_time)
2754 {
2755   GstRTSPStreamPrivate *priv;
2756   GstStructure *stats;
2757   GObjectClass *payobjclass;
2758
2759   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2760
2761   priv = stream->priv;
2762
2763   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2764
2765   g_mutex_lock (&priv->lock);
2766
2767   /* First try to extract the information from the last buffer on the sinks.
2768    * This will have a more accurate sequence number and timestamp, as between
2769    * the payloader and the sink there can be some queues
2770    */
2771   if (priv->udpsink[0] || priv->appsink[0]) {
2772     GstSample *last_sample;
2773
2774     if (priv->udpsink[0])
2775       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2776     else
2777       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2778
2779     if (last_sample) {
2780       GstCaps *caps;
2781       GstBuffer *buffer;
2782       GstSegment *segment;
2783       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2784
2785       caps = gst_sample_get_caps (last_sample);
2786       buffer = gst_sample_get_buffer (last_sample);
2787       segment = gst_sample_get_segment (last_sample);
2788
2789       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2790         if (seq) {
2791           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2792         }
2793
2794         if (rtptime) {
2795           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2796         }
2797
2798         gst_rtp_buffer_unmap (&rtp_buffer);
2799
2800         if (running_time) {
2801           *running_time =
2802               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2803               GST_BUFFER_TIMESTAMP (buffer));
2804         }
2805
2806         if (clock_rate) {
2807           GstStructure *s = gst_caps_get_structure (caps, 0);
2808
2809           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2810
2811           if (*clock_rate == 0 && running_time)
2812             *running_time = GST_CLOCK_TIME_NONE;
2813         }
2814         gst_sample_unref (last_sample);
2815
2816         goto done;
2817       } else {
2818         gst_sample_unref (last_sample);
2819       }
2820     }
2821   }
2822
2823   if (g_object_class_find_property (payobjclass, "stats")) {
2824     g_object_get (priv->payloader, "stats", &stats, NULL);
2825     if (stats == NULL)
2826       goto no_stats;
2827
2828     if (seq)
2829       gst_structure_get_uint (stats, "seqnum", seq);
2830
2831     if (rtptime)
2832       gst_structure_get_uint (stats, "timestamp", rtptime);
2833
2834     if (running_time)
2835       gst_structure_get_clock_time (stats, "running-time", running_time);
2836
2837     if (clock_rate) {
2838       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2839       if (*clock_rate == 0 && running_time)
2840         *running_time = GST_CLOCK_TIME_NONE;
2841     }
2842     gst_structure_free (stats);
2843   } else {
2844     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2845         !g_object_class_find_property (payobjclass, "timestamp"))
2846       goto no_stats;
2847
2848     if (seq)
2849       g_object_get (priv->payloader, "seqnum", seq, NULL);
2850
2851     if (rtptime)
2852       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2853
2854     if (running_time)
2855       *running_time = GST_CLOCK_TIME_NONE;
2856   }
2857
2858 done:
2859   g_mutex_unlock (&priv->lock);
2860
2861   return TRUE;
2862
2863   /* ERRORS */
2864 no_stats:
2865   {
2866     GST_WARNING ("Could not get payloader stats");
2867     g_mutex_unlock (&priv->lock);
2868     return FALSE;
2869   }
2870 }
2871
2872 /**
2873  * gst_rtsp_stream_get_caps:
2874  * @stream: a #GstRTSPStream
2875  *
2876  * Retrieve the current caps of @stream.
2877  *
2878  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2879  * after usage.
2880  */
2881 GstCaps *
2882 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2883 {
2884   GstRTSPStreamPrivate *priv;
2885   GstCaps *result;
2886
2887   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2888
2889   priv = stream->priv;
2890
2891   g_mutex_lock (&priv->lock);
2892   if ((result = priv->caps))
2893     gst_caps_ref (result);
2894   g_mutex_unlock (&priv->lock);
2895
2896   return result;
2897 }
2898
2899 /**
2900  * gst_rtsp_stream_recv_rtp:
2901  * @stream: a #GstRTSPStream
2902  * @buffer: (transfer full): a #GstBuffer
2903  *
2904  * Handle an RTP buffer for the stream. This method is usually called when a
2905  * message has been received from a client using the TCP transport.
2906  *
2907  * This function takes ownership of @buffer.
2908  *
2909  * Returns: a GstFlowReturn.
2910  */
2911 GstFlowReturn
2912 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2913 {
2914   GstRTSPStreamPrivate *priv;
2915   GstFlowReturn ret;
2916   GstElement *element;
2917
2918   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2919   priv = stream->priv;
2920   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2921   g_return_val_if_fail (priv->is_joined, FALSE);
2922
2923   g_mutex_lock (&priv->lock);
2924   if (priv->appsrc[0])
2925     element = gst_object_ref (priv->appsrc[0]);
2926   else
2927     element = NULL;
2928   g_mutex_unlock (&priv->lock);
2929
2930   if (element) {
2931     if (priv->appsrc_base_time[0] == -1) {
2932       /* Take current running_time. This timestamp will be put on
2933        * the first buffer of each stream because we are a live source and so we
2934        * timestamp with the running_time. When we are dealing with TCP, we also
2935        * only timestamp the first buffer (using the DISCONT flag) because a server
2936        * typically bursts data, for which we don't want to compensate by speeding
2937        * up the media. The other timestamps will be interpollated from this one
2938        * using the RTP timestamps. */
2939       GST_OBJECT_LOCK (element);
2940       if (GST_ELEMENT_CLOCK (element)) {
2941         GstClockTime now;
2942         GstClockTime base_time;
2943
2944         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2945         base_time = GST_ELEMENT_CAST (element)->base_time;
2946
2947         priv->appsrc_base_time[0] = now - base_time;
2948         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2949         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2950             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2951             GST_TIME_ARGS (base_time));
2952       }
2953       GST_OBJECT_UNLOCK (element);
2954     }
2955
2956     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2957     gst_object_unref (element);
2958   } else {
2959     ret = GST_FLOW_OK;
2960   }
2961   return ret;
2962 }
2963
2964 /**
2965  * gst_rtsp_stream_recv_rtcp:
2966  * @stream: a #GstRTSPStream
2967  * @buffer: (transfer full): a #GstBuffer
2968  *
2969  * Handle an RTCP buffer for the stream. This method is usually called when a
2970  * message has been received from a client using the TCP transport.
2971  *
2972  * This function takes ownership of @buffer.
2973  *
2974  * Returns: a GstFlowReturn.
2975  */
2976 GstFlowReturn
2977 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2978 {
2979   GstRTSPStreamPrivate *priv;
2980   GstFlowReturn ret;
2981   GstElement *element;
2982
2983   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2984   priv = stream->priv;
2985   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2986
2987   if (!priv->is_joined) {
2988     gst_buffer_unref (buffer);
2989     return GST_FLOW_NOT_LINKED;
2990   }
2991   g_mutex_lock (&priv->lock);
2992   if (priv->appsrc[1])
2993     element = gst_object_ref (priv->appsrc[1]);
2994   else
2995     element = NULL;
2996   g_mutex_unlock (&priv->lock);
2997
2998   if (element) {
2999     if (priv->appsrc_base_time[1] == -1) {
3000       /* Take current running_time. This timestamp will be put on
3001        * the first buffer of each stream because we are a live source and so we
3002        * timestamp with the running_time. When we are dealing with TCP, we also
3003        * only timestamp the first buffer (using the DISCONT flag) because a server
3004        * typically bursts data, for which we don't want to compensate by speeding
3005        * up the media. The other timestamps will be interpollated from this one
3006        * using the RTP timestamps. */
3007       GST_OBJECT_LOCK (element);
3008       if (GST_ELEMENT_CLOCK (element)) {
3009         GstClockTime now;
3010         GstClockTime base_time;
3011
3012         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3013         base_time = GST_ELEMENT_CAST (element)->base_time;
3014
3015         priv->appsrc_base_time[1] = now - base_time;
3016         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3017         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3018             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3019             GST_TIME_ARGS (base_time));
3020       }
3021       GST_OBJECT_UNLOCK (element);
3022     }
3023
3024     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3025     gst_object_unref (element);
3026   } else {
3027     ret = GST_FLOW_OK;
3028     gst_buffer_unref (buffer);
3029   }
3030   return ret;
3031 }
3032
3033 /* must be called with lock */
3034 static gboolean
3035 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3036     gboolean add)
3037 {
3038   GstRTSPStreamPrivate *priv = stream->priv;
3039   const GstRTSPTransport *tr;
3040
3041   tr = gst_rtsp_stream_transport_get_transport (trans);
3042
3043   switch (tr->lower_transport) {
3044     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3045     {
3046       GstRTSPMulticastTransportSource *source;
3047       GstBin *bin;
3048
3049       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
3050
3051       if (add) {
3052         gchar *host;
3053         gint i;
3054         GstPad *selpad, *pad;
3055
3056         source = g_slice_new0 (GstRTSPMulticastTransportSource);
3057         source->transport = trans;
3058
3059         for (i = 0; i < 2; i++) {
3060           host =
3061               g_strdup_printf ("udp://%s:%d", tr->destination,
3062               (i == 0) ? tr->port.min : tr->port.max);
3063           source->udpsrc[i] =
3064               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
3065           g_free (host);
3066           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
3067
3068           if (priv->srcpad) {
3069             /* we set and keep these to playing so that they don't cause NO_PREROLL return
3070              * values. This is only relevant for PLAY pipelines */
3071             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
3072             gst_element_set_locked_state (source->udpsrc[i], TRUE);
3073           }
3074           /* add udpsrc */
3075           gst_bin_add (bin, source->udpsrc[i]);
3076
3077           /* and link to the funnel v4 */
3078           if (priv->sinkpad || i == 1) {
3079             source->selpad[i] = selpad =
3080                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3081             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3082             gst_pad_link (pad, selpad);
3083             gst_object_unref (pad);
3084             gst_object_unref (selpad);
3085           }
3086         }
3087
3088         priv->transport_sources =
3089             g_list_prepend (priv->transport_sources, source);
3090       } else {
3091         GList *l;
3092
3093         for (l = priv->transport_sources; l; l = l->next) {
3094           source = l->data;
3095
3096           if (source->transport == trans) {
3097             priv->transport_sources =
3098                 g_list_delete_link (priv->transport_sources, l);
3099             break;
3100           }
3101         }
3102
3103         if (l != NULL) {
3104           gint i;
3105
3106           for (i = 0; i < 2; i++) {
3107             /* Will automatically unlink everything */
3108             gst_bin_remove (bin,
3109                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3110
3111             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3112             gst_object_unref (source->udpsrc[i]);
3113
3114             if (priv->sinkpad || i == 1) {
3115               gst_element_release_request_pad (priv->funnel[i],
3116                   source->selpad[i]);
3117             }
3118           }
3119
3120           g_slice_free (GstRTSPMulticastTransportSource, source);
3121         }
3122       }
3123
3124       gst_object_unref (bin);
3125
3126       /* fall through for the generic case */
3127     }
3128     case GST_RTSP_LOWER_TRANS_UDP:
3129     {
3130       gchar *dest;
3131       gint min, max;
3132       guint ttl = 0;
3133
3134       dest = tr->destination;
3135       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3136         min = tr->port.min;
3137         max = tr->port.max;
3138         ttl = tr->ttl;
3139       } else if (priv->client_side) {
3140         /* In client side mode the 'destination' is the RTSP server, so send
3141          * to those ports */
3142         min = tr->server_port.min;
3143         max = tr->server_port.max;
3144       } else {
3145         min = tr->client_port.min;
3146         max = tr->client_port.max;
3147       }
3148
3149       if (add) {
3150         if (ttl > 0) {
3151           GST_INFO ("setting ttl-mc %d", ttl);
3152           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3153           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3154         }
3155         GST_INFO ("adding %s:%d-%d", dest, min, max);
3156         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3157         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3158         priv->transports = g_list_prepend (priv->transports, trans);
3159       } else {
3160         GST_INFO ("removing %s:%d-%d", dest, min, max);
3161         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3162         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3163         priv->transports = g_list_remove (priv->transports, trans);
3164       }
3165       priv->transports_cookie++;
3166       break;
3167     }
3168     case GST_RTSP_LOWER_TRANS_TCP:
3169       if (add) {
3170         GST_INFO ("adding TCP %s", tr->destination);
3171         priv->transports = g_list_prepend (priv->transports, trans);
3172       } else {
3173         GST_INFO ("removing TCP %s", tr->destination);
3174         priv->transports = g_list_remove (priv->transports, trans);
3175       }
3176       priv->transports_cookie++;
3177       break;
3178     default:
3179       goto unknown_transport;
3180   }
3181   return TRUE;
3182
3183   /* ERRORS */
3184 unknown_transport:
3185   {
3186     GST_INFO ("Unknown transport %d", tr->lower_transport);
3187     return FALSE;
3188   }
3189 }
3190
3191
3192 /**
3193  * gst_rtsp_stream_add_transport:
3194  * @stream: a #GstRTSPStream
3195  * @trans: (transfer none): a #GstRTSPStreamTransport
3196  *
3197  * Add the transport in @trans to @stream. The media of @stream will
3198  * then also be send to the values configured in @trans.
3199  *
3200  * @stream must be joined to a bin.
3201  *
3202  * @trans must contain a valid #GstRTSPTransport.
3203  *
3204  * Returns: %TRUE if @trans was added
3205  */
3206 gboolean
3207 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3208     GstRTSPStreamTransport * trans)
3209 {
3210   GstRTSPStreamPrivate *priv;
3211   gboolean res;
3212
3213   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3214   priv = stream->priv;
3215   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3216   g_return_val_if_fail (priv->is_joined, FALSE);
3217
3218   g_mutex_lock (&priv->lock);
3219   res = update_transport (stream, trans, TRUE);
3220   g_mutex_unlock (&priv->lock);
3221
3222   return res;
3223 }
3224
3225 /**
3226  * gst_rtsp_stream_remove_transport:
3227  * @stream: a #GstRTSPStream
3228  * @trans: (transfer none): a #GstRTSPStreamTransport
3229  *
3230  * Remove the transport in @trans from @stream. The media of @stream will
3231  * not be sent to the values configured in @trans.
3232  *
3233  * @stream must be joined to a bin.
3234  *
3235  * @trans must contain a valid #GstRTSPTransport.
3236  *
3237  * Returns: %TRUE if @trans was removed
3238  */
3239 gboolean
3240 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3241     GstRTSPStreamTransport * trans)
3242 {
3243   GstRTSPStreamPrivate *priv;
3244   gboolean res;
3245
3246   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3247   priv = stream->priv;
3248   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3249   g_return_val_if_fail (priv->is_joined, FALSE);
3250
3251   g_mutex_lock (&priv->lock);
3252   res = update_transport (stream, trans, FALSE);
3253   g_mutex_unlock (&priv->lock);
3254
3255   return res;
3256 }
3257
3258 /**
3259  * gst_rtsp_stream_update_crypto:
3260  * @stream: a #GstRTSPStream
3261  * @ssrc: the SSRC
3262  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3263  *
3264  * Update the new crypto information for @ssrc in @stream. If information
3265  * for @ssrc did not exist, it will be added. If information
3266  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3267  * be removed from @stream.
3268  *
3269  * Returns: %TRUE if @crypto could be updated
3270  */
3271 gboolean
3272 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3273     guint ssrc, GstCaps * crypto)
3274 {
3275   GstRTSPStreamPrivate *priv;
3276
3277   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3278   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3279
3280   priv = stream->priv;
3281
3282   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3283
3284   g_mutex_lock (&priv->lock);
3285   if (crypto)
3286     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3287         gst_caps_ref (crypto));
3288   else
3289     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3290   g_mutex_unlock (&priv->lock);
3291
3292   return TRUE;
3293 }
3294
3295 /**
3296  * gst_rtsp_stream_get_rtp_socket:
3297  * @stream: a #GstRTSPStream
3298  * @family: the socket family
3299  *
3300  * Get the RTP socket from @stream for a @family.
3301  *
3302  * @stream must be joined to a bin.
3303  *
3304  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3305  * socket could be allocated for @family. Unref after usage
3306  */
3307 GSocket *
3308 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3309 {
3310   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3311   GSocket *socket;
3312   const gchar *name;
3313
3314   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3315   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3316       family == G_SOCKET_FAMILY_IPV6, NULL);
3317   g_return_val_if_fail (priv->udpsink[0], NULL);
3318
3319   if (family == G_SOCKET_FAMILY_IPV6)
3320     name = "socket-v6";
3321   else
3322     name = "socket";
3323
3324   g_object_get (priv->udpsink[0], name, &socket, NULL);
3325
3326   return socket;
3327 }
3328
3329 /**
3330  * gst_rtsp_stream_get_rtcp_socket:
3331  * @stream: a #GstRTSPStream
3332  * @family: the socket family
3333  *
3334  * Get the RTCP socket from @stream for a @family.
3335  *
3336  * @stream must be joined to a bin.
3337  *
3338  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3339  * socket could be allocated for @family. Unref after usage
3340  */
3341 GSocket *
3342 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3343 {
3344   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3345   GSocket *socket;
3346   const gchar *name;
3347
3348   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3349   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3350       family == G_SOCKET_FAMILY_IPV6, NULL);
3351   g_return_val_if_fail (priv->udpsink[1], NULL);
3352
3353   if (family == G_SOCKET_FAMILY_IPV6)
3354     name = "socket-v6";
3355   else
3356     name = "socket";
3357
3358   g_object_get (priv->udpsink[1], name, &socket, NULL);
3359
3360   return socket;
3361 }
3362
3363 /**
3364  * gst_rtsp_stream_set_seqnum:
3365  * @stream: a #GstRTSPStream
3366  * @seqnum: a new sequence number
3367  *
3368  * Configure the sequence number in the payloader of @stream to @seqnum.
3369  */
3370 void
3371 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3372 {
3373   GstRTSPStreamPrivate *priv;
3374
3375   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3376
3377   priv = stream->priv;
3378
3379   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3380 }
3381
3382 /**
3383  * gst_rtsp_stream_get_seqnum:
3384  * @stream: a #GstRTSPStream
3385  *
3386  * Get the configured sequence number in the payloader of @stream.
3387  *
3388  * Returns: the sequence number of the payloader.
3389  */
3390 guint16
3391 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3392 {
3393   GstRTSPStreamPrivate *priv;
3394   guint seqnum;
3395
3396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3397
3398   priv = stream->priv;
3399
3400   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3401
3402   return seqnum;
3403 }
3404
3405 /**
3406  * gst_rtsp_stream_transport_filter:
3407  * @stream: a #GstRTSPStream
3408  * @func: (scope call) (allow-none): a callback
3409  * @user_data: (closure): user data passed to @func
3410  *
3411  * Call @func for each transport managed by @stream. The result value of @func
3412  * determines what happens to the transport. @func will be called with @stream
3413  * locked so no further actions on @stream can be performed from @func.
3414  *
3415  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3416  * @stream.
3417  *
3418  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3419  *
3420  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3421  * will also be added with an additional ref to the result #GList of this
3422  * function..
3423  *
3424  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3425  *
3426  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3427  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3428  * element in the #GList should be unreffed before the list is freed.
3429  */
3430 GList *
3431 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3432     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3433 {
3434   GstRTSPStreamPrivate *priv;
3435   GList *result, *walk, *next;
3436   GHashTable *visited = NULL;
3437   guint cookie;
3438
3439   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3440
3441   priv = stream->priv;
3442
3443   result = NULL;
3444   if (func)
3445     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3446
3447   g_mutex_lock (&priv->lock);
3448 restart:
3449   cookie = priv->transports_cookie;
3450   for (walk = priv->transports; walk; walk = next) {
3451     GstRTSPStreamTransport *trans = walk->data;
3452     GstRTSPFilterResult res;
3453     gboolean changed;
3454
3455     next = g_list_next (walk);
3456
3457     if (func) {
3458       /* only visit each transport once */
3459       if (g_hash_table_contains (visited, trans))
3460         continue;
3461
3462       g_hash_table_add (visited, g_object_ref (trans));
3463       g_mutex_unlock (&priv->lock);
3464
3465       res = func (stream, trans, user_data);
3466
3467       g_mutex_lock (&priv->lock);
3468     } else
3469       res = GST_RTSP_FILTER_REF;
3470
3471     changed = (cookie != priv->transports_cookie);
3472
3473     switch (res) {
3474       case GST_RTSP_FILTER_REMOVE:
3475         update_transport (stream, trans, FALSE);
3476         break;
3477       case GST_RTSP_FILTER_REF:
3478         result = g_list_prepend (result, g_object_ref (trans));
3479         break;
3480       case GST_RTSP_FILTER_KEEP:
3481       default:
3482         break;
3483     }
3484     if (changed)
3485       goto restart;
3486   }
3487   g_mutex_unlock (&priv->lock);
3488
3489   if (func)
3490     g_hash_table_unref (visited);
3491
3492   return result;
3493 }
3494
3495 static GstPadProbeReturn
3496 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3497 {
3498   GstRTSPStreamPrivate *priv;
3499   GstRTSPStream *stream;
3500
3501   stream = user_data;
3502   priv = stream->priv;
3503
3504   GST_DEBUG_OBJECT (pad, "now blocking");
3505
3506   g_mutex_lock (&priv->lock);
3507   priv->blocking = TRUE;
3508   g_mutex_unlock (&priv->lock);
3509
3510   gst_element_post_message (priv->payloader,
3511       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3512           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3513
3514   return GST_PAD_PROBE_OK;
3515 }
3516
3517 /**
3518  * gst_rtsp_stream_set_blocked:
3519  * @stream: a #GstRTSPStream
3520  * @blocked: boolean indicating we should block or unblock
3521  *
3522  * Blocks or unblocks the dataflow on @stream.
3523  *
3524  * Returns: %TRUE on success
3525  */
3526 gboolean
3527 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3528 {
3529   GstRTSPStreamPrivate *priv;
3530
3531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3532
3533   priv = stream->priv;
3534
3535   g_mutex_lock (&priv->lock);
3536   if (blocked) {
3537     priv->blocking = FALSE;
3538     if (priv->blocked_id == 0) {
3539       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3540           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3541           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3542           g_object_ref (stream), g_object_unref);
3543     }
3544   } else {
3545     if (priv->blocked_id != 0) {
3546       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3547       priv->blocked_id = 0;
3548       priv->blocking = FALSE;
3549     }
3550   }
3551   g_mutex_unlock (&priv->lock);
3552
3553   return TRUE;
3554 }
3555
3556 /**
3557  * gst_rtsp_stream_is_blocking:
3558  * @stream: a #GstRTSPStream
3559  *
3560  * Check if @stream is blocking on a #GstBuffer.
3561  *
3562  * Returns: %TRUE if @stream is blocking
3563  */
3564 gboolean
3565 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3566 {
3567   GstRTSPStreamPrivate *priv;
3568   gboolean result;
3569
3570   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3571
3572   priv = stream->priv;
3573
3574   g_mutex_lock (&priv->lock);
3575   result = priv->blocking;
3576   g_mutex_unlock (&priv->lock);
3577
3578   return result;
3579 }
3580
3581 /**
3582  * gst_rtsp_stream_query_position:
3583  * @stream: a #GstRTSPStream
3584  *
3585  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3586  * the RTP parts of the pipeline and not the RTCP parts.
3587  *
3588  * Returns: %TRUE if the position could be queried
3589  */
3590 gboolean
3591 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3592 {
3593   GstRTSPStreamPrivate *priv;
3594   GstElement *sink;
3595   gboolean ret;
3596
3597   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3598
3599   priv = stream->priv;
3600
3601   g_mutex_lock (&priv->lock);
3602   /* depending on the transport type, it should query corresponding sink */
3603   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3604       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3605     sink = priv->udpsink[0];
3606   else
3607     sink = priv->appsink[0];
3608
3609   if (sink)
3610     gst_object_ref (sink);
3611   g_mutex_unlock (&priv->lock);
3612
3613   if (!sink)
3614     return FALSE;
3615
3616   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3617   gst_object_unref (sink);
3618
3619   return ret;
3620 }
3621
3622 /**
3623  * gst_rtsp_stream_query_stop:
3624  * @stream: a #GstRTSPStream
3625  *
3626  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3627  * the RTP parts of the pipeline and not the RTCP parts.
3628  *
3629  * Returns: %TRUE if the stop could be queried
3630  */
3631 gboolean
3632 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3633 {
3634   GstRTSPStreamPrivate *priv;
3635   GstElement *sink;
3636   GstQuery *query;
3637   gboolean ret;
3638
3639   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3640
3641   priv = stream->priv;
3642
3643   g_mutex_lock (&priv->lock);
3644   /* depending on the transport type, it should query corresponding sink */
3645   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3646       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3647     sink = priv->udpsink[0];
3648   else
3649     sink = priv->appsink[0];
3650
3651   if (sink)
3652     gst_object_ref (sink);
3653   g_mutex_unlock (&priv->lock);
3654
3655   if (!sink)
3656     return FALSE;
3657
3658   query = gst_query_new_segment (GST_FORMAT_TIME);
3659   if ((ret = gst_element_query (sink, query))) {
3660     GstFormat format;
3661
3662     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3663     if (format != GST_FORMAT_TIME)
3664       *stop = -1;
3665   }
3666   gst_query_unref (query);
3667   gst_object_unref (sink);
3668
3669   return ret;
3670
3671 }