rtsp-stream: added function for creating and configuring UDP sinks
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static gboolean
1025 create_and_configure_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1026     GSocket * rtcp_socket, GSocketFamily family)
1027 {
1028   GstRTSPStreamPrivate *priv = stream->priv;
1029   GstElement *udpsink0, *udpsink1;
1030   const gchar *multisink_socket;
1031
1032   if (family == G_SOCKET_FAMILY_IPV6)
1033     multisink_socket = "socket-v6";
1034   else
1035     multisink_socket = "socket";
1036
1037   udpsink0 = NULL;
1038   udpsink1 = NULL;
1039
1040   if (priv->udpsink[0])
1041     udpsink0 = priv->udpsink[0];
1042   else
1043     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1044
1045   if (!udpsink0)
1046     goto no_udp_protocol;
1047
1048   if (priv->udpsink[1])
1049     udpsink1 = priv->udpsink[1];
1050   else
1051     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1052
1053   if (!udpsink1)
1054     goto no_udp_protocol;
1055
1056   /* configure sinks */
1057
1058   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1059   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1060
1061   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1062   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1063
1064   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1065
1066   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1067   /* Needs to be async for RECORD streams, otherwise we will never go to
1068    * PLAYING because the sinks will wait for data while the udpsrc can't
1069    * provide data with timestamps in PAUSED. */
1070   if (priv->sinkpad)
1071     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1072   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1073
1074   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1075   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1076
1077   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1078   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1079
1080   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1081   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1082
1083   /* update the dscp qos field in the sinks */
1084   update_dscp_qos (stream);
1085
1086   priv->udpsink[0] = udpsink0;
1087   priv->udpsink[1] = udpsink1;
1088
1089   return TRUE;
1090
1091   /* ERRORS */
1092 no_udp_protocol:
1093   {
1094     return FALSE;
1095   }
1096 }
1097
1098 static gboolean
1099 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1100     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1101     GstRTSPAddress ** server_addr_out)
1102 {
1103   GstRTSPStreamPrivate *priv = stream->priv;
1104   GstStateChangeReturn ret;
1105   GstElement *udpsrc0, *udpsrc1;
1106   GSocket *rtp_socket = NULL;
1107   GSocket *rtcp_socket;
1108   gint tmp_rtp, tmp_rtcp;
1109   guint count;
1110   gint rtpport, rtcpport;
1111   GList *rejected_addresses = NULL;
1112   GstRTSPAddress *addr = NULL;
1113   GInetAddress *inetaddr = NULL;
1114   GSocketAddress *rtp_sockaddr = NULL;
1115   GSocketAddress *rtcp_sockaddr = NULL;
1116   GstRTSPAddressPool * pool;
1117
1118   pool = priv->pool;
1119   udpsrc0 = NULL;
1120   udpsrc1 = NULL;
1121   count = 0;
1122
1123   /* Start with random port */
1124   tmp_rtp = 0;
1125
1126   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1127       G_SOCKET_PROTOCOL_UDP, NULL);
1128   if (!rtcp_socket)
1129     goto no_udp_protocol;
1130
1131   if (*server_addr_out)
1132     gst_rtsp_address_free (*server_addr_out);
1133
1134   /* try to allocate 2 UDP ports, the RTP port should be an even
1135    * number and the RTCP port should be the next (uneven) port */
1136 again:
1137
1138   if (rtp_socket == NULL) {
1139     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1140         G_SOCKET_PROTOCOL_UDP, NULL);
1141     if (!rtp_socket)
1142       goto no_udp_protocol;
1143   }
1144
1145   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1146     GstRTSPAddressFlags flags;
1147
1148     if (addr)
1149       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1150
1151     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1152     if (family == G_SOCKET_FAMILY_IPV6)
1153       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1154     else
1155       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1156
1157     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1158
1159     if (addr == NULL)
1160       goto no_ports;
1161
1162     tmp_rtp = addr->port;
1163
1164     g_clear_object (&inetaddr);
1165     inetaddr = g_inet_address_new_from_string (addr->address);
1166   } else {
1167     if (tmp_rtp != 0) {
1168       tmp_rtp += 2;
1169       if (++count > 20)
1170         goto no_ports;
1171     }
1172
1173     if (inetaddr == NULL)
1174       inetaddr = g_inet_address_new_any (family);
1175   }
1176
1177   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1178   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1179     g_object_unref (rtp_sockaddr);
1180     goto again;
1181   }
1182   g_object_unref (rtp_sockaddr);
1183
1184   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1185   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1186     g_clear_object (&rtp_sockaddr);
1187     goto socket_error;
1188   }
1189
1190   tmp_rtp =
1191       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1192   g_object_unref (rtp_sockaddr);
1193
1194   /* check if port is even */
1195   if ((tmp_rtp & 1) != 0) {
1196     /* port not even, close and allocate another */
1197     tmp_rtp++;
1198     g_clear_object (&rtp_socket);
1199     goto again;
1200   }
1201
1202   /* set port */
1203   tmp_rtcp = tmp_rtp + 1;
1204
1205   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1206   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1207     g_object_unref (rtcp_sockaddr);
1208     g_clear_object (&rtp_socket);
1209     goto again;
1210   }
1211   g_object_unref (rtcp_sockaddr);
1212
1213   g_clear_object (&inetaddr);
1214
1215   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1216   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1217
1218   if (udpsrc0 == NULL || udpsrc1 == NULL)
1219     goto no_udp_protocol;
1220
1221   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1222   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1223
1224   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1225   if (ret == GST_STATE_CHANGE_FAILURE)
1226     goto element_error;
1227   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1228   if (ret == GST_STATE_CHANGE_FAILURE)
1229     goto element_error;
1230
1231   /* all fine, do port check */
1232   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1233   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1234
1235   /* this should not happen... */
1236   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1237     goto port_error;
1238
1239   if (!create_and_configure_udpsinks (stream, rtp_socket, rtcp_socket,
1240         family))
1241     goto no_udp_protocol;
1242
1243   /* we keep these elements, we will further configure them when the
1244    * client told us to really use the UDP ports. */
1245   udpsrc_out[0] = udpsrc0;
1246   udpsrc_out[1] = udpsrc1;
1247
1248   server_port_out->min = rtpport;
1249   server_port_out->max = rtcpport;
1250
1251   *server_addr_out = addr;
1252   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1253
1254   g_object_unref (rtp_socket);
1255   g_object_unref (rtcp_socket);
1256
1257   return TRUE;
1258
1259   /* ERRORS */
1260 no_udp_protocol:
1261   {
1262     goto cleanup;
1263   }
1264 no_ports:
1265   {
1266     goto cleanup;
1267   }
1268 port_error:
1269   {
1270     goto cleanup;
1271   }
1272 socket_error:
1273   {
1274     goto cleanup;
1275   }
1276 element_error:
1277   {
1278     goto cleanup;
1279   }
1280 cleanup:
1281   {
1282     if (udpsrc0) {
1283       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1284       gst_object_unref (udpsrc0);
1285     }
1286     if (udpsrc1) {
1287       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1288       gst_object_unref (udpsrc1);
1289     }
1290     if (inetaddr)
1291       g_object_unref (inetaddr);
1292     g_list_free_full (rejected_addresses,
1293         (GDestroyNotify) gst_rtsp_address_free);
1294     if (addr)
1295       gst_rtsp_address_free (addr);
1296     if (rtp_socket)
1297       g_object_unref (rtp_socket);
1298     if (rtcp_socket)
1299       g_object_unref (rtcp_socket);
1300     return FALSE;
1301   }
1302 }
1303
1304 /* must be called with lock */
1305 static gboolean
1306 alloc_ports (GstRTSPStream * stream)
1307 {
1308   GstRTSPStreamPrivate *priv = stream->priv;
1309
1310   priv->have_ipv4 =
1311       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1312           &priv->server_port_v4, &priv->server_addr_v4);
1313
1314   priv->have_ipv6 =
1315       alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1316           &priv->server_port_v6, &priv->server_addr_v6);
1317
1318   return priv->have_ipv4 || priv->have_ipv6;
1319 }
1320
1321 /**
1322  * gst_rtsp_stream_set_client_side:
1323  * @stream: a #GstRTSPStream
1324  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1325  * an RTSP connection.
1326  *
1327  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1328  * streams to an RTSP server via RECORD. This has the practical effect
1329  * of changing which UDP port numbers are used when setting up the local
1330  * side of the stream sending to be either the 'server' or 'client' pair
1331  * of a configured UDP transport.
1332  */
1333 void
1334 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1335 {
1336   GstRTSPStreamPrivate *priv;
1337
1338   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1339   priv = stream->priv;
1340   g_mutex_lock (&priv->lock);
1341   priv->client_side = client_side;
1342   g_mutex_unlock (&priv->lock);
1343 }
1344
1345 /**
1346  * gst_rtsp_stream_set_client_side:
1347  * @stream: a #GstRTSPStream
1348  *
1349  * See gst_rtsp_stream_set_client_side()
1350  *
1351  * Returns: TRUE if this #GstRTSPStream is client-side.
1352  */
1353 gboolean
1354 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1355 {
1356   GstRTSPStreamPrivate *priv;
1357   gboolean ret;
1358
1359   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1360
1361   priv = stream->priv;
1362   g_mutex_lock (&priv->lock);
1363   ret = priv->client_side;
1364   g_mutex_unlock (&priv->lock);
1365
1366   return ret;
1367 }
1368
1369 /**
1370  * gst_rtsp_stream_get_server_port:
1371  * @stream: a #GstRTSPStream
1372  * @server_port: (out): result server port
1373  * @family: the port family to get
1374  *
1375  * Fill @server_port with the port pair used by the server. This function can
1376  * only be called when @stream has been joined.
1377  */
1378 void
1379 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1380     GstRTSPRange * server_port, GSocketFamily family)
1381 {
1382   GstRTSPStreamPrivate *priv;
1383
1384   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1385   priv = stream->priv;
1386   g_return_if_fail (priv->is_joined);
1387
1388   g_mutex_lock (&priv->lock);
1389   if (family == G_SOCKET_FAMILY_IPV4) {
1390     if (server_port)
1391       *server_port = priv->server_port_v4;
1392   } else {
1393     if (server_port)
1394       *server_port = priv->server_port_v6;
1395   }
1396   g_mutex_unlock (&priv->lock);
1397 }
1398
1399 /**
1400  * gst_rtsp_stream_get_rtpsession:
1401  * @stream: a #GstRTSPStream
1402  *
1403  * Get the RTP session of this stream.
1404  *
1405  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1406  */
1407 GObject *
1408 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1409 {
1410   GstRTSPStreamPrivate *priv;
1411   GObject *session;
1412
1413   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1414
1415   priv = stream->priv;
1416
1417   g_mutex_lock (&priv->lock);
1418   if ((session = priv->session))
1419     g_object_ref (session);
1420   g_mutex_unlock (&priv->lock);
1421
1422   return session;
1423 }
1424
1425 /**
1426  * gst_rtsp_stream_get_ssrc:
1427  * @stream: a #GstRTSPStream
1428  * @ssrc: (out): result ssrc
1429  *
1430  * Get the SSRC used by the RTP session of this stream. This function can only
1431  * be called when @stream has been joined.
1432  */
1433 void
1434 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1435 {
1436   GstRTSPStreamPrivate *priv;
1437
1438   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1439   priv = stream->priv;
1440   g_return_if_fail (priv->is_joined);
1441
1442   g_mutex_lock (&priv->lock);
1443   if (ssrc && priv->session)
1444     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1445   g_mutex_unlock (&priv->lock);
1446 }
1447
1448 /**
1449  * gst_rtsp_stream_set_retransmission_time:
1450  * @stream: a #GstRTSPStream
1451  * @time: a #GstClockTime
1452  *
1453  * Set the amount of time to store retransmission packets.
1454  */
1455 void
1456 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1457     GstClockTime time)
1458 {
1459   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1460
1461   g_mutex_lock (&stream->priv->lock);
1462   stream->priv->rtx_time = time;
1463   if (stream->priv->rtxsend)
1464     g_object_set (stream->priv->rtxsend, "max-size-time",
1465         GST_TIME_AS_MSECONDS (time), NULL);
1466   g_mutex_unlock (&stream->priv->lock);
1467 }
1468
1469 /**
1470  * gst_rtsp_stream_get_retransmission_time:
1471  * @stream: a #GstRTSPStream
1472  *
1473  * Get the amount of time to store retransmission data.
1474  *
1475  * Returns: the amount of time to store retransmission data.
1476  */
1477 GstClockTime
1478 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1479 {
1480   GstClockTime ret;
1481
1482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1483
1484   g_mutex_lock (&stream->priv->lock);
1485   ret = stream->priv->rtx_time;
1486   g_mutex_unlock (&stream->priv->lock);
1487
1488   return ret;
1489 }
1490
1491 /**
1492  * gst_rtsp_stream_set_retransmission_pt:
1493  * @stream: a #GstRTSPStream
1494  * @rtx_pt: a #guint
1495  *
1496  * Set the payload type (pt) for retransmission of this stream.
1497  */
1498 void
1499 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1500 {
1501   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1502
1503   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1504
1505   g_mutex_lock (&stream->priv->lock);
1506   stream->priv->rtx_pt = rtx_pt;
1507   if (stream->priv->rtxsend) {
1508     guint pt = gst_rtsp_stream_get_pt (stream);
1509     gchar *pt_s = g_strdup_printf ("%d", pt);
1510     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1511         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1512     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1513     g_free (pt_s);
1514     gst_structure_free (rtx_pt_map);
1515   }
1516   g_mutex_unlock (&stream->priv->lock);
1517 }
1518
1519 /**
1520  * gst_rtsp_stream_get_retransmission_pt:
1521  * @stream: a #GstRTSPStream
1522  *
1523  * Get the payload-type used for retransmission of this stream
1524  *
1525  * Returns: The retransmission PT.
1526  */
1527 guint
1528 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1529 {
1530   guint rtx_pt;
1531
1532   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1533
1534   g_mutex_lock (&stream->priv->lock);
1535   rtx_pt = stream->priv->rtx_pt;
1536   g_mutex_unlock (&stream->priv->lock);
1537
1538   return rtx_pt;
1539 }
1540
1541 /**
1542  * gst_rtsp_stream_set_buffer_size:
1543  * @stream: a #GstRTSPStream
1544  * @size: the buffer size
1545  *
1546  * Set the size of the UDP transmission buffer (in bytes)
1547  * Needs to be set before the stream is joined to a bin.
1548  *
1549  * Since: 1.6
1550  */
1551 void
1552 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1553 {
1554   g_mutex_lock (&stream->priv->lock);
1555   stream->priv->buffer_size = size;
1556   g_mutex_unlock (&stream->priv->lock);
1557 }
1558
1559 /**
1560  * gst_rtsp_stream_get_buffer_size:
1561  * @stream: a #GstRTSPStream
1562  *
1563  * Get the size of the UDP transmission buffer (in bytes)
1564  *
1565  * Returns: the size of the UDP TX buffer
1566  *
1567  * Since: 1.6
1568  */
1569 guint
1570 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1571 {
1572   guint buffer_size;
1573
1574   g_mutex_lock (&stream->priv->lock);
1575   buffer_size = stream->priv->buffer_size;
1576   g_mutex_unlock (&stream->priv->lock);
1577
1578   return buffer_size;
1579 }
1580
1581 /* executed from streaming thread */
1582 static void
1583 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1584 {
1585   GstRTSPStreamPrivate *priv = stream->priv;
1586   GstCaps *newcaps, *oldcaps;
1587
1588   newcaps = gst_pad_get_current_caps (pad);
1589
1590   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1591       newcaps);
1592
1593   g_mutex_lock (&priv->lock);
1594   oldcaps = priv->caps;
1595   priv->caps = newcaps;
1596   g_mutex_unlock (&priv->lock);
1597
1598   if (oldcaps)
1599     gst_caps_unref (oldcaps);
1600 }
1601
1602 static void
1603 dump_structure (const GstStructure * s)
1604 {
1605   gchar *sstr;
1606
1607   sstr = gst_structure_to_string (s);
1608   GST_INFO ("structure: %s", sstr);
1609   g_free (sstr);
1610 }
1611
1612 static GstRTSPStreamTransport *
1613 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1614 {
1615   GstRTSPStreamPrivate *priv = stream->priv;
1616   GList *walk;
1617   GstRTSPStreamTransport *result = NULL;
1618   const gchar *tmp;
1619   gchar *dest;
1620   guint port;
1621
1622   if (rtcp_from == NULL)
1623     return NULL;
1624
1625   tmp = g_strrstr (rtcp_from, ":");
1626   if (tmp == NULL)
1627     return NULL;
1628
1629   port = atoi (tmp + 1);
1630   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1631
1632   g_mutex_lock (&priv->lock);
1633   GST_INFO ("finding %s:%d in %d transports", dest, port,
1634       g_list_length (priv->transports));
1635
1636   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1637     GstRTSPStreamTransport *trans = walk->data;
1638     const GstRTSPTransport *tr;
1639     gint min, max;
1640
1641     tr = gst_rtsp_stream_transport_get_transport (trans);
1642
1643     if (priv->client_side) {
1644       /* In client side mode the 'destination' is the RTSP server, so send
1645        * to those ports */
1646       min = tr->server_port.min;
1647       max = tr->server_port.max;
1648     } else {
1649       min = tr->client_port.min;
1650       max = tr->client_port.max;
1651     }
1652
1653     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1654       result = trans;
1655       break;
1656     }
1657   }
1658   if (result)
1659     g_object_ref (result);
1660   g_mutex_unlock (&priv->lock);
1661
1662   g_free (dest);
1663
1664   return result;
1665 }
1666
1667 static GstRTSPStreamTransport *
1668 check_transport (GObject * source, GstRTSPStream * stream)
1669 {
1670   GstStructure *stats;
1671   GstRTSPStreamTransport *trans;
1672
1673   /* see if we have a stream to match with the origin of the RTCP packet */
1674   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1675   if (trans == NULL) {
1676     g_object_get (source, "stats", &stats, NULL);
1677     if (stats) {
1678       const gchar *rtcp_from;
1679
1680       dump_structure (stats);
1681
1682       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1683       if ((trans = find_transport (stream, rtcp_from))) {
1684         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1685             source);
1686         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1687             g_object_unref);
1688       }
1689       gst_structure_free (stats);
1690     }
1691   }
1692   return trans;
1693 }
1694
1695
1696 static void
1697 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1698 {
1699   GstRTSPStreamTransport *trans;
1700
1701   GST_INFO ("%p: new source %p", stream, source);
1702
1703   trans = check_transport (source, stream);
1704
1705   if (trans)
1706     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1707 }
1708
1709 static void
1710 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1711 {
1712   GST_INFO ("%p: new SDES %p", stream, source);
1713 }
1714
1715 static void
1716 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1717 {
1718   GstRTSPStreamTransport *trans;
1719
1720   trans = check_transport (source, stream);
1721
1722   if (trans) {
1723     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1724     gst_rtsp_stream_transport_keep_alive (trans);
1725   }
1726 #ifdef DUMP_STATS
1727   {
1728     GstStructure *stats;
1729     g_object_get (source, "stats", &stats, NULL);
1730     if (stats) {
1731       dump_structure (stats);
1732       gst_structure_free (stats);
1733     }
1734   }
1735 #endif
1736 }
1737
1738 static void
1739 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1740 {
1741   GST_INFO ("%p: source %p bye", stream, source);
1742 }
1743
1744 static void
1745 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1746 {
1747   GstRTSPStreamTransport *trans;
1748
1749   GST_INFO ("%p: source %p bye timeout", stream, source);
1750
1751   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1752     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1753     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1754   }
1755 }
1756
1757 static void
1758 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1759 {
1760   GstRTSPStreamTransport *trans;
1761
1762   GST_INFO ("%p: source %p timeout", stream, source);
1763
1764   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1765     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1766     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1767   }
1768 }
1769
1770 static void
1771 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1772 {
1773   GST_INFO ("%p: new sender source %p", stream, source);
1774 #ifndef DUMP_STATS
1775   {
1776     GstStructure *stats;
1777     g_object_get (source, "stats", &stats, NULL);
1778     if (stats) {
1779       dump_structure (stats);
1780       gst_structure_free (stats);
1781     }
1782   }
1783 #endif
1784 }
1785
1786 static void
1787 on_sender_ssrc_active (GObject * session, GObject * source,
1788     GstRTSPStream * stream)
1789 {
1790 #ifndef DUMP_STATS
1791   {
1792     GstStructure *stats;
1793     g_object_get (source, "stats", &stats, NULL);
1794     if (stats) {
1795       dump_structure (stats);
1796       gst_structure_free (stats);
1797     }
1798   }
1799 #endif
1800 }
1801
1802 static void
1803 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1804 {
1805   if (is_rtp) {
1806     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1807     g_list_free (priv->tr_cache_rtp);
1808     priv->tr_cache_rtp = NULL;
1809   } else {
1810     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1811     g_list_free (priv->tr_cache_rtcp);
1812     priv->tr_cache_rtcp = NULL;
1813   }
1814 }
1815
1816 static GstFlowReturn
1817 handle_new_sample (GstAppSink * sink, gpointer user_data)
1818 {
1819   GstRTSPStreamPrivate *priv;
1820   GList *walk;
1821   GstSample *sample;
1822   GstBuffer *buffer;
1823   GstRTSPStream *stream;
1824   gboolean is_rtp;
1825
1826   sample = gst_app_sink_pull_sample (sink);
1827   if (!sample)
1828     return GST_FLOW_OK;
1829
1830   stream = (GstRTSPStream *) user_data;
1831   priv = stream->priv;
1832   buffer = gst_sample_get_buffer (sample);
1833
1834   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1835
1836   g_mutex_lock (&priv->lock);
1837   if (is_rtp) {
1838     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1839       clear_tr_cache (priv, is_rtp);
1840       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1841         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1842         priv->tr_cache_rtp =
1843             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1844       }
1845       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1846     }
1847   } else {
1848     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1849       clear_tr_cache (priv, is_rtp);
1850       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1851         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1852         priv->tr_cache_rtcp =
1853             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1854       }
1855       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1856     }
1857   }
1858   g_mutex_unlock (&priv->lock);
1859
1860   if (is_rtp) {
1861     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1862       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1863       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1864     }
1865   } else {
1866     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1867       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1868       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1869     }
1870   }
1871   gst_sample_unref (sample);
1872
1873   return GST_FLOW_OK;
1874 }
1875
1876 static GstAppSinkCallbacks sink_cb = {
1877   NULL,                         /* not interested in EOS */
1878   NULL,                         /* not interested in preroll samples */
1879   handle_new_sample,
1880 };
1881
1882 static GstElement *
1883 get_rtp_encoder (GstRTSPStream * stream, guint session)
1884 {
1885   GstRTSPStreamPrivate *priv = stream->priv;
1886
1887   if (priv->srtpenc == NULL) {
1888     gchar *name;
1889
1890     name = g_strdup_printf ("srtpenc_%u", session);
1891     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1892     g_free (name);
1893
1894     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1895   }
1896   return gst_object_ref (priv->srtpenc);
1897 }
1898
1899 static GstElement *
1900 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1901 {
1902   GstRTSPStreamPrivate *priv = stream->priv;
1903   GstElement *oldenc, *enc;
1904   GstPad *pad;
1905   gchar *name;
1906
1907   if (priv->idx != session)
1908     return NULL;
1909
1910   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1911
1912   oldenc = priv->srtpenc;
1913   enc = get_rtp_encoder (stream, session);
1914   name = g_strdup_printf ("rtp_sink_%d", session);
1915   pad = gst_element_get_request_pad (enc, name);
1916   g_free (name);
1917   gst_object_unref (pad);
1918
1919   if (oldenc == NULL)
1920     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1921         enc);
1922
1923   return enc;
1924 }
1925
1926 static GstElement *
1927 request_rtcp_encoder (GstElement * rtpbin, guint session,
1928     GstRTSPStream * stream)
1929 {
1930   GstRTSPStreamPrivate *priv = stream->priv;
1931   GstElement *oldenc, *enc;
1932   GstPad *pad;
1933   gchar *name;
1934
1935   if (priv->idx != session)
1936     return NULL;
1937
1938   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1939
1940   oldenc = priv->srtpenc;
1941   enc = get_rtp_encoder (stream, session);
1942   name = g_strdup_printf ("rtcp_sink_%d", session);
1943   pad = gst_element_get_request_pad (enc, name);
1944   g_free (name);
1945   gst_object_unref (pad);
1946
1947   if (oldenc == NULL)
1948     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1949         enc);
1950
1951   return enc;
1952 }
1953
1954 static GstCaps *
1955 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1956 {
1957   GstRTSPStreamPrivate *priv = stream->priv;
1958   GstCaps *caps;
1959
1960   GST_DEBUG ("request key %08x", ssrc);
1961
1962   g_mutex_lock (&priv->lock);
1963   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1964     gst_caps_ref (caps);
1965   g_mutex_unlock (&priv->lock);
1966
1967   return caps;
1968 }
1969
1970 static GstElement *
1971 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1972     GstRTSPStream * stream)
1973 {
1974   GstRTSPStreamPrivate *priv = stream->priv;
1975
1976   if (priv->idx != session)
1977     return NULL;
1978
1979   if (priv->srtpdec == NULL) {
1980     gchar *name;
1981
1982     name = g_strdup_printf ("srtpdec_%u", session);
1983     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1984     g_free (name);
1985
1986     g_signal_connect (priv->srtpdec, "request-key",
1987         (GCallback) request_key, stream);
1988   }
1989   return gst_object_ref (priv->srtpdec);
1990 }
1991
1992 /**
1993  * gst_rtsp_stream_request_aux_sender:
1994  * @stream: a #GstRTSPStream
1995  * @sessid: the session id
1996  *
1997  * Creating a rtxsend bin
1998  *
1999  * Returns: (transfer full): a #GstElement.
2000  *
2001  * Since: 1.6
2002  */
2003 GstElement *
2004 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2005 {
2006   GstElement *bin;
2007   GstPad *pad;
2008   GstStructure *pt_map;
2009   gchar *name;
2010   guint pt, rtx_pt;
2011   gchar *pt_s;
2012
2013   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2014
2015   pt = gst_rtsp_stream_get_pt (stream);
2016   pt_s = g_strdup_printf ("%u", pt);
2017   rtx_pt = stream->priv->rtx_pt;
2018
2019   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2020
2021   bin = gst_bin_new (NULL);
2022   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2023   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2024       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2025   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2026       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2027   g_free (pt_s);
2028   gst_structure_free (pt_map);
2029   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2030
2031   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2032   name = g_strdup_printf ("src_%u", sessid);
2033   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2034   g_free (name);
2035   gst_object_unref (pad);
2036
2037   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2038   name = g_strdup_printf ("sink_%u", sessid);
2039   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2040   g_free (name);
2041   gst_object_unref (pad);
2042
2043   return bin;
2044 }
2045
2046 /**
2047  * gst_rtsp_stream_set_pt_map:
2048  * @stream: a #GstRTSPStream
2049  * @pt: the pt
2050  * @caps: a #GstCaps
2051  *
2052  * Configure a pt map between @pt and @caps.
2053  */
2054 void
2055 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2056 {
2057   GstRTSPStreamPrivate *priv = stream->priv;
2058
2059   g_mutex_lock (&priv->lock);
2060   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2061   g_mutex_unlock (&priv->lock);
2062 }
2063
2064 static GstCaps *
2065 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2066     GstRTSPStream * stream)
2067 {
2068   GstRTSPStreamPrivate *priv = stream->priv;
2069   GstCaps *caps = NULL;
2070
2071   g_mutex_lock (&priv->lock);
2072
2073   if (priv->idx == session) {
2074     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2075     if (caps) {
2076       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2077       gst_caps_ref (caps);
2078     } else {
2079       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2080     }
2081   }
2082
2083   g_mutex_unlock (&priv->lock);
2084
2085   return caps;
2086 }
2087
2088 static void
2089 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2090 {
2091   GstRTSPStreamPrivate *priv = stream->priv;
2092   gchar *name;
2093   GstPadLinkReturn ret;
2094   guint sessid;
2095
2096   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2097       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2098
2099   name = gst_pad_get_name (pad);
2100   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2101     g_free (name);
2102     return;
2103   }
2104   g_free (name);
2105
2106   if (priv->idx != sessid)
2107     return;
2108
2109   if (gst_pad_is_linked (priv->sinkpad)) {
2110     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2111         GST_DEBUG_PAD_NAME (priv->sinkpad));
2112     return;
2113   }
2114
2115   /* link the RTP pad to the session manager, it should not really fail unless
2116    * this is not really an RTP pad */
2117   ret = gst_pad_link (pad, priv->sinkpad);
2118   if (ret != GST_PAD_LINK_OK)
2119     goto link_failed;
2120   priv->recv_rtp_src = gst_object_ref (pad);
2121
2122   return;
2123
2124 /* ERRORS */
2125 link_failed:
2126   {
2127     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2128         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2129   }
2130 }
2131
2132 static void
2133 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2134     GstRTSPStream * stream)
2135 {
2136   /* TODO: What to do here other than this? */
2137   GST_DEBUG ("Stream %p: Got EOS", stream);
2138   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2139 }
2140
2141 /* must be called with lock */
2142 static void
2143 create_sender_part (GstRTSPStream * stream, GstBin * bin,
2144     GstState state)
2145 {
2146   GstRTSPStreamPrivate *priv;
2147   GstPad *pad, *sinkpad = NULL;
2148   gboolean is_tcp = FALSE, is_udp = FALSE;
2149   gint i;
2150
2151   priv = stream->priv;
2152
2153   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2154   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2155       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2156
2157   for (i = 0; i < 2; i++) {
2158     GstPad *teepad, *queuepad;
2159     /* For the sender we create this bit of pipeline for both
2160      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2161      * we need to add a queue before appsink and udpsink to make
2162      * the pipeline not block. For the TCP case, we want to pump
2163      * client as fast as possible anyway. This pipeline is used
2164      * when both TCP and UDP are present.
2165      *
2166      * .--------.      .-----.    .---------.    .---------.
2167      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2168      * |       send->sink   src->sink      src->sink       |
2169      * '--------'      |     |    '---------'    '---------'
2170      *                 |     |    .---------.    .---------.
2171      *                 |     |    |  queue  |    | appsink |
2172      *                 |    src->sink      src->sink       |
2173      *                 '-----'    '---------'    '---------'
2174      *
2175      * When only UDP or only TCP is allowed, we skip the tee and queue
2176      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2177      * the session.
2178      */
2179     /* Only link the RTP send src if we're going to send RTP, link
2180      * the RTCP send src always */
2181     if (priv->srcpad || i == 1) {
2182       if (is_udp) {
2183         /* add udpsink */
2184         gst_bin_add (bin, priv->udpsink[i]);
2185         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2186       }
2187
2188       if (is_tcp) {
2189         /* make appsink */
2190         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2191         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2192         gst_bin_add (bin, priv->appsink[i]);
2193         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2194             &sink_cb, stream, NULL);
2195       }
2196
2197       if (is_udp && is_tcp) {
2198         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2199
2200         /* make tee for RTP/RTCP */
2201         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2202         gst_bin_add (bin, priv->tee[i]);
2203
2204         /* and link to rtpbin send pad */
2205         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2206         gst_pad_link (priv->send_src[i], pad);
2207         gst_object_unref (pad);
2208
2209         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2210         g_object_set (priv->udpqueue[i], "max-size-buffers",
2211             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2212             NULL);
2213         gst_bin_add (bin, priv->udpqueue[i]);
2214         /* link tee to udpqueue */
2215         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2216         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2217         gst_pad_link (teepad, pad);
2218         gst_object_unref (pad);
2219         gst_object_unref (teepad);
2220
2221         /* link udpqueue to udpsink */
2222         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2223         gst_pad_link (queuepad, sinkpad);
2224         gst_object_unref (queuepad);
2225         gst_object_unref (sinkpad);
2226
2227         /* make appqueue */
2228         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2229         g_object_set (priv->appqueue[i], "max-size-buffers",
2230             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2231             NULL);
2232         gst_bin_add (bin, priv->appqueue[i]);
2233         /* and link tee to appqueue */
2234         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2235         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2236         gst_pad_link (teepad, pad);
2237         gst_object_unref (pad);
2238         gst_object_unref (teepad);
2239
2240         /* and link appqueue to appsink */
2241         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2242         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2243         gst_pad_link (queuepad, pad);
2244         gst_object_unref (pad);
2245         gst_object_unref (queuepad);
2246       } else if (is_tcp) {
2247         /* only appsink needed, link it to the session */
2248         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2249         gst_pad_link (priv->send_src[i], pad);
2250         gst_object_unref (pad);
2251
2252         /* when its only TCP, we need to set sync and preroll to FALSE
2253          * for the sink to avoid deadlock. And this is only needed for
2254          * sink used for RTCP data, not the RTP data. */
2255         if (i == 1)
2256           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2257       } else {
2258         /* else only udpsink needed, link it to the session */
2259         gst_pad_link (priv->send_src[i], sinkpad);
2260         gst_object_unref (sinkpad);
2261       }
2262     }
2263
2264     /* check if we need to set to a special state */
2265     if (state != GST_STATE_NULL) {
2266       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2267         gst_element_set_state (priv->udpsink[i], state);
2268       if (priv->appsink[i] && (priv->srcpad || i == 1))
2269         gst_element_set_state (priv->appsink[i], state);
2270       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2271         gst_element_set_state (priv->appqueue[i], state);
2272       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2273         gst_element_set_state (priv->udpqueue[i], state);
2274       if (priv->tee[i] && (priv->srcpad || i == 1))
2275         gst_element_set_state (priv->tee[i], state);
2276     }
2277   }
2278 }
2279
2280 /* must be called with lock */
2281 static void
2282 create_receiver_part (GstRTSPStream * stream, GstBin * bin,
2283     GstState state)
2284 {
2285   GstRTSPStreamPrivate *priv;
2286   GstPad *pad, *selpad;
2287   gboolean is_tcp;
2288   gint i;
2289
2290   priv = stream->priv;
2291
2292   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2293
2294   for (i = 0; i < 2; i++) {
2295     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2296      * RTCP sink always */
2297     if (priv->sinkpad || i == 1) {
2298       /* For the receiver we create this bit of pipeline for both
2299        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2300        * and it is all funneled into the rtpbin receive pad.
2301        *
2302        * .--------.     .--------.    .--------.
2303        * | udpsrc |     | funnel |    | rtpbin |
2304        * |       src->sink      src->sink      |
2305        * '--------'     |        |    '--------'
2306        * .--------.     |        |
2307        * | appsrc |     |        |
2308        * |       src->sink       |
2309        * '--------'     '--------'
2310        */
2311       /* make funnel for the RTP/RTCP receivers */
2312       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2313       gst_bin_add (bin, priv->funnel[i]);
2314
2315       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2316       gst_pad_link (pad, priv->recv_sink[i]);
2317       gst_object_unref (pad);
2318
2319       if (priv->udpsrc_v4[i]) {
2320         if (priv->srcpad) {
2321           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2322            * values. This is only relevant for PLAY pipelines */
2323           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2324           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2325         }
2326         /* add udpsrc */
2327         gst_bin_add (bin, priv->udpsrc_v4[i]);
2328
2329         /* and link to the funnel v4 */
2330         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2331         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2332         gst_pad_link (pad, selpad);
2333         gst_object_unref (pad);
2334         gst_object_unref (selpad);
2335       }
2336
2337       if (priv->udpsrc_v6[i]) {
2338         if (priv->srcpad) {
2339           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2340           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2341         }
2342         gst_bin_add (bin, priv->udpsrc_v6[i]);
2343
2344         /* and link to the funnel v6 */
2345         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2346         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2347         gst_pad_link (pad, selpad);
2348         gst_object_unref (pad);
2349         gst_object_unref (selpad);
2350       }
2351
2352       if (is_tcp) {
2353         /* make and add appsrc */
2354         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2355         priv->appsrc_base_time[i] = -1;
2356         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2357         gst_bin_add (bin, priv->appsrc[i]);
2358         /* and link to the funnel */
2359         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2360         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2361         gst_pad_link (pad, selpad);
2362         gst_object_unref (pad);
2363         gst_object_unref (selpad);
2364       }
2365     }
2366
2367     /* check if we need to set to a special state */
2368     if (state != GST_STATE_NULL) {
2369       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2370         gst_element_set_state (priv->funnel[i], state);
2371       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2372         gst_element_set_state (priv->appsrc[i], state);
2373     }
2374   }
2375 }
2376
2377 /**
2378  * gst_rtsp_stream_join_bin:
2379  * @stream: a #GstRTSPStream
2380  * @bin: (transfer none): a #GstBin to join
2381  * @rtpbin: (transfer none): a rtpbin element in @bin
2382  * @state: the target state of the new elements
2383  *
2384  * Join the #GstBin @bin that contains the element @rtpbin.
2385  *
2386  * @stream will link to @rtpbin, which must be inside @bin. The elements
2387  * added to @bin will be set to the state given in @state.
2388  *
2389  * Returns: %TRUE on success.
2390  */
2391 gboolean
2392 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2393     GstElement * rtpbin, GstState state)
2394 {
2395   GstRTSPStreamPrivate *priv;
2396   guint idx;
2397   gchar *name;
2398   GstPadLinkReturn ret;
2399   gboolean is_udp;
2400
2401   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2402   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2403   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2404
2405   priv = stream->priv;
2406
2407   g_mutex_lock (&priv->lock);
2408   if (priv->is_joined)
2409     goto was_joined;
2410
2411   /* create a session with the same index as the stream */
2412   idx = priv->idx;
2413
2414   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2415
2416   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2417       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2418
2419   if (is_udp && !alloc_ports (stream))
2420     goto no_ports;
2421
2422   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2423       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2424     /* For SRTP */
2425     g_signal_connect (rtpbin, "request-rtp-encoder",
2426         (GCallback) request_rtp_encoder, stream);
2427     g_signal_connect (rtpbin, "request-rtcp-encoder",
2428         (GCallback) request_rtcp_encoder, stream);
2429     g_signal_connect (rtpbin, "request-rtp-decoder",
2430         (GCallback) request_rtp_rtcp_decoder, stream);
2431     g_signal_connect (rtpbin, "request-rtcp-decoder",
2432         (GCallback) request_rtp_rtcp_decoder, stream);
2433   }
2434
2435   if (priv->sinkpad) {
2436     g_signal_connect (rtpbin, "request-pt-map",
2437         (GCallback) request_pt_map, stream);
2438   }
2439
2440   /* get pads from the RTP session element for sending and receiving
2441    * RTP/RTCP*/
2442   if (priv->srcpad) {
2443     /* get a pad for sending RTP */
2444     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2445     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2446     g_free (name);
2447
2448     /* link the RTP pad to the session manager, it should not really fail unless
2449      * this is not really an RTP pad */
2450     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2451     if (ret != GST_PAD_LINK_OK)
2452       goto link_failed;
2453
2454     name = g_strdup_printf ("send_rtp_src_%u", idx);
2455     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2456     g_free (name);
2457   } else {
2458     /* Need to connect our sinkpad from here */
2459     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2460     /* EOS */
2461     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2462
2463     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2464     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2465     g_free (name);
2466   }
2467
2468   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2469   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2470   g_free (name);
2471   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2472   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2473   g_free (name);
2474
2475   /* get the session */
2476   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2477
2478   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2479       stream);
2480   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2481       stream);
2482   g_signal_connect (priv->session, "on-ssrc-active",
2483       (GCallback) on_ssrc_active, stream);
2484   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2485       stream);
2486   g_signal_connect (priv->session, "on-bye-timeout",
2487       (GCallback) on_bye_timeout, stream);
2488   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2489       stream);
2490
2491   /* signal for sender ssrc */
2492   g_signal_connect (priv->session, "on-new-sender-ssrc",
2493       (GCallback) on_new_sender_ssrc, stream);
2494   g_signal_connect (priv->session, "on-sender-ssrc-active",
2495       (GCallback) on_sender_ssrc_active, stream);
2496
2497   create_sender_part (stream, bin, state);
2498
2499   create_receiver_part (stream, bin, state);
2500
2501   if (priv->srcpad) {
2502     /* be notified of caps changes */
2503     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2504         (GCallback) caps_notify, stream);
2505   }
2506
2507   priv->is_joined = TRUE;
2508   g_mutex_unlock (&priv->lock);
2509
2510   return TRUE;
2511
2512   /* ERRORS */
2513 was_joined:
2514   {
2515     g_mutex_unlock (&priv->lock);
2516     return TRUE;
2517   }
2518 no_ports:
2519   {
2520     g_mutex_unlock (&priv->lock);
2521     GST_WARNING ("failed to allocate ports %u", idx);
2522     return FALSE;
2523   }
2524 link_failed:
2525   {
2526     GST_WARNING ("failed to link stream %u", idx);
2527     gst_object_unref (priv->send_rtp_sink);
2528     priv->send_rtp_sink = NULL;
2529     g_mutex_unlock (&priv->lock);
2530     return FALSE;
2531   }
2532 }
2533
2534 /**
2535  * gst_rtsp_stream_leave_bin:
2536  * @stream: a #GstRTSPStream
2537  * @bin: (transfer none): a #GstBin
2538  * @rtpbin: (transfer none): a rtpbin #GstElement
2539  *
2540  * Remove the elements of @stream from @bin.
2541  *
2542  * Return: %TRUE on success.
2543  */
2544 gboolean
2545 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2546     GstElement * rtpbin)
2547 {
2548   GstRTSPStreamPrivate *priv;
2549   gint i;
2550   GList *l;
2551   gboolean is_tcp, is_udp;
2552
2553   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2554   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2555   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2556
2557   priv = stream->priv;
2558
2559   g_mutex_lock (&priv->lock);
2560   if (!priv->is_joined)
2561     goto was_not_joined;
2562
2563   /* all transports must be removed by now */
2564   if (priv->transports != NULL)
2565     goto transports_not_removed;
2566
2567   clear_tr_cache (priv, TRUE);
2568   clear_tr_cache (priv, FALSE);
2569
2570   GST_INFO ("stream %p leaving bin", stream);
2571
2572   if (priv->srcpad) {
2573     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2574
2575     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2576     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2577     gst_object_unref (priv->send_rtp_sink);
2578     priv->send_rtp_sink = NULL;
2579   } else if (priv->recv_rtp_src) {
2580     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2581     gst_object_unref (priv->recv_rtp_src);
2582     priv->recv_rtp_src = NULL;
2583   }
2584
2585   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2586
2587   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2588       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2589
2590
2591   for (i = 0; i < 2; i++) {
2592     if (priv->udpsink[i])
2593       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2594     if (priv->appsink[i])
2595       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2596     if (priv->appqueue[i])
2597       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2598     if (priv->udpqueue[i])
2599       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2600     if (priv->tee[i])
2601       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2602     if (priv->funnel[i])
2603       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2604     if (priv->appsrc[i])
2605       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2606
2607     if (priv->udpsrc_v4[i]) {
2608       if (priv->sinkpad || i == 1) {
2609         /* and set udpsrc to NULL now before removing */
2610         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2611         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2612         /* removing them should also nicely release the request
2613          * pads when they finalize */
2614         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2615       } else {
2616         /* we need to set the state to NULL before unref */
2617         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2618         gst_object_unref (priv->udpsrc_v4[i]);
2619       }
2620     }
2621
2622     if (priv->udpsrc_v6[i]) {
2623       if (priv->sinkpad || i == 1) {
2624         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2625         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2626         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2627       } else {
2628         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2629         gst_object_unref (priv->udpsrc_v6[i]);
2630       }
2631     }
2632
2633     for (l = priv->transport_sources; l; l = l->next) {
2634       GstRTSPMulticastTransportSource *s = l->data;
2635
2636       if (!s->udpsrc[i])
2637         continue;
2638
2639       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2640       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2641       gst_bin_remove (bin, s->udpsrc[i]);
2642     }
2643
2644     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2645       gst_bin_remove (bin, priv->udpsink[i]);
2646     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2647       gst_bin_remove (bin, priv->appsrc[i]);
2648     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2649       gst_bin_remove (bin, priv->appsink[i]);
2650     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2651       gst_bin_remove (bin, priv->appqueue[i]);
2652     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2653       gst_bin_remove (bin, priv->udpqueue[i]);
2654     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2655       gst_bin_remove (bin, priv->tee[i]);
2656     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2657       gst_bin_remove (bin, priv->funnel[i]);
2658
2659     if (priv->sinkpad || i == 1) {
2660       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2661       gst_object_unref (priv->recv_sink[i]);
2662       priv->recv_sink[i] = NULL;
2663     }
2664
2665     priv->udpsrc_v4[i] = NULL;
2666     priv->udpsrc_v6[i] = NULL;
2667     priv->udpsink[i] = NULL;
2668     priv->appsrc[i] = NULL;
2669     priv->appsink[i] = NULL;
2670     priv->appqueue[i] = NULL;
2671     priv->udpqueue[i] = NULL;
2672     priv->tee[i] = NULL;
2673     priv->funnel[i] = NULL;
2674   }
2675
2676   for (l = priv->transport_sources; l; l = l->next) {
2677     GstRTSPMulticastTransportSource *s = l->data;
2678     g_slice_free (GstRTSPMulticastTransportSource, s);
2679   }
2680   g_list_free (priv->transport_sources);
2681   priv->transport_sources = NULL;
2682
2683   if (priv->srcpad) {
2684     gst_object_unref (priv->send_src[0]);
2685     priv->send_src[0] = NULL;
2686   }
2687
2688   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2689   gst_object_unref (priv->send_src[1]);
2690   priv->send_src[1] = NULL;
2691
2692   g_object_unref (priv->session);
2693   priv->session = NULL;
2694   if (priv->caps)
2695     gst_caps_unref (priv->caps);
2696   priv->caps = NULL;
2697
2698   if (priv->srtpenc)
2699     gst_object_unref (priv->srtpenc);
2700   if (priv->srtpdec)
2701     gst_object_unref (priv->srtpdec);
2702
2703   priv->is_joined = FALSE;
2704   g_mutex_unlock (&priv->lock);
2705
2706   return TRUE;
2707
2708 was_not_joined:
2709   {
2710     g_mutex_unlock (&priv->lock);
2711     return TRUE;
2712   }
2713 transports_not_removed:
2714   {
2715     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2716     g_mutex_unlock (&priv->lock);
2717     return FALSE;
2718   }
2719 }
2720
2721 /**
2722  * gst_rtsp_stream_get_rtpinfo:
2723  * @stream: a #GstRTSPStream
2724  * @rtptime: (allow-none): result RTP timestamp
2725  * @seq: (allow-none): result RTP seqnum
2726  * @clock_rate: (allow-none): the clock rate
2727  * @running_time: (allow-none): result running-time
2728  *
2729  * Retrieve the current rtptime, seq and running-time. This is used to
2730  * construct a RTPInfo reply header.
2731  *
2732  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2733  */
2734 gboolean
2735 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2736     guint * rtptime, guint * seq, guint * clock_rate,
2737     GstClockTime * running_time)
2738 {
2739   GstRTSPStreamPrivate *priv;
2740   GstStructure *stats;
2741   GObjectClass *payobjclass;
2742
2743   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2744
2745   priv = stream->priv;
2746
2747   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2748
2749   g_mutex_lock (&priv->lock);
2750
2751   /* First try to extract the information from the last buffer on the sinks.
2752    * This will have a more accurate sequence number and timestamp, as between
2753    * the payloader and the sink there can be some queues
2754    */
2755   if (priv->udpsink[0] || priv->appsink[0]) {
2756     GstSample *last_sample;
2757
2758     if (priv->udpsink[0])
2759       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2760     else
2761       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2762
2763     if (last_sample) {
2764       GstCaps *caps;
2765       GstBuffer *buffer;
2766       GstSegment *segment;
2767       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2768
2769       caps = gst_sample_get_caps (last_sample);
2770       buffer = gst_sample_get_buffer (last_sample);
2771       segment = gst_sample_get_segment (last_sample);
2772
2773       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2774         if (seq) {
2775           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2776         }
2777
2778         if (rtptime) {
2779           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2780         }
2781
2782         gst_rtp_buffer_unmap (&rtp_buffer);
2783
2784         if (running_time) {
2785           *running_time =
2786               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2787               GST_BUFFER_TIMESTAMP (buffer));
2788         }
2789
2790         if (clock_rate) {
2791           GstStructure *s = gst_caps_get_structure (caps, 0);
2792
2793           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2794
2795           if (*clock_rate == 0 && running_time)
2796             *running_time = GST_CLOCK_TIME_NONE;
2797         }
2798         gst_sample_unref (last_sample);
2799
2800         goto done;
2801       } else {
2802         gst_sample_unref (last_sample);
2803       }
2804     }
2805   }
2806
2807   if (g_object_class_find_property (payobjclass, "stats")) {
2808     g_object_get (priv->payloader, "stats", &stats, NULL);
2809     if (stats == NULL)
2810       goto no_stats;
2811
2812     if (seq)
2813       gst_structure_get_uint (stats, "seqnum", seq);
2814
2815     if (rtptime)
2816       gst_structure_get_uint (stats, "timestamp", rtptime);
2817
2818     if (running_time)
2819       gst_structure_get_clock_time (stats, "running-time", running_time);
2820
2821     if (clock_rate) {
2822       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2823       if (*clock_rate == 0 && running_time)
2824         *running_time = GST_CLOCK_TIME_NONE;
2825     }
2826     gst_structure_free (stats);
2827   } else {
2828     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2829         !g_object_class_find_property (payobjclass, "timestamp"))
2830       goto no_stats;
2831
2832     if (seq)
2833       g_object_get (priv->payloader, "seqnum", seq, NULL);
2834
2835     if (rtptime)
2836       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2837
2838     if (running_time)
2839       *running_time = GST_CLOCK_TIME_NONE;
2840   }
2841
2842 done:
2843   g_mutex_unlock (&priv->lock);
2844
2845   return TRUE;
2846
2847   /* ERRORS */
2848 no_stats:
2849   {
2850     GST_WARNING ("Could not get payloader stats");
2851     g_mutex_unlock (&priv->lock);
2852     return FALSE;
2853   }
2854 }
2855
2856 /**
2857  * gst_rtsp_stream_get_caps:
2858  * @stream: a #GstRTSPStream
2859  *
2860  * Retrieve the current caps of @stream.
2861  *
2862  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2863  * after usage.
2864  */
2865 GstCaps *
2866 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2867 {
2868   GstRTSPStreamPrivate *priv;
2869   GstCaps *result;
2870
2871   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2872
2873   priv = stream->priv;
2874
2875   g_mutex_lock (&priv->lock);
2876   if ((result = priv->caps))
2877     gst_caps_ref (result);
2878   g_mutex_unlock (&priv->lock);
2879
2880   return result;
2881 }
2882
2883 /**
2884  * gst_rtsp_stream_recv_rtp:
2885  * @stream: a #GstRTSPStream
2886  * @buffer: (transfer full): a #GstBuffer
2887  *
2888  * Handle an RTP buffer for the stream. This method is usually called when a
2889  * message has been received from a client using the TCP transport.
2890  *
2891  * This function takes ownership of @buffer.
2892  *
2893  * Returns: a GstFlowReturn.
2894  */
2895 GstFlowReturn
2896 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2897 {
2898   GstRTSPStreamPrivate *priv;
2899   GstFlowReturn ret;
2900   GstElement *element;
2901
2902   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2903   priv = stream->priv;
2904   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2905   g_return_val_if_fail (priv->is_joined, FALSE);
2906
2907   g_mutex_lock (&priv->lock);
2908   if (priv->appsrc[0])
2909     element = gst_object_ref (priv->appsrc[0]);
2910   else
2911     element = NULL;
2912   g_mutex_unlock (&priv->lock);
2913
2914   if (element) {
2915     if (priv->appsrc_base_time[0] == -1) {
2916       /* Take current running_time. This timestamp will be put on
2917        * the first buffer of each stream because we are a live source and so we
2918        * timestamp with the running_time. When we are dealing with TCP, we also
2919        * only timestamp the first buffer (using the DISCONT flag) because a server
2920        * typically bursts data, for which we don't want to compensate by speeding
2921        * up the media. The other timestamps will be interpollated from this one
2922        * using the RTP timestamps. */
2923       GST_OBJECT_LOCK (element);
2924       if (GST_ELEMENT_CLOCK (element)) {
2925         GstClockTime now;
2926         GstClockTime base_time;
2927
2928         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2929         base_time = GST_ELEMENT_CAST (element)->base_time;
2930
2931         priv->appsrc_base_time[0] = now - base_time;
2932         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2933         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2934             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2935             GST_TIME_ARGS (base_time));
2936       }
2937       GST_OBJECT_UNLOCK (element);
2938     }
2939
2940     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2941     gst_object_unref (element);
2942   } else {
2943     ret = GST_FLOW_OK;
2944   }
2945   return ret;
2946 }
2947
2948 /**
2949  * gst_rtsp_stream_recv_rtcp:
2950  * @stream: a #GstRTSPStream
2951  * @buffer: (transfer full): a #GstBuffer
2952  *
2953  * Handle an RTCP buffer for the stream. This method is usually called when a
2954  * message has been received from a client using the TCP transport.
2955  *
2956  * This function takes ownership of @buffer.
2957  *
2958  * Returns: a GstFlowReturn.
2959  */
2960 GstFlowReturn
2961 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2962 {
2963   GstRTSPStreamPrivate *priv;
2964   GstFlowReturn ret;
2965   GstElement *element;
2966
2967   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2968   priv = stream->priv;
2969   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2970
2971   if (!priv->is_joined) {
2972     gst_buffer_unref (buffer);
2973     return GST_FLOW_NOT_LINKED;
2974   }
2975   g_mutex_lock (&priv->lock);
2976   if (priv->appsrc[1])
2977     element = gst_object_ref (priv->appsrc[1]);
2978   else
2979     element = NULL;
2980   g_mutex_unlock (&priv->lock);
2981
2982   if (element) {
2983     if (priv->appsrc_base_time[1] == -1) {
2984       /* Take current running_time. This timestamp will be put on
2985        * the first buffer of each stream because we are a live source and so we
2986        * timestamp with the running_time. When we are dealing with TCP, we also
2987        * only timestamp the first buffer (using the DISCONT flag) because a server
2988        * typically bursts data, for which we don't want to compensate by speeding
2989        * up the media. The other timestamps will be interpollated from this one
2990        * using the RTP timestamps. */
2991       GST_OBJECT_LOCK (element);
2992       if (GST_ELEMENT_CLOCK (element)) {
2993         GstClockTime now;
2994         GstClockTime base_time;
2995
2996         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2997         base_time = GST_ELEMENT_CAST (element)->base_time;
2998
2999         priv->appsrc_base_time[1] = now - base_time;
3000         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3001         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3002             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3003             GST_TIME_ARGS (base_time));
3004       }
3005       GST_OBJECT_UNLOCK (element);
3006     }
3007
3008     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3009     gst_object_unref (element);
3010   } else {
3011     ret = GST_FLOW_OK;
3012     gst_buffer_unref (buffer);
3013   }
3014   return ret;
3015 }
3016
3017 /* must be called with lock */
3018 static gboolean
3019 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3020     gboolean add)
3021 {
3022   GstRTSPStreamPrivate *priv = stream->priv;
3023   const GstRTSPTransport *tr;
3024
3025   tr = gst_rtsp_stream_transport_get_transport (trans);
3026
3027   switch (tr->lower_transport) {
3028     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3029     {
3030       GstRTSPMulticastTransportSource *source;
3031       GstBin *bin;
3032
3033       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
3034
3035       if (add) {
3036         gchar *host;
3037         gint i;
3038         GstPad *selpad, *pad;
3039
3040         source = g_slice_new0 (GstRTSPMulticastTransportSource);
3041         source->transport = trans;
3042
3043         for (i = 0; i < 2; i++) {
3044           host =
3045               g_strdup_printf ("udp://%s:%d", tr->destination,
3046               (i == 0) ? tr->port.min : tr->port.max);
3047           source->udpsrc[i] =
3048               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
3049           g_free (host);
3050           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
3051
3052           if (priv->srcpad) {
3053             /* we set and keep these to playing so that they don't cause NO_PREROLL return
3054              * values. This is only relevant for PLAY pipelines */
3055             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
3056             gst_element_set_locked_state (source->udpsrc[i], TRUE);
3057           }
3058           /* add udpsrc */
3059           gst_bin_add (bin, source->udpsrc[i]);
3060
3061           /* and link to the funnel v4 */
3062           if (priv->sinkpad || i == 1) {
3063             source->selpad[i] = selpad =
3064                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3065             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3066             gst_pad_link (pad, selpad);
3067             gst_object_unref (pad);
3068             gst_object_unref (selpad);
3069           }
3070         }
3071
3072         priv->transport_sources =
3073             g_list_prepend (priv->transport_sources, source);
3074       } else {
3075         GList *l;
3076
3077         for (l = priv->transport_sources; l; l = l->next) {
3078           source = l->data;
3079
3080           if (source->transport == trans) {
3081             priv->transport_sources =
3082                 g_list_delete_link (priv->transport_sources, l);
3083             break;
3084           }
3085         }
3086
3087         if (l != NULL) {
3088           gint i;
3089
3090           for (i = 0; i < 2; i++) {
3091             /* Will automatically unlink everything */
3092             gst_bin_remove (bin,
3093                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3094
3095             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3096             gst_object_unref (source->udpsrc[i]);
3097
3098             if (priv->sinkpad || i == 1) {
3099               gst_element_release_request_pad (priv->funnel[i],
3100                   source->selpad[i]);
3101             }
3102           }
3103
3104           g_slice_free (GstRTSPMulticastTransportSource, source);
3105         }
3106       }
3107
3108       gst_object_unref (bin);
3109
3110       /* fall through for the generic case */
3111     }
3112     case GST_RTSP_LOWER_TRANS_UDP:
3113     {
3114       gchar *dest;
3115       gint min, max;
3116       guint ttl = 0;
3117
3118       dest = tr->destination;
3119       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3120         min = tr->port.min;
3121         max = tr->port.max;
3122         ttl = tr->ttl;
3123       } else if (priv->client_side) {
3124         /* In client side mode the 'destination' is the RTSP server, so send
3125          * to those ports */
3126         min = tr->server_port.min;
3127         max = tr->server_port.max;
3128       } else {
3129         min = tr->client_port.min;
3130         max = tr->client_port.max;
3131       }
3132
3133       if (add) {
3134         if (ttl > 0) {
3135           GST_INFO ("setting ttl-mc %d", ttl);
3136           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3137           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3138         }
3139         GST_INFO ("adding %s:%d-%d", dest, min, max);
3140         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3141         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3142         priv->transports = g_list_prepend (priv->transports, trans);
3143       } else {
3144         GST_INFO ("removing %s:%d-%d", dest, min, max);
3145         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3146         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3147         priv->transports = g_list_remove (priv->transports, trans);
3148       }
3149       priv->transports_cookie++;
3150       break;
3151     }
3152     case GST_RTSP_LOWER_TRANS_TCP:
3153       if (add) {
3154         GST_INFO ("adding TCP %s", tr->destination);
3155         priv->transports = g_list_prepend (priv->transports, trans);
3156       } else {
3157         GST_INFO ("removing TCP %s", tr->destination);
3158         priv->transports = g_list_remove (priv->transports, trans);
3159       }
3160       priv->transports_cookie++;
3161       break;
3162     default:
3163       goto unknown_transport;
3164   }
3165   return TRUE;
3166
3167   /* ERRORS */
3168 unknown_transport:
3169   {
3170     GST_INFO ("Unknown transport %d", tr->lower_transport);
3171     return FALSE;
3172   }
3173 }
3174
3175
3176 /**
3177  * gst_rtsp_stream_add_transport:
3178  * @stream: a #GstRTSPStream
3179  * @trans: (transfer none): a #GstRTSPStreamTransport
3180  *
3181  * Add the transport in @trans to @stream. The media of @stream will
3182  * then also be send to the values configured in @trans.
3183  *
3184  * @stream must be joined to a bin.
3185  *
3186  * @trans must contain a valid #GstRTSPTransport.
3187  *
3188  * Returns: %TRUE if @trans was added
3189  */
3190 gboolean
3191 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3192     GstRTSPStreamTransport * trans)
3193 {
3194   GstRTSPStreamPrivate *priv;
3195   gboolean res;
3196
3197   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3198   priv = stream->priv;
3199   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3200   g_return_val_if_fail (priv->is_joined, FALSE);
3201
3202   g_mutex_lock (&priv->lock);
3203   res = update_transport (stream, trans, TRUE);
3204   g_mutex_unlock (&priv->lock);
3205
3206   return res;
3207 }
3208
3209 /**
3210  * gst_rtsp_stream_remove_transport:
3211  * @stream: a #GstRTSPStream
3212  * @trans: (transfer none): a #GstRTSPStreamTransport
3213  *
3214  * Remove the transport in @trans from @stream. The media of @stream will
3215  * not be sent to the values configured in @trans.
3216  *
3217  * @stream must be joined to a bin.
3218  *
3219  * @trans must contain a valid #GstRTSPTransport.
3220  *
3221  * Returns: %TRUE if @trans was removed
3222  */
3223 gboolean
3224 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3225     GstRTSPStreamTransport * trans)
3226 {
3227   GstRTSPStreamPrivate *priv;
3228   gboolean res;
3229
3230   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3231   priv = stream->priv;
3232   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3233   g_return_val_if_fail (priv->is_joined, FALSE);
3234
3235   g_mutex_lock (&priv->lock);
3236   res = update_transport (stream, trans, FALSE);
3237   g_mutex_unlock (&priv->lock);
3238
3239   return res;
3240 }
3241
3242 /**
3243  * gst_rtsp_stream_update_crypto:
3244  * @stream: a #GstRTSPStream
3245  * @ssrc: the SSRC
3246  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3247  *
3248  * Update the new crypto information for @ssrc in @stream. If information
3249  * for @ssrc did not exist, it will be added. If information
3250  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3251  * be removed from @stream.
3252  *
3253  * Returns: %TRUE if @crypto could be updated
3254  */
3255 gboolean
3256 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3257     guint ssrc, GstCaps * crypto)
3258 {
3259   GstRTSPStreamPrivate *priv;
3260
3261   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3262   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3263
3264   priv = stream->priv;
3265
3266   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3267
3268   g_mutex_lock (&priv->lock);
3269   if (crypto)
3270     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3271         gst_caps_ref (crypto));
3272   else
3273     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3274   g_mutex_unlock (&priv->lock);
3275
3276   return TRUE;
3277 }
3278
3279 /**
3280  * gst_rtsp_stream_get_rtp_socket:
3281  * @stream: a #GstRTSPStream
3282  * @family: the socket family
3283  *
3284  * Get the RTP socket from @stream for a @family.
3285  *
3286  * @stream must be joined to a bin.
3287  *
3288  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3289  * socket could be allocated for @family. Unref after usage
3290  */
3291 GSocket *
3292 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3293 {
3294   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3295   GSocket *socket;
3296   const gchar *name;
3297
3298   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3299   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3300       family == G_SOCKET_FAMILY_IPV6, NULL);
3301   g_return_val_if_fail (priv->udpsink[0], NULL);
3302
3303   if (family == G_SOCKET_FAMILY_IPV6)
3304     name = "socket-v6";
3305   else
3306     name = "socket";
3307
3308   g_object_get (priv->udpsink[0], name, &socket, NULL);
3309
3310   return socket;
3311 }
3312
3313 /**
3314  * gst_rtsp_stream_get_rtcp_socket:
3315  * @stream: a #GstRTSPStream
3316  * @family: the socket family
3317  *
3318  * Get the RTCP socket from @stream for a @family.
3319  *
3320  * @stream must be joined to a bin.
3321  *
3322  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3323  * socket could be allocated for @family. Unref after usage
3324  */
3325 GSocket *
3326 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3327 {
3328   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3329   GSocket *socket;
3330   const gchar *name;
3331
3332   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3333   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3334       family == G_SOCKET_FAMILY_IPV6, NULL);
3335   g_return_val_if_fail (priv->udpsink[1], NULL);
3336
3337   if (family == G_SOCKET_FAMILY_IPV6)
3338     name = "socket-v6";
3339   else
3340     name = "socket";
3341
3342   g_object_get (priv->udpsink[1], name, &socket, NULL);
3343
3344   return socket;
3345 }
3346
3347 /**
3348  * gst_rtsp_stream_set_seqnum:
3349  * @stream: a #GstRTSPStream
3350  * @seqnum: a new sequence number
3351  *
3352  * Configure the sequence number in the payloader of @stream to @seqnum.
3353  */
3354 void
3355 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3356 {
3357   GstRTSPStreamPrivate *priv;
3358
3359   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3360
3361   priv = stream->priv;
3362
3363   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3364 }
3365
3366 /**
3367  * gst_rtsp_stream_get_seqnum:
3368  * @stream: a #GstRTSPStream
3369  *
3370  * Get the configured sequence number in the payloader of @stream.
3371  *
3372  * Returns: the sequence number of the payloader.
3373  */
3374 guint16
3375 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3376 {
3377   GstRTSPStreamPrivate *priv;
3378   guint seqnum;
3379
3380   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3381
3382   priv = stream->priv;
3383
3384   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3385
3386   return seqnum;
3387 }
3388
3389 /**
3390  * gst_rtsp_stream_transport_filter:
3391  * @stream: a #GstRTSPStream
3392  * @func: (scope call) (allow-none): a callback
3393  * @user_data: (closure): user data passed to @func
3394  *
3395  * Call @func for each transport managed by @stream. The result value of @func
3396  * determines what happens to the transport. @func will be called with @stream
3397  * locked so no further actions on @stream can be performed from @func.
3398  *
3399  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3400  * @stream.
3401  *
3402  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3403  *
3404  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3405  * will also be added with an additional ref to the result #GList of this
3406  * function..
3407  *
3408  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3409  *
3410  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3411  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3412  * element in the #GList should be unreffed before the list is freed.
3413  */
3414 GList *
3415 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3416     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3417 {
3418   GstRTSPStreamPrivate *priv;
3419   GList *result, *walk, *next;
3420   GHashTable *visited = NULL;
3421   guint cookie;
3422
3423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3424
3425   priv = stream->priv;
3426
3427   result = NULL;
3428   if (func)
3429     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3430
3431   g_mutex_lock (&priv->lock);
3432 restart:
3433   cookie = priv->transports_cookie;
3434   for (walk = priv->transports; walk; walk = next) {
3435     GstRTSPStreamTransport *trans = walk->data;
3436     GstRTSPFilterResult res;
3437     gboolean changed;
3438
3439     next = g_list_next (walk);
3440
3441     if (func) {
3442       /* only visit each transport once */
3443       if (g_hash_table_contains (visited, trans))
3444         continue;
3445
3446       g_hash_table_add (visited, g_object_ref (trans));
3447       g_mutex_unlock (&priv->lock);
3448
3449       res = func (stream, trans, user_data);
3450
3451       g_mutex_lock (&priv->lock);
3452     } else
3453       res = GST_RTSP_FILTER_REF;
3454
3455     changed = (cookie != priv->transports_cookie);
3456
3457     switch (res) {
3458       case GST_RTSP_FILTER_REMOVE:
3459         update_transport (stream, trans, FALSE);
3460         break;
3461       case GST_RTSP_FILTER_REF:
3462         result = g_list_prepend (result, g_object_ref (trans));
3463         break;
3464       case GST_RTSP_FILTER_KEEP:
3465       default:
3466         break;
3467     }
3468     if (changed)
3469       goto restart;
3470   }
3471   g_mutex_unlock (&priv->lock);
3472
3473   if (func)
3474     g_hash_table_unref (visited);
3475
3476   return result;
3477 }
3478
3479 static GstPadProbeReturn
3480 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3481 {
3482   GstRTSPStreamPrivate *priv;
3483   GstRTSPStream *stream;
3484
3485   stream = user_data;
3486   priv = stream->priv;
3487
3488   GST_DEBUG_OBJECT (pad, "now blocking");
3489
3490   g_mutex_lock (&priv->lock);
3491   priv->blocking = TRUE;
3492   g_mutex_unlock (&priv->lock);
3493
3494   gst_element_post_message (priv->payloader,
3495       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3496           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3497
3498   return GST_PAD_PROBE_OK;
3499 }
3500
3501 /**
3502  * gst_rtsp_stream_set_blocked:
3503  * @stream: a #GstRTSPStream
3504  * @blocked: boolean indicating we should block or unblock
3505  *
3506  * Blocks or unblocks the dataflow on @stream.
3507  *
3508  * Returns: %TRUE on success
3509  */
3510 gboolean
3511 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3512 {
3513   GstRTSPStreamPrivate *priv;
3514
3515   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3516
3517   priv = stream->priv;
3518
3519   g_mutex_lock (&priv->lock);
3520   if (blocked) {
3521     priv->blocking = FALSE;
3522     if (priv->blocked_id == 0) {
3523       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3524           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3525           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3526           g_object_ref (stream), g_object_unref);
3527     }
3528   } else {
3529     if (priv->blocked_id != 0) {
3530       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3531       priv->blocked_id = 0;
3532       priv->blocking = FALSE;
3533     }
3534   }
3535   g_mutex_unlock (&priv->lock);
3536
3537   return TRUE;
3538 }
3539
3540 /**
3541  * gst_rtsp_stream_is_blocking:
3542  * @stream: a #GstRTSPStream
3543  *
3544  * Check if @stream is blocking on a #GstBuffer.
3545  *
3546  * Returns: %TRUE if @stream is blocking
3547  */
3548 gboolean
3549 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3550 {
3551   GstRTSPStreamPrivate *priv;
3552   gboolean result;
3553
3554   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3555
3556   priv = stream->priv;
3557
3558   g_mutex_lock (&priv->lock);
3559   result = priv->blocking;
3560   g_mutex_unlock (&priv->lock);
3561
3562   return result;
3563 }
3564
3565 /**
3566  * gst_rtsp_stream_query_position:
3567  * @stream: a #GstRTSPStream
3568  *
3569  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3570  * the RTP parts of the pipeline and not the RTCP parts.
3571  *
3572  * Returns: %TRUE if the position could be queried
3573  */
3574 gboolean
3575 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3576 {
3577   GstRTSPStreamPrivate *priv;
3578   GstElement *sink;
3579   gboolean ret;
3580
3581   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3582
3583   priv = stream->priv;
3584
3585   g_mutex_lock (&priv->lock);
3586   /* depending on the transport type, it should query corresponding sink */
3587   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3588       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3589     sink = priv->udpsink[0];
3590   else
3591     sink = priv->appsink[0];
3592
3593   if (sink)
3594     gst_object_ref (sink);
3595   g_mutex_unlock (&priv->lock);
3596
3597   if (!sink)
3598     return FALSE;
3599
3600   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3601   gst_object_unref (sink);
3602
3603   return ret;
3604 }
3605
3606 /**
3607  * gst_rtsp_stream_query_stop:
3608  * @stream: a #GstRTSPStream
3609  *
3610  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3611  * the RTP parts of the pipeline and not the RTCP parts.
3612  *
3613  * Returns: %TRUE if the stop could be queried
3614  */
3615 gboolean
3616 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3617 {
3618   GstRTSPStreamPrivate *priv;
3619   GstElement *sink;
3620   GstQuery *query;
3621   gboolean ret;
3622
3623   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3624
3625   priv = stream->priv;
3626
3627   g_mutex_lock (&priv->lock);
3628   /* depending on the transport type, it should query corresponding sink */
3629   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3630       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3631     sink = priv->udpsink[0];
3632   else
3633     sink = priv->appsink[0];
3634
3635   if (sink)
3636     gst_object_ref (sink);
3637   g_mutex_unlock (&priv->lock);
3638
3639   if (!sink)
3640     return FALSE;
3641
3642   query = gst_query_new_segment (GST_FORMAT_TIME);
3643   if ((ret = gst_element_query (sink, query))) {
3644     GstFormat format;
3645
3646     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3647     if (format != GST_FORMAT_TIME)
3648       *stop = -1;
3649   }
3650   gst_query_unref (query);
3651   gst_object_unref (sink);
3652
3653   return ret;
3654
3655 }