rtsp-stream: Add functions for using rtsp-stream from the client
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 typedef struct
66 {
67   GstRTSPStreamTransport *transport;
68
69   /* RTP and RTCP source */
70   GstElement *udpsrc[2];
71   GstPad *selpad[2];
72 } GstRTSPMulticastTransportSource;
73
74 struct _GstRTSPStreamPrivate
75 {
76   GMutex lock;
77   guint idx;
78   /* Only one pad is ever set */
79   GstPad *srcpad, *sinkpad;
80   GstElement *payloader;
81   guint buffer_size;
82   gboolean is_joined;
83
84   /* TRUE if this stream is running on
85    * the client side of an RTSP link (for RECORD) */
86   gboolean client_side;
87   gchar *control;
88
89   GstRTSPProfile profiles;
90   GstRTSPLowerTrans protocols;
91
92   /* pads on the rtpbin */
93   GstPad *send_rtp_sink;
94   GstPad *recv_rtp_src;
95   GstPad *recv_sink[2];
96   GstPad *send_src[2];
97
98   /* the RTPSession object */
99   GObject *session;
100
101   /* SRTP encoder/decoder */
102   GstElement *srtpenc;
103   GstElement *srtpdec;
104   GHashTable *keys;
105
106   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
107    * sockets */
108   GstElement *udpsrc_v4[2];
109
110   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
111    * sockets */
112   GstElement *udpsrc_v6[2];
113
114   GstElement *udpqueue[2];
115   GstElement *udpsink[2];
116
117   /* for TCP transport */
118   GstElement *appsrc[2];
119   GstClockTime appsrc_base_time[2];
120   GstElement *appqueue[2];
121   GstElement *appsink[2];
122
123   GstElement *tee[2];
124   GstElement *funnel[2];
125
126   /* retransmission */
127   GstElement *rtxsend;
128   guint rtx_pt;
129   GstClockTime rtx_time;
130
131   /* server ports for sending/receiving over ipv4 */
132   GstRTSPRange server_port_v4;
133   GstRTSPAddress *server_addr_v4;
134   gboolean have_ipv4;
135
136   /* server ports for sending/receiving over ipv6 */
137   GstRTSPRange server_port_v6;
138   GstRTSPAddress *server_addr_v6;
139   gboolean have_ipv6;
140
141   /* multicast addresses */
142   GstRTSPAddressPool *pool;
143   GstRTSPAddress *addr_v4;
144   GstRTSPAddress *addr_v6;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   /* UDP sources for UDP multicast transports */
161   GList *transport_sources;
162
163   gint dscp_qos;
164
165   /* stream blocking */
166   gulong blocked_id;
167   gboolean blocking;
168
169   /* pt->caps map for RECORD streams */
170   GHashTable *ptmap;
171 };
172
173 #define DEFAULT_CONTROL         NULL
174 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
175 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
176                                         GST_RTSP_LOWER_TRANS_TCP
177
178 enum
179 {
180   PROP_0,
181   PROP_CONTROL,
182   PROP_PROFILES,
183   PROP_PROTOCOLS,
184   PROP_LAST
185 };
186
187 enum
188 {
189   SIGNAL_NEW_RTP_ENCODER,
190   SIGNAL_NEW_RTCP_ENCODER,
191   SIGNAL_LAST
192 };
193
194 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
195 #define GST_CAT_DEFAULT rtsp_stream_debug
196
197 static GQuark ssrc_stream_map_key;
198
199 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
200     GValue * value, GParamSpec * pspec);
201 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
202     const GValue * value, GParamSpec * pspec);
203
204 static void gst_rtsp_stream_finalize (GObject * obj);
205
206 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
207
208 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
209
210 static void
211 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
212 {
213   GObjectClass *gobject_class;
214
215   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
216
217   gobject_class = G_OBJECT_CLASS (klass);
218
219   gobject_class->get_property = gst_rtsp_stream_get_property;
220   gobject_class->set_property = gst_rtsp_stream_set_property;
221   gobject_class->finalize = gst_rtsp_stream_finalize;
222
223   g_object_class_install_property (gobject_class, PROP_CONTROL,
224       g_param_spec_string ("control", "Control",
225           "The control string for this stream", DEFAULT_CONTROL,
226           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
227
228   g_object_class_install_property (gobject_class, PROP_PROFILES,
229       g_param_spec_flags ("profiles", "Profiles",
230           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
231           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232
233   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
234       g_param_spec_flags ("protocols", "Protocols",
235           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
236           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
239       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
240       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
241       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
242
243   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
244       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
245       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
246       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
247
248   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
249
250   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
251 }
252
253 static void
254 gst_rtsp_stream_init (GstRTSPStream * stream)
255 {
256   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
257
258   GST_DEBUG ("new stream %p", stream);
259
260   stream->priv = priv;
261
262   priv->dscp_qos = -1;
263   priv->control = g_strdup (DEFAULT_CONTROL);
264   priv->profiles = DEFAULT_PROFILES;
265   priv->protocols = DEFAULT_PROTOCOLS;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   gst_object_unref (priv->payloader);
303   if (priv->srcpad)
304     gst_object_unref (priv->srcpad);
305   if (priv->sinkpad)
306     gst_object_unref (priv->sinkpad);
307   g_free (priv->control);
308   g_mutex_clear (&priv->lock);
309
310   g_hash_table_unref (priv->keys);
311   g_hash_table_destroy (priv->ptmap);
312
313   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
314 }
315
316 static void
317 gst_rtsp_stream_get_property (GObject * object, guint propid,
318     GValue * value, GParamSpec * pspec)
319 {
320   GstRTSPStream *stream = GST_RTSP_STREAM (object);
321
322   switch (propid) {
323     case PROP_CONTROL:
324       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
325       break;
326     case PROP_PROFILES:
327       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
328       break;
329     case PROP_PROTOCOLS:
330       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
331       break;
332     default:
333       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
334   }
335 }
336
337 static void
338 gst_rtsp_stream_set_property (GObject * object, guint propid,
339     const GValue * value, GParamSpec * pspec)
340 {
341   GstRTSPStream *stream = GST_RTSP_STREAM (object);
342
343   switch (propid) {
344     case PROP_CONTROL:
345       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
346       break;
347     case PROP_PROFILES:
348       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
349       break;
350     case PROP_PROTOCOLS:
351       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
352       break;
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
355   }
356 }
357
358 /**
359  * gst_rtsp_stream_new:
360  * @idx: an index
361  * @pad: a #GstPad
362  * @payloader: a #GstElement
363  *
364  * Create a new media stream with index @idx that handles RTP data on
365  * @pad and has a payloader element @payloader if @pad is a source pad
366  * or a depayloader element @payloader if @pad is a sink pad.
367  *
368  * Returns: (transfer full): a new #GstRTSPStream
369  */
370 GstRTSPStream *
371 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
372 {
373   GstRTSPStreamPrivate *priv;
374   GstRTSPStream *stream;
375
376   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
377   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
378
379   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
380   priv = stream->priv;
381   priv->idx = idx;
382   priv->payloader = gst_object_ref (payloader);
383   if (GST_PAD_IS_SRC (pad))
384     priv->srcpad = gst_object_ref (pad);
385   else
386     priv->sinkpad = gst_object_ref (pad);
387
388   return stream;
389 }
390
391 /**
392  * gst_rtsp_stream_get_index:
393  * @stream: a #GstRTSPStream
394  *
395  * Get the stream index.
396  *
397  * Return: the stream index.
398  */
399 guint
400 gst_rtsp_stream_get_index (GstRTSPStream * stream)
401 {
402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
403
404   return stream->priv->idx;
405 }
406
407 /**
408  * gst_rtsp_stream_get_pt:
409  * @stream: a #GstRTSPStream
410  *
411  * Get the stream payload type.
412  *
413  * Return: the stream payload type.
414  */
415 guint
416 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
417 {
418   GstRTSPStreamPrivate *priv;
419   guint pt;
420
421   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
422
423   priv = stream->priv;
424
425   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
426
427   return pt;
428 }
429
430 /**
431  * gst_rtsp_stream_get_srcpad:
432  * @stream: a #GstRTSPStream
433  *
434  * Get the srcpad associated with @stream.
435  *
436  * Returns: (transfer full): the srcpad. Unref after usage.
437  */
438 GstPad *
439 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
440 {
441   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
442
443   if (!stream->priv->srcpad)
444     return NULL;
445
446   return gst_object_ref (stream->priv->srcpad);
447 }
448
449 /**
450  * gst_rtsp_stream_get_sinkpad:
451  * @stream: a #GstRTSPStream
452  *
453  * Get the sinkpad associated with @stream.
454  *
455  * Returns: (transfer full): the sinkpad. Unref after usage.
456  */
457 GstPad *
458 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
459 {
460   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
461
462   if (!stream->priv->sinkpad)
463     return NULL;
464
465   return gst_object_ref (stream->priv->sinkpad);
466 }
467
468 /**
469  * gst_rtsp_stream_get_control:
470  * @stream: a #GstRTSPStream
471  *
472  * Get the control string to identify this stream.
473  *
474  * Returns: (transfer full): the control string. g_free() after usage.
475  */
476 gchar *
477 gst_rtsp_stream_get_control (GstRTSPStream * stream)
478 {
479   GstRTSPStreamPrivate *priv;
480   gchar *result;
481
482   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
483
484   priv = stream->priv;
485
486   g_mutex_lock (&priv->lock);
487   if ((result = g_strdup (priv->control)) == NULL)
488     result = g_strdup_printf ("stream=%u", priv->idx);
489   g_mutex_unlock (&priv->lock);
490
491   return result;
492 }
493
494 /**
495  * gst_rtsp_stream_set_control:
496  * @stream: a #GstRTSPStream
497  * @control: a control string
498  *
499  * Set the control string in @stream.
500  */
501 void
502 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
503 {
504   GstRTSPStreamPrivate *priv;
505
506   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
507
508   priv = stream->priv;
509
510   g_mutex_lock (&priv->lock);
511   g_free (priv->control);
512   priv->control = g_strdup (control);
513   g_mutex_unlock (&priv->lock);
514 }
515
516 /**
517  * gst_rtsp_stream_has_control:
518  * @stream: a #GstRTSPStream
519  * @control: a control string
520  *
521  * Check if @stream has the control string @control.
522  *
523  * Returns: %TRUE is @stream has @control as the control string
524  */
525 gboolean
526 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
527 {
528   GstRTSPStreamPrivate *priv;
529   gboolean res;
530
531   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
532
533   priv = stream->priv;
534
535   g_mutex_lock (&priv->lock);
536   if (priv->control)
537     res = (g_strcmp0 (priv->control, control) == 0);
538   else {
539     guint streamid;
540
541     if (sscanf (control, "stream=%u", &streamid) > 0)
542       res = (streamid == priv->idx);
543     else
544       res = FALSE;
545   }
546   g_mutex_unlock (&priv->lock);
547
548   return res;
549 }
550
551 /**
552  * gst_rtsp_stream_set_mtu:
553  * @stream: a #GstRTSPStream
554  * @mtu: a new MTU
555  *
556  * Configure the mtu in the payloader of @stream to @mtu.
557  */
558 void
559 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
560 {
561   GstRTSPStreamPrivate *priv;
562
563   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
564
565   priv = stream->priv;
566
567   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
568
569   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
570 }
571
572 /**
573  * gst_rtsp_stream_get_mtu:
574  * @stream: a #GstRTSPStream
575  *
576  * Get the configured MTU in the payloader of @stream.
577  *
578  * Returns: the MTU of the payloader.
579  */
580 guint
581 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
582 {
583   GstRTSPStreamPrivate *priv;
584   guint mtu;
585
586   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
587
588   priv = stream->priv;
589
590   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
591
592   return mtu;
593 }
594
595 /* Update the dscp qos property on the udp sinks */
596 static void
597 update_dscp_qos (GstRTSPStream * stream)
598 {
599   GstRTSPStreamPrivate *priv;
600
601   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
602
603   priv = stream->priv;
604
605   if (priv->udpsink[0]) {
606     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
607         NULL);
608   }
609
610   if (priv->udpsink[1]) {
611     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
612         NULL);
613   }
614 }
615
616 /**
617  * gst_rtsp_stream_set_dscp_qos:
618  * @stream: a #GstRTSPStream
619  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
620  *
621  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
622  */
623 void
624 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
625 {
626   GstRTSPStreamPrivate *priv;
627
628   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
629
630   priv = stream->priv;
631
632   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
633
634   if (dscp_qos < -1 || dscp_qos > 63) {
635     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
636     return;
637   }
638
639   priv->dscp_qos = dscp_qos;
640
641   update_dscp_qos (stream);
642 }
643
644 /**
645  * gst_rtsp_stream_get_dscp_qos:
646  * @stream: a #GstRTSPStream
647  *
648  * Get the configured DSCP QoS in of the outgoing sockets.
649  *
650  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
651  */
652 gint
653 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
654 {
655   GstRTSPStreamPrivate *priv;
656
657   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
658
659   priv = stream->priv;
660
661   return priv->dscp_qos;
662 }
663
664 /**
665  * gst_rtsp_stream_is_transport_supported:
666  * @stream: a #GstRTSPStream
667  * @transport: (transfer none): a #GstRTSPTransport
668  *
669  * Check if @transport can be handled by stream
670  *
671  * Returns: %TRUE if @transport can be handled by @stream.
672  */
673 gboolean
674 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
675     GstRTSPTransport * transport)
676 {
677   GstRTSPStreamPrivate *priv;
678
679   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
680
681   priv = stream->priv;
682
683   g_mutex_lock (&priv->lock);
684   if (transport->trans != GST_RTSP_TRANS_RTP)
685     goto unsupported_transmode;
686
687   if (!(transport->profile & priv->profiles))
688     goto unsupported_profile;
689
690   if (!(transport->lower_transport & priv->protocols))
691     goto unsupported_ltrans;
692
693   g_mutex_unlock (&priv->lock);
694
695   return TRUE;
696
697   /* ERRORS */
698 unsupported_transmode:
699   {
700     GST_DEBUG ("unsupported transport mode %d", transport->trans);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_profile:
705   {
706     GST_DEBUG ("unsupported profile %d", transport->profile);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 unsupported_ltrans:
711   {
712     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
713     g_mutex_unlock (&priv->lock);
714     return FALSE;
715   }
716 }
717
718 /**
719  * gst_rtsp_stream_set_profiles:
720  * @stream: a #GstRTSPStream
721  * @profiles: the new profiles
722  *
723  * Configure the allowed profiles for @stream.
724  */
725 void
726 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
727 {
728   GstRTSPStreamPrivate *priv;
729
730   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
731
732   priv = stream->priv;
733
734   g_mutex_lock (&priv->lock);
735   priv->profiles = profiles;
736   g_mutex_unlock (&priv->lock);
737 }
738
739 /**
740  * gst_rtsp_stream_get_profiles:
741  * @stream: a #GstRTSPStream
742  *
743  * Get the allowed profiles of @stream.
744  *
745  * Returns: a #GstRTSPProfile
746  */
747 GstRTSPProfile
748 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
749 {
750   GstRTSPStreamPrivate *priv;
751   GstRTSPProfile res;
752
753   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
754
755   priv = stream->priv;
756
757   g_mutex_lock (&priv->lock);
758   res = priv->profiles;
759   g_mutex_unlock (&priv->lock);
760
761   return res;
762 }
763
764 /**
765  * gst_rtsp_stream_set_protocols:
766  * @stream: a #GstRTSPStream
767  * @protocols: the new flags
768  *
769  * Configure the allowed lower transport for @stream.
770  */
771 void
772 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
773     GstRTSPLowerTrans protocols)
774 {
775   GstRTSPStreamPrivate *priv;
776
777   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
778
779   priv = stream->priv;
780
781   g_mutex_lock (&priv->lock);
782   priv->protocols = protocols;
783   g_mutex_unlock (&priv->lock);
784 }
785
786 /**
787  * gst_rtsp_stream_get_protocols:
788  * @stream: a #GstRTSPStream
789  *
790  * Get the allowed protocols of @stream.
791  *
792  * Returns: a #GstRTSPLowerTrans
793  */
794 GstRTSPLowerTrans
795 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
796 {
797   GstRTSPStreamPrivate *priv;
798   GstRTSPLowerTrans res;
799
800   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
801       GST_RTSP_LOWER_TRANS_UNKNOWN);
802
803   priv = stream->priv;
804
805   g_mutex_lock (&priv->lock);
806   res = priv->protocols;
807   g_mutex_unlock (&priv->lock);
808
809   return res;
810 }
811
812 /**
813  * gst_rtsp_stream_set_address_pool:
814  * @stream: a #GstRTSPStream
815  * @pool: (transfer none): a #GstRTSPAddressPool
816  *
817  * configure @pool to be used as the address pool of @stream.
818  */
819 void
820 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
821     GstRTSPAddressPool * pool)
822 {
823   GstRTSPStreamPrivate *priv;
824   GstRTSPAddressPool *old;
825
826   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
827
828   priv = stream->priv;
829
830   GST_LOG_OBJECT (stream, "set address pool %p", pool);
831
832   g_mutex_lock (&priv->lock);
833   if ((old = priv->pool) != pool)
834     priv->pool = pool ? g_object_ref (pool) : NULL;
835   else
836     old = NULL;
837   g_mutex_unlock (&priv->lock);
838
839   if (old)
840     g_object_unref (old);
841 }
842
843 /**
844  * gst_rtsp_stream_get_address_pool:
845  * @stream: a #GstRTSPStream
846  *
847  * Get the #GstRTSPAddressPool used as the address pool of @stream.
848  *
849  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
850  * usage.
851  */
852 GstRTSPAddressPool *
853 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
854 {
855   GstRTSPStreamPrivate *priv;
856   GstRTSPAddressPool *result;
857
858   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
859
860   priv = stream->priv;
861
862   g_mutex_lock (&priv->lock);
863   if ((result = priv->pool))
864     g_object_ref (result);
865   g_mutex_unlock (&priv->lock);
866
867   return result;
868 }
869
870 /**
871  * gst_rtsp_stream_get_multicast_address:
872  * @stream: a #GstRTSPStream
873  * @family: the #GSocketFamily
874  *
875  * Get the multicast address of @stream for @family.
876  *
877  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
878  * or %NULL when no address could be allocated. gst_rtsp_address_free()
879  * after usage.
880  */
881 GstRTSPAddress *
882 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
883     GSocketFamily family)
884 {
885   GstRTSPStreamPrivate *priv;
886   GstRTSPAddress *result;
887   GstRTSPAddress **addrp;
888   GstRTSPAddressFlags flags;
889
890   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
891
892   priv = stream->priv;
893
894   if (family == G_SOCKET_FAMILY_IPV6) {
895     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
896     addrp = &priv->addr_v6;
897   } else {
898     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
899     addrp = &priv->addr_v4;
900   }
901
902   g_mutex_lock (&priv->lock);
903   if (*addrp == NULL) {
904     if (priv->pool == NULL)
905       goto no_pool;
906
907     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
908
909     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
910     if (*addrp == NULL)
911       goto no_address;
912   }
913   result = gst_rtsp_address_copy (*addrp);
914   g_mutex_unlock (&priv->lock);
915
916   return result;
917
918   /* ERRORS */
919 no_pool:
920   {
921     GST_ERROR_OBJECT (stream, "no address pool specified");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 no_address:
926   {
927     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
928     g_mutex_unlock (&priv->lock);
929     return NULL;
930   }
931 }
932
933 /**
934  * gst_rtsp_stream_reserve_address:
935  * @stream: a #GstRTSPStream
936  * @address: an address
937  * @port: a port
938  * @n_ports: n_ports
939  * @ttl: a TTL
940  *
941  * Reserve @address and @port as the address and port of @stream.
942  *
943  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
944  * the address could be reserved. gst_rtsp_address_free() after usage.
945  */
946 GstRTSPAddress *
947 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
948     const gchar * address, guint port, guint n_ports, guint ttl)
949 {
950   GstRTSPStreamPrivate *priv;
951   GstRTSPAddress *result;
952   GInetAddress *addr;
953   GSocketFamily family;
954   GstRTSPAddress **addrp;
955
956   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
957   g_return_val_if_fail (address != NULL, NULL);
958   g_return_val_if_fail (port > 0, NULL);
959   g_return_val_if_fail (n_ports > 0, NULL);
960   g_return_val_if_fail (ttl > 0, NULL);
961
962   priv = stream->priv;
963
964   addr = g_inet_address_new_from_string (address);
965   if (!addr) {
966     GST_ERROR ("failed to get inet addr from %s", address);
967     family = G_SOCKET_FAMILY_IPV4;
968   } else {
969     family = g_inet_address_get_family (addr);
970     g_object_unref (addr);
971   }
972
973   if (family == G_SOCKET_FAMILY_IPV6)
974     addrp = &priv->addr_v6;
975   else
976     addrp = &priv->addr_v4;
977
978   g_mutex_lock (&priv->lock);
979   if (*addrp == NULL) {
980     GstRTSPAddressPoolResult res;
981
982     if (priv->pool == NULL)
983       goto no_pool;
984
985     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
986         port, n_ports, ttl, addrp);
987     if (res != GST_RTSP_ADDRESS_POOL_OK)
988       goto no_address;
989   } else {
990     if (strcmp ((*addrp)->address, address) ||
991         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
992         (*addrp)->ttl != ttl)
993       goto different_address;
994   }
995   result = gst_rtsp_address_copy (*addrp);
996   g_mutex_unlock (&priv->lock);
997
998   return result;
999
1000   /* ERRORS */
1001 no_pool:
1002   {
1003     GST_ERROR_OBJECT (stream, "no address pool specified");
1004     g_mutex_unlock (&priv->lock);
1005     return NULL;
1006   }
1007 no_address:
1008   {
1009     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1010         address);
1011     g_mutex_unlock (&priv->lock);
1012     return NULL;
1013   }
1014 different_address:
1015   {
1016     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1017         " reserved", address);
1018     g_mutex_unlock (&priv->lock);
1019     return NULL;
1020   }
1021 }
1022
1023 static gboolean
1024 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1025     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1026     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1027     GstRTSPAddress ** server_addr_out)
1028 {
1029   GstRTSPStreamPrivate *priv = stream->priv;
1030   GstStateChangeReturn ret;
1031   GstElement *udpsrc0, *udpsrc1;
1032   GstElement *udpsink0, *udpsink1;
1033   GSocket *rtp_socket = NULL;
1034   GSocket *rtcp_socket;
1035   gint tmp_rtp, tmp_rtcp;
1036   guint count;
1037   gint rtpport, rtcpport;
1038   GList *rejected_addresses = NULL;
1039   GstRTSPAddress *addr = NULL;
1040   GInetAddress *inetaddr = NULL;
1041   GSocketAddress *rtp_sockaddr = NULL;
1042   GSocketAddress *rtcp_sockaddr = NULL;
1043   const gchar *multisink_socket;
1044
1045   if (family == G_SOCKET_FAMILY_IPV6)
1046     multisink_socket = "socket-v6";
1047   else
1048     multisink_socket = "socket";
1049
1050   udpsrc0 = NULL;
1051   udpsrc1 = NULL;
1052   udpsink0 = NULL;
1053   udpsink1 = NULL;
1054   count = 0;
1055
1056   /* Start with random port */
1057   tmp_rtp = 0;
1058
1059   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1060       G_SOCKET_PROTOCOL_UDP, NULL);
1061   if (!rtcp_socket)
1062     goto no_udp_protocol;
1063
1064   if (*server_addr_out)
1065     gst_rtsp_address_free (*server_addr_out);
1066
1067   /* try to allocate 2 UDP ports, the RTP port should be an even
1068    * number and the RTCP port should be the next (uneven) port */
1069 again:
1070
1071   if (rtp_socket == NULL) {
1072     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1073         G_SOCKET_PROTOCOL_UDP, NULL);
1074     if (!rtp_socket)
1075       goto no_udp_protocol;
1076   }
1077
1078   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1079     GstRTSPAddressFlags flags;
1080
1081     if (addr)
1082       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1083
1084     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1085     if (family == G_SOCKET_FAMILY_IPV6)
1086       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1087     else
1088       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1089
1090     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1091
1092     if (addr == NULL)
1093       goto no_ports;
1094
1095     tmp_rtp = addr->port;
1096
1097     g_clear_object (&inetaddr);
1098     inetaddr = g_inet_address_new_from_string (addr->address);
1099   } else {
1100     if (tmp_rtp != 0) {
1101       tmp_rtp += 2;
1102       if (++count > 20)
1103         goto no_ports;
1104     }
1105
1106     if (inetaddr == NULL)
1107       inetaddr = g_inet_address_new_any (family);
1108   }
1109
1110   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1111   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1112     g_object_unref (rtp_sockaddr);
1113     goto again;
1114   }
1115   g_object_unref (rtp_sockaddr);
1116
1117   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1118   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1119     g_clear_object (&rtp_sockaddr);
1120     goto socket_error;
1121   }
1122
1123   tmp_rtp =
1124       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1125   g_object_unref (rtp_sockaddr);
1126
1127   /* check if port is even */
1128   if ((tmp_rtp & 1) != 0) {
1129     /* port not even, close and allocate another */
1130     tmp_rtp++;
1131     g_clear_object (&rtp_socket);
1132     goto again;
1133   }
1134
1135   /* set port */
1136   tmp_rtcp = tmp_rtp + 1;
1137
1138   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1139   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1140     g_object_unref (rtcp_sockaddr);
1141     g_clear_object (&rtp_socket);
1142     goto again;
1143   }
1144   g_object_unref (rtcp_sockaddr);
1145
1146   g_clear_object (&inetaddr);
1147
1148   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1149   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1150
1151   if (udpsrc0 == NULL || udpsrc1 == NULL)
1152     goto no_udp_protocol;
1153
1154   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1155   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1156
1157   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1158   if (ret == GST_STATE_CHANGE_FAILURE)
1159     goto element_error;
1160   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1161   if (ret == GST_STATE_CHANGE_FAILURE)
1162     goto element_error;
1163
1164   /* all fine, do port check */
1165   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1166   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1167
1168   /* this should not happen... */
1169   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1170     goto port_error;
1171
1172   if (udpsink_out[0])
1173     udpsink0 = udpsink_out[0];
1174   else
1175     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1176
1177   if (!udpsink0)
1178     goto no_udp_protocol;
1179
1180   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1181   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1182
1183   if (udpsink_out[1])
1184     udpsink1 = udpsink_out[1];
1185   else
1186     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1187
1188   if (!udpsink1)
1189     goto no_udp_protocol;
1190
1191   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1192   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1193   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1194
1195   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1196   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1197   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1198   /* Needs to be async for RECORD streams, otherwise we will never go to
1199    * PLAYING because the sinks will wait for data while the udpsrc can't
1200    * provide data with timestamps in PAUSED. */
1201   if (priv->sinkpad)
1202     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1203   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1204   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1205   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1206   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1207   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1208
1209   /* we keep these elements, we will further configure them when the
1210    * client told us to really use the UDP ports. */
1211   udpsrc_out[0] = udpsrc0;
1212   udpsrc_out[1] = udpsrc1;
1213   udpsink_out[0] = udpsink0;
1214   udpsink_out[1] = udpsink1;
1215
1216   server_port_out->min = rtpport;
1217   server_port_out->max = rtcpport;
1218
1219   *server_addr_out = addr;
1220   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1221
1222   g_object_unref (rtp_socket);
1223   g_object_unref (rtcp_socket);
1224
1225   return TRUE;
1226
1227   /* ERRORS */
1228 no_udp_protocol:
1229   {
1230     goto cleanup;
1231   }
1232 no_ports:
1233   {
1234     goto cleanup;
1235   }
1236 port_error:
1237   {
1238     goto cleanup;
1239   }
1240 socket_error:
1241   {
1242     goto cleanup;
1243   }
1244 element_error:
1245   {
1246     goto cleanup;
1247   }
1248 cleanup:
1249   {
1250     if (udpsrc0) {
1251       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1252       gst_object_unref (udpsrc0);
1253     }
1254     if (udpsrc1) {
1255       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1256       gst_object_unref (udpsrc1);
1257     }
1258     if (udpsink0) {
1259       gst_element_set_state (udpsink0, GST_STATE_NULL);
1260       gst_object_unref (udpsink0);
1261     }
1262     if (inetaddr)
1263       g_object_unref (inetaddr);
1264     g_list_free_full (rejected_addresses,
1265         (GDestroyNotify) gst_rtsp_address_free);
1266     if (addr)
1267       gst_rtsp_address_free (addr);
1268     if (rtp_socket)
1269       g_object_unref (rtp_socket);
1270     if (rtcp_socket)
1271       g_object_unref (rtcp_socket);
1272     return FALSE;
1273   }
1274 }
1275
1276 /* must be called with lock */
1277 static gboolean
1278 alloc_ports (GstRTSPStream * stream)
1279 {
1280   GstRTSPStreamPrivate *priv = stream->priv;
1281
1282   priv->have_ipv4 =
1283       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1284       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1285       &priv->server_port_v4, &priv->server_addr_v4);
1286
1287   priv->have_ipv6 =
1288       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1289       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1290       &priv->server_port_v6, &priv->server_addr_v6);
1291
1292   return priv->have_ipv4 || priv->have_ipv6;
1293 }
1294
1295 /**
1296  * gst_rtsp_stream_set_client_side:
1297  * @stream: a #GstRTSPStream
1298  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1299  * an RTSP connection.
1300  *
1301  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1302  * streams to an RTSP server via RECORD. This has the practical effect
1303  * of changing which UDP port numbers are used when setting up the local
1304  * side of the stream sending to be either the 'server' or 'client' pair
1305  * of a configured UDP transport.
1306  */
1307 void
1308 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1309 {
1310   GstRTSPStreamPrivate *priv;
1311
1312   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1313   priv = stream->priv;
1314   g_mutex_lock (&priv->lock);
1315   priv->client_side = client_side;
1316   g_mutex_unlock (&priv->lock);
1317 }
1318
1319 /**
1320  * gst_rtsp_stream_set_client_side:
1321  * @stream: a #GstRTSPStream
1322  *
1323  * See gst_rtsp_stream_set_client_side()
1324  *
1325  * Returns: TRUE if this #GstRTSPStream is client-side.
1326  */
1327 gboolean
1328 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1329 {
1330   GstRTSPStreamPrivate *priv;
1331   gboolean ret;
1332
1333   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1334
1335   priv = stream->priv;
1336   g_mutex_lock (&priv->lock);
1337   ret = priv->client_side;
1338   g_mutex_unlock (&priv->lock);
1339
1340   return ret;
1341 }
1342
1343 /**
1344  * gst_rtsp_stream_get_server_port:
1345  * @stream: a #GstRTSPStream
1346  * @server_port: (out): result server port
1347  * @family: the port family to get
1348  *
1349  * Fill @server_port with the port pair used by the server. This function can
1350  * only be called when @stream has been joined.
1351  */
1352 void
1353 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1354     GstRTSPRange * server_port, GSocketFamily family)
1355 {
1356   GstRTSPStreamPrivate *priv;
1357
1358   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1359   priv = stream->priv;
1360   g_return_if_fail (priv->is_joined);
1361
1362   g_mutex_lock (&priv->lock);
1363   if (family == G_SOCKET_FAMILY_IPV4) {
1364     if (server_port)
1365       *server_port = priv->server_port_v4;
1366   } else {
1367     if (server_port)
1368       *server_port = priv->server_port_v6;
1369   }
1370   g_mutex_unlock (&priv->lock);
1371 }
1372
1373 /**
1374  * gst_rtsp_stream_get_rtpsession:
1375  * @stream: a #GstRTSPStream
1376  *
1377  * Get the RTP session of this stream.
1378  *
1379  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1380  */
1381 GObject *
1382 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1383 {
1384   GstRTSPStreamPrivate *priv;
1385   GObject *session;
1386
1387   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1388
1389   priv = stream->priv;
1390
1391   g_mutex_lock (&priv->lock);
1392   if ((session = priv->session))
1393     g_object_ref (session);
1394   g_mutex_unlock (&priv->lock);
1395
1396   return session;
1397 }
1398
1399 /**
1400  * gst_rtsp_stream_get_ssrc:
1401  * @stream: a #GstRTSPStream
1402  * @ssrc: (out): result ssrc
1403  *
1404  * Get the SSRC used by the RTP session of this stream. This function can only
1405  * be called when @stream has been joined.
1406  */
1407 void
1408 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1409 {
1410   GstRTSPStreamPrivate *priv;
1411
1412   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1413   priv = stream->priv;
1414   g_return_if_fail (priv->is_joined);
1415
1416   g_mutex_lock (&priv->lock);
1417   if (ssrc && priv->session)
1418     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1419   g_mutex_unlock (&priv->lock);
1420 }
1421
1422 /**
1423  * gst_rtsp_stream_set_retransmission_time:
1424  * @stream: a #GstRTSPStream
1425  * @time: a #GstClockTime
1426  *
1427  * Set the amount of time to store retransmission packets.
1428  */
1429 void
1430 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1431     GstClockTime time)
1432 {
1433   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1434
1435   g_mutex_lock (&stream->priv->lock);
1436   stream->priv->rtx_time = time;
1437   if (stream->priv->rtxsend)
1438     g_object_set (stream->priv->rtxsend, "max-size-time",
1439         GST_TIME_AS_MSECONDS (time), NULL);
1440   g_mutex_unlock (&stream->priv->lock);
1441 }
1442
1443 /**
1444  * gst_rtsp_stream_get_retransmission_time:
1445  * @stream: a #GstRTSPStream
1446  *
1447  * Get the amount of time to store retransmission data.
1448  *
1449  * Returns: the amount of time to store retransmission data.
1450  */
1451 GstClockTime
1452 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1453 {
1454   GstClockTime ret;
1455
1456   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1457
1458   g_mutex_lock (&stream->priv->lock);
1459   ret = stream->priv->rtx_time;
1460   g_mutex_unlock (&stream->priv->lock);
1461
1462   return ret;
1463 }
1464
1465 /**
1466  * gst_rtsp_stream_set_retransmission_pt:
1467  * @stream: a #GstRTSPStream
1468  * @rtx_pt: a #guint
1469  *
1470  * Set the payload type (pt) for retransmission of this stream.
1471  */
1472 void
1473 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1474 {
1475   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1476
1477   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1478
1479   g_mutex_lock (&stream->priv->lock);
1480   stream->priv->rtx_pt = rtx_pt;
1481   if (stream->priv->rtxsend) {
1482     guint pt = gst_rtsp_stream_get_pt (stream);
1483     gchar *pt_s = g_strdup_printf ("%d", pt);
1484     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1485         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1486     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1487     g_free (pt_s);
1488     gst_structure_free (rtx_pt_map);
1489   }
1490   g_mutex_unlock (&stream->priv->lock);
1491 }
1492
1493 /**
1494  * gst_rtsp_stream_get_retransmission_pt:
1495  * @stream: a #GstRTSPStream
1496  *
1497  * Get the payload-type used for retransmission of this stream
1498  *
1499  * Returns: The retransmission PT.
1500  */
1501 guint
1502 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1503 {
1504   guint rtx_pt;
1505
1506   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1507
1508   g_mutex_lock (&stream->priv->lock);
1509   rtx_pt = stream->priv->rtx_pt;
1510   g_mutex_unlock (&stream->priv->lock);
1511
1512   return rtx_pt;
1513 }
1514
1515 /**
1516  * gst_rtsp_stream_set_buffer_size:
1517  * @stream: a #GstRTSPStream
1518  * @size: the buffer size
1519  *
1520  * Set the size of the UDP transmission buffer (in bytes)
1521  * Needs to be set before the stream is joined to a bin.
1522  *
1523  * Since: 1.6
1524  */
1525 void
1526 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1527 {
1528   g_mutex_lock (&stream->priv->lock);
1529   stream->priv->buffer_size = size;
1530   g_mutex_unlock (&stream->priv->lock);
1531 }
1532
1533 /**
1534  * gst_rtsp_stream_get_buffer_size:
1535  * @stream: a #GstRTSPStream
1536  *
1537  * Get the size of the UDP transmission buffer (in bytes)
1538  *
1539  * Returns: the size of the UDP TX buffer
1540  *
1541  * Since: 1.6
1542  */
1543 guint
1544 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1545 {
1546   guint buffer_size;
1547
1548   g_mutex_lock (&stream->priv->lock);
1549   buffer_size = stream->priv->buffer_size;
1550   g_mutex_unlock (&stream->priv->lock);
1551
1552   return buffer_size;
1553 }
1554
1555 /* executed from streaming thread */
1556 static void
1557 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1558 {
1559   GstRTSPStreamPrivate *priv = stream->priv;
1560   GstCaps *newcaps, *oldcaps;
1561
1562   newcaps = gst_pad_get_current_caps (pad);
1563
1564   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1565       newcaps);
1566
1567   g_mutex_lock (&priv->lock);
1568   oldcaps = priv->caps;
1569   priv->caps = newcaps;
1570   g_mutex_unlock (&priv->lock);
1571
1572   if (oldcaps)
1573     gst_caps_unref (oldcaps);
1574 }
1575
1576 static void
1577 dump_structure (const GstStructure * s)
1578 {
1579   gchar *sstr;
1580
1581   sstr = gst_structure_to_string (s);
1582   GST_INFO ("structure: %s", sstr);
1583   g_free (sstr);
1584 }
1585
1586 static GstRTSPStreamTransport *
1587 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1588 {
1589   GstRTSPStreamPrivate *priv = stream->priv;
1590   GList *walk;
1591   GstRTSPStreamTransport *result = NULL;
1592   const gchar *tmp;
1593   gchar *dest;
1594   guint port;
1595
1596   if (rtcp_from == NULL)
1597     return NULL;
1598
1599   tmp = g_strrstr (rtcp_from, ":");
1600   if (tmp == NULL)
1601     return NULL;
1602
1603   port = atoi (tmp + 1);
1604   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1605
1606   g_mutex_lock (&priv->lock);
1607   GST_INFO ("finding %s:%d in %d transports", dest, port,
1608       g_list_length (priv->transports));
1609
1610   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1611     GstRTSPStreamTransport *trans = walk->data;
1612     const GstRTSPTransport *tr;
1613     gint min, max;
1614
1615     tr = gst_rtsp_stream_transport_get_transport (trans);
1616
1617     if (priv->client_side) {
1618       /* In client side mode the 'destination' is the RTSP server, so send
1619        * to those ports */
1620       min = tr->server_port.min;
1621       max = tr->server_port.max;
1622     } else {
1623       min = tr->client_port.min;
1624       max = tr->client_port.max;
1625     }
1626
1627     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1628       result = trans;
1629       break;
1630     }
1631   }
1632   if (result)
1633     g_object_ref (result);
1634   g_mutex_unlock (&priv->lock);
1635
1636   g_free (dest);
1637
1638   return result;
1639 }
1640
1641 static GstRTSPStreamTransport *
1642 check_transport (GObject * source, GstRTSPStream * stream)
1643 {
1644   GstStructure *stats;
1645   GstRTSPStreamTransport *trans;
1646
1647   /* see if we have a stream to match with the origin of the RTCP packet */
1648   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1649   if (trans == NULL) {
1650     g_object_get (source, "stats", &stats, NULL);
1651     if (stats) {
1652       const gchar *rtcp_from;
1653
1654       dump_structure (stats);
1655
1656       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1657       if ((trans = find_transport (stream, rtcp_from))) {
1658         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1659             source);
1660         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1661             g_object_unref);
1662       }
1663       gst_structure_free (stats);
1664     }
1665   }
1666   return trans;
1667 }
1668
1669
1670 static void
1671 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1672 {
1673   GstRTSPStreamTransport *trans;
1674
1675   GST_INFO ("%p: new source %p", stream, source);
1676
1677   trans = check_transport (source, stream);
1678
1679   if (trans)
1680     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1681 }
1682
1683 static void
1684 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1685 {
1686   GST_INFO ("%p: new SDES %p", stream, source);
1687 }
1688
1689 static void
1690 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1691 {
1692   GstRTSPStreamTransport *trans;
1693
1694   trans = check_transport (source, stream);
1695
1696   if (trans) {
1697     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1698     gst_rtsp_stream_transport_keep_alive (trans);
1699   }
1700 #ifdef DUMP_STATS
1701   {
1702     GstStructure *stats;
1703     g_object_get (source, "stats", &stats, NULL);
1704     if (stats) {
1705       dump_structure (stats);
1706       gst_structure_free (stats);
1707     }
1708   }
1709 #endif
1710 }
1711
1712 static void
1713 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1714 {
1715   GST_INFO ("%p: source %p bye", stream, source);
1716 }
1717
1718 static void
1719 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1720 {
1721   GstRTSPStreamTransport *trans;
1722
1723   GST_INFO ("%p: source %p bye timeout", stream, source);
1724
1725   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1726     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1727     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1728   }
1729 }
1730
1731 static void
1732 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1733 {
1734   GstRTSPStreamTransport *trans;
1735
1736   GST_INFO ("%p: source %p timeout", stream, source);
1737
1738   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1739     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1740     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1741   }
1742 }
1743
1744 static void
1745 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1746 {
1747   GST_INFO ("%p: new sender source %p", stream, source);
1748 #ifndef DUMP_STATS
1749   {
1750     GstStructure *stats;
1751     g_object_get (source, "stats", &stats, NULL);
1752     if (stats) {
1753       dump_structure (stats);
1754       gst_structure_free (stats);
1755     }
1756   }
1757 #endif
1758 }
1759
1760 static void
1761 on_sender_ssrc_active (GObject * session, GObject * source,
1762     GstRTSPStream * stream)
1763 {
1764 #ifndef DUMP_STATS
1765   {
1766     GstStructure *stats;
1767     g_object_get (source, "stats", &stats, NULL);
1768     if (stats) {
1769       dump_structure (stats);
1770       gst_structure_free (stats);
1771     }
1772   }
1773 #endif
1774 }
1775
1776 static void
1777 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1778 {
1779   if (is_rtp) {
1780     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1781     g_list_free (priv->tr_cache_rtp);
1782     priv->tr_cache_rtp = NULL;
1783   } else {
1784     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1785     g_list_free (priv->tr_cache_rtcp);
1786     priv->tr_cache_rtcp = NULL;
1787   }
1788 }
1789
1790 static GstFlowReturn
1791 handle_new_sample (GstAppSink * sink, gpointer user_data)
1792 {
1793   GstRTSPStreamPrivate *priv;
1794   GList *walk;
1795   GstSample *sample;
1796   GstBuffer *buffer;
1797   GstRTSPStream *stream;
1798   gboolean is_rtp;
1799
1800   sample = gst_app_sink_pull_sample (sink);
1801   if (!sample)
1802     return GST_FLOW_OK;
1803
1804   stream = (GstRTSPStream *) user_data;
1805   priv = stream->priv;
1806   buffer = gst_sample_get_buffer (sample);
1807
1808   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1809
1810   g_mutex_lock (&priv->lock);
1811   if (is_rtp) {
1812     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1813       clear_tr_cache (priv, is_rtp);
1814       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1815         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1816         priv->tr_cache_rtp =
1817             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1818       }
1819       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1820     }
1821   } else {
1822     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1823       clear_tr_cache (priv, is_rtp);
1824       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1825         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1826         priv->tr_cache_rtcp =
1827             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1828       }
1829       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1830     }
1831   }
1832   g_mutex_unlock (&priv->lock);
1833
1834   if (is_rtp) {
1835     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1836       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1837       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1838     }
1839   } else {
1840     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1841       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1842       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1843     }
1844   }
1845   gst_sample_unref (sample);
1846
1847   return GST_FLOW_OK;
1848 }
1849
1850 static GstAppSinkCallbacks sink_cb = {
1851   NULL,                         /* not interested in EOS */
1852   NULL,                         /* not interested in preroll samples */
1853   handle_new_sample,
1854 };
1855
1856 static GstElement *
1857 get_rtp_encoder (GstRTSPStream * stream, guint session)
1858 {
1859   GstRTSPStreamPrivate *priv = stream->priv;
1860
1861   if (priv->srtpenc == NULL) {
1862     gchar *name;
1863
1864     name = g_strdup_printf ("srtpenc_%u", session);
1865     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1866     g_free (name);
1867
1868     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1869   }
1870   return gst_object_ref (priv->srtpenc);
1871 }
1872
1873 static GstElement *
1874 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1875 {
1876   GstRTSPStreamPrivate *priv = stream->priv;
1877   GstElement *oldenc, *enc;
1878   GstPad *pad;
1879   gchar *name;
1880
1881   if (priv->idx != session)
1882     return NULL;
1883
1884   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1885
1886   oldenc = priv->srtpenc;
1887   enc = get_rtp_encoder (stream, session);
1888   name = g_strdup_printf ("rtp_sink_%d", session);
1889   pad = gst_element_get_request_pad (enc, name);
1890   g_free (name);
1891   gst_object_unref (pad);
1892
1893   if (oldenc == NULL)
1894     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1895         enc);
1896
1897   return enc;
1898 }
1899
1900 static GstElement *
1901 request_rtcp_encoder (GstElement * rtpbin, guint session,
1902     GstRTSPStream * stream)
1903 {
1904   GstRTSPStreamPrivate *priv = stream->priv;
1905   GstElement *oldenc, *enc;
1906   GstPad *pad;
1907   gchar *name;
1908
1909   if (priv->idx != session)
1910     return NULL;
1911
1912   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1913
1914   oldenc = priv->srtpenc;
1915   enc = get_rtp_encoder (stream, session);
1916   name = g_strdup_printf ("rtcp_sink_%d", session);
1917   pad = gst_element_get_request_pad (enc, name);
1918   g_free (name);
1919   gst_object_unref (pad);
1920
1921   if (oldenc == NULL)
1922     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1923         enc);
1924
1925   return enc;
1926 }
1927
1928 static GstCaps *
1929 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1930 {
1931   GstRTSPStreamPrivate *priv = stream->priv;
1932   GstCaps *caps;
1933
1934   GST_DEBUG ("request key %08x", ssrc);
1935
1936   g_mutex_lock (&priv->lock);
1937   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1938     gst_caps_ref (caps);
1939   g_mutex_unlock (&priv->lock);
1940
1941   return caps;
1942 }
1943
1944 static GstElement *
1945 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1946     GstRTSPStream * stream)
1947 {
1948   GstRTSPStreamPrivate *priv = stream->priv;
1949
1950   if (priv->idx != session)
1951     return NULL;
1952
1953   if (priv->srtpdec == NULL) {
1954     gchar *name;
1955
1956     name = g_strdup_printf ("srtpdec_%u", session);
1957     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1958     g_free (name);
1959
1960     g_signal_connect (priv->srtpdec, "request-key",
1961         (GCallback) request_key, stream);
1962   }
1963   return gst_object_ref (priv->srtpdec);
1964 }
1965
1966 /**
1967  * gst_rtsp_stream_request_aux_sender:
1968  * @stream: a #GstRTSPStream
1969  * @sessid: the session id
1970  *
1971  * Creating a rtxsend bin
1972  *
1973  * Returns: (transfer full): a #GstElement.
1974  *
1975  * Since: 1.6
1976  */
1977 GstElement *
1978 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
1979 {
1980   GstElement *bin;
1981   GstPad *pad;
1982   GstStructure *pt_map;
1983   gchar *name;
1984   guint pt, rtx_pt;
1985   gchar *pt_s;
1986
1987   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1988
1989   pt = gst_rtsp_stream_get_pt (stream);
1990   pt_s = g_strdup_printf ("%u", pt);
1991   rtx_pt = stream->priv->rtx_pt;
1992
1993   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1994
1995   bin = gst_bin_new (NULL);
1996   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1997   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1998       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1999   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2000       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2001   g_free (pt_s);
2002   gst_structure_free (pt_map);
2003   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2004
2005   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2006   name = g_strdup_printf ("src_%u", sessid);
2007   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2008   g_free (name);
2009   gst_object_unref (pad);
2010
2011   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2012   name = g_strdup_printf ("sink_%u", sessid);
2013   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2014   g_free (name);
2015   gst_object_unref (pad);
2016
2017   return bin;
2018 }
2019
2020 /**
2021  * gst_rtsp_stream_set_pt_map:
2022  * @stream: a #GstRTSPStream
2023  * @pt: the pt
2024  * @caps: a #GstCaps
2025  *
2026  * Configure a pt map between @pt and @caps.
2027  */
2028 void
2029 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2030 {
2031   GstRTSPStreamPrivate *priv = stream->priv;
2032
2033   g_mutex_lock (&priv->lock);
2034   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2035   g_mutex_unlock (&priv->lock);
2036 }
2037
2038 static GstCaps *
2039 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2040     GstRTSPStream * stream)
2041 {
2042   GstRTSPStreamPrivate *priv = stream->priv;
2043   GstCaps *caps = NULL;
2044
2045   g_mutex_lock (&priv->lock);
2046
2047   if (priv->idx == session) {
2048     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2049     if (caps) {
2050       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2051       gst_caps_ref (caps);
2052     } else {
2053       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2054     }
2055   }
2056
2057   g_mutex_unlock (&priv->lock);
2058
2059   return caps;
2060 }
2061
2062 static void
2063 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2064 {
2065   GstRTSPStreamPrivate *priv = stream->priv;
2066   gchar *name;
2067   GstPadLinkReturn ret;
2068   guint sessid;
2069
2070   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2071       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2072
2073   name = gst_pad_get_name (pad);
2074   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2075     g_free (name);
2076     return;
2077   }
2078   g_free (name);
2079
2080   if (priv->idx != sessid)
2081     return;
2082
2083   if (gst_pad_is_linked (priv->sinkpad)) {
2084     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2085         GST_DEBUG_PAD_NAME (priv->sinkpad));
2086     return;
2087   }
2088
2089   /* link the RTP pad to the session manager, it should not really fail unless
2090    * this is not really an RTP pad */
2091   ret = gst_pad_link (pad, priv->sinkpad);
2092   if (ret != GST_PAD_LINK_OK)
2093     goto link_failed;
2094   priv->recv_rtp_src = gst_object_ref (pad);
2095
2096   return;
2097
2098 /* ERRORS */
2099 link_failed:
2100   {
2101     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2102         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2103   }
2104 }
2105
2106 static void
2107 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2108     GstRTSPStream * stream)
2109 {
2110   /* TODO: What to do here other than this? */
2111   GST_DEBUG ("Stream %p: Got EOS", stream);
2112   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2113 }
2114
2115 /**
2116  * gst_rtsp_stream_join_bin:
2117  * @stream: a #GstRTSPStream
2118  * @bin: (transfer none): a #GstBin to join
2119  * @rtpbin: (transfer none): a rtpbin element in @bin
2120  * @state: the target state of the new elements
2121  *
2122  * Join the #GstBin @bin that contains the element @rtpbin.
2123  *
2124  * @stream will link to @rtpbin, which must be inside @bin. The elements
2125  * added to @bin will be set to the state given in @state.
2126  *
2127  * Returns: %TRUE on success.
2128  */
2129 gboolean
2130 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2131     GstElement * rtpbin, GstState state)
2132 {
2133   GstRTSPStreamPrivate *priv;
2134   gint i;
2135   guint idx;
2136   gchar *name;
2137   GstPad *pad, *sinkpad = NULL, *selpad;
2138   GstPadLinkReturn ret;
2139   gboolean is_tcp = FALSE, is_udp = FALSE;
2140
2141   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2142   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2143   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2144
2145   priv = stream->priv;
2146
2147   g_mutex_lock (&priv->lock);
2148   if (priv->is_joined)
2149     goto was_joined;
2150
2151   /* create a session with the same index as the stream */
2152   idx = priv->idx;
2153
2154   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2155
2156   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2157
2158   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2159       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2160
2161   if (is_udp && !alloc_ports (stream))
2162     goto no_ports;
2163
2164   /* update the dscp qos field in the sinks */
2165   update_dscp_qos (stream);
2166
2167   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2168       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2169     /* For SRTP */
2170     g_signal_connect (rtpbin, "request-rtp-encoder",
2171         (GCallback) request_rtp_encoder, stream);
2172     g_signal_connect (rtpbin, "request-rtcp-encoder",
2173         (GCallback) request_rtcp_encoder, stream);
2174     g_signal_connect (rtpbin, "request-rtp-decoder",
2175         (GCallback) request_rtp_rtcp_decoder, stream);
2176     g_signal_connect (rtpbin, "request-rtcp-decoder",
2177         (GCallback) request_rtp_rtcp_decoder, stream);
2178   }
2179
2180   if (priv->sinkpad) {
2181     g_signal_connect (rtpbin, "request-pt-map",
2182         (GCallback) request_pt_map, stream);
2183   }
2184
2185   /* get pads from the RTP session element for sending and receiving
2186    * RTP/RTCP*/
2187   if (priv->srcpad) {
2188     /* get a pad for sending RTP */
2189     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2190     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2191     g_free (name);
2192
2193     /* link the RTP pad to the session manager, it should not really fail unless
2194      * this is not really an RTP pad */
2195     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2196     if (ret != GST_PAD_LINK_OK)
2197       goto link_failed;
2198
2199     name = g_strdup_printf ("send_rtp_src_%u", idx);
2200     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2201     g_free (name);
2202   } else {
2203     /* Need to connect our sinkpad from here */
2204     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2205     /* EOS */
2206     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2207
2208     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2209     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2210     g_free (name);
2211   }
2212
2213   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2214   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2215   g_free (name);
2216   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2217   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2218   g_free (name);
2219
2220   /* get the session */
2221   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2222
2223   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2224       stream);
2225   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2226       stream);
2227   g_signal_connect (priv->session, "on-ssrc-active",
2228       (GCallback) on_ssrc_active, stream);
2229   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2230       stream);
2231   g_signal_connect (priv->session, "on-bye-timeout",
2232       (GCallback) on_bye_timeout, stream);
2233   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2234       stream);
2235
2236   /* signal for sender ssrc */
2237   g_signal_connect (priv->session, "on-new-sender-ssrc",
2238       (GCallback) on_new_sender_ssrc, stream);
2239   g_signal_connect (priv->session, "on-sender-ssrc-active",
2240       (GCallback) on_sender_ssrc_active, stream);
2241
2242   for (i = 0; i < 2; i++) {
2243     GstPad *teepad, *queuepad;
2244     /* For the sender we create this bit of pipeline for both
2245      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2246      * we need to add a queue before appsink and udpsink to make
2247      * the pipeline not block. For the TCP case, we want to pump
2248      * client as fast as possible anyway. This pipeline is used
2249      * when both TCP and UDP are present.
2250      *
2251      * .--------.      .-----.    .---------.    .---------.
2252      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2253      * |       send->sink   src->sink      src->sink       |
2254      * '--------'      |     |    '---------'    '---------'
2255      *                 |     |    .---------.    .---------.
2256      *                 |     |    |  queue  |    | appsink |
2257      *                 |    src->sink      src->sink       |
2258      *                 '-----'    '---------'    '---------'
2259      *
2260      * When only UDP or only TCP is allowed, we skip the tee and queue
2261      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2262      * the session.
2263      */
2264
2265     /* Only link the RTP send src if we're going to send RTP, link
2266      * the RTCP send src always */
2267     if (priv->srcpad || i == 1) {
2268       if (is_udp) {
2269         /* add udpsink */
2270         gst_bin_add (bin, priv->udpsink[i]);
2271         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2272       }
2273
2274       if (is_tcp) {
2275         /* make appsink */
2276         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2277         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2278         gst_bin_add (bin, priv->appsink[i]);
2279         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2280             &sink_cb, stream, NULL);
2281       }
2282
2283       if (is_udp && is_tcp) {
2284         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2285
2286         /* make tee for RTP/RTCP */
2287         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2288         gst_bin_add (bin, priv->tee[i]);
2289
2290         /* and link to rtpbin send pad */
2291         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2292         gst_pad_link (priv->send_src[i], pad);
2293         gst_object_unref (pad);
2294
2295         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2296         g_object_set (priv->udpqueue[i], "max-size-buffers",
2297             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2298             NULL);
2299         gst_bin_add (bin, priv->udpqueue[i]);
2300         /* link tee to udpqueue */
2301         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2302         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2303         gst_pad_link (teepad, pad);
2304         gst_object_unref (pad);
2305         gst_object_unref (teepad);
2306
2307         /* link udpqueue to udpsink */
2308         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2309         gst_pad_link (queuepad, sinkpad);
2310         gst_object_unref (queuepad);
2311         gst_object_unref (sinkpad);
2312
2313         /* make appqueue */
2314         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2315         g_object_set (priv->appqueue[i], "max-size-buffers",
2316             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2317             NULL);
2318         gst_bin_add (bin, priv->appqueue[i]);
2319         /* and link tee to appqueue */
2320         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2321         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2322         gst_pad_link (teepad, pad);
2323         gst_object_unref (pad);
2324         gst_object_unref (teepad);
2325
2326         /* and link appqueue to appsink */
2327         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2328         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2329         gst_pad_link (queuepad, pad);
2330         gst_object_unref (pad);
2331         gst_object_unref (queuepad);
2332       } else if (is_tcp) {
2333         /* only appsink needed, link it to the session */
2334         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2335         gst_pad_link (priv->send_src[i], pad);
2336         gst_object_unref (pad);
2337
2338         /* when its only TCP, we need to set sync and preroll to FALSE
2339          * for the sink to avoid deadlock. And this is only needed for
2340          * sink used for RTCP data, not the RTP data. */
2341         if (i == 1)
2342           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2343       } else {
2344         /* else only udpsink needed, link it to the session */
2345         gst_pad_link (priv->send_src[i], sinkpad);
2346         gst_object_unref (sinkpad);
2347       }
2348     }
2349
2350     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2351      * RTCP sink always */
2352     if (priv->sinkpad || i == 1) {
2353       /* For the receiver we create this bit of pipeline for both
2354        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2355        * and it is all funneled into the rtpbin receive pad.
2356        *
2357        * .--------.     .--------.    .--------.
2358        * | udpsrc |     | funnel |    | rtpbin |
2359        * |       src->sink      src->sink      |
2360        * '--------'     |        |    '--------'
2361        * .--------.     |        |
2362        * | appsrc |     |        |
2363        * |       src->sink       |
2364        * '--------'     '--------'
2365        */
2366       /* make funnel for the RTP/RTCP receivers */
2367       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2368       gst_bin_add (bin, priv->funnel[i]);
2369
2370       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2371       gst_pad_link (pad, priv->recv_sink[i]);
2372       gst_object_unref (pad);
2373
2374       if (priv->udpsrc_v4[i]) {
2375         if (priv->srcpad) {
2376           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2377            * values. This is only relevant for PLAY pipelines */
2378           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2379           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2380         }
2381         /* add udpsrc */
2382         gst_bin_add (bin, priv->udpsrc_v4[i]);
2383
2384         /* and link to the funnel v4 */
2385         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2386         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2387         gst_pad_link (pad, selpad);
2388         gst_object_unref (pad);
2389         gst_object_unref (selpad);
2390       }
2391
2392       if (priv->udpsrc_v6[i]) {
2393         if (priv->srcpad) {
2394           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2395           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2396         }
2397         gst_bin_add (bin, priv->udpsrc_v6[i]);
2398
2399         /* and link to the funnel v6 */
2400         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2401         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2402         gst_pad_link (pad, selpad);
2403         gst_object_unref (pad);
2404         gst_object_unref (selpad);
2405       }
2406
2407       if (is_tcp) {
2408         /* make and add appsrc */
2409         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2410         priv->appsrc_base_time[i] = -1;
2411         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2412         gst_bin_add (bin, priv->appsrc[i]);
2413         /* and link to the funnel */
2414         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2415         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2416         gst_pad_link (pad, selpad);
2417         gst_object_unref (pad);
2418         gst_object_unref (selpad);
2419       }
2420     }
2421
2422     /* check if we need to set to a special state */
2423     if (state != GST_STATE_NULL) {
2424       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2425         gst_element_set_state (priv->udpsink[i], state);
2426       if (priv->appsink[i] && (priv->srcpad || i == 1))
2427         gst_element_set_state (priv->appsink[i], state);
2428       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2429         gst_element_set_state (priv->appqueue[i], state);
2430       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2431         gst_element_set_state (priv->udpqueue[i], state);
2432       if (priv->tee[i] && (priv->srcpad || i == 1))
2433         gst_element_set_state (priv->tee[i], state);
2434       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2435         gst_element_set_state (priv->funnel[i], state);
2436       if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2437         gst_element_set_state (priv->appsrc[i], state);
2438     }
2439   }
2440
2441   if (priv->srcpad) {
2442     /* be notified of caps changes */
2443     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2444         (GCallback) caps_notify, stream);
2445   }
2446
2447   priv->is_joined = TRUE;
2448   g_mutex_unlock (&priv->lock);
2449
2450   return TRUE;
2451
2452   /* ERRORS */
2453 was_joined:
2454   {
2455     g_mutex_unlock (&priv->lock);
2456     return TRUE;
2457   }
2458 no_ports:
2459   {
2460     g_mutex_unlock (&priv->lock);
2461     GST_WARNING ("failed to allocate ports %u", idx);
2462     return FALSE;
2463   }
2464 link_failed:
2465   {
2466     GST_WARNING ("failed to link stream %u", idx);
2467     gst_object_unref (priv->send_rtp_sink);
2468     priv->send_rtp_sink = NULL;
2469     g_mutex_unlock (&priv->lock);
2470     return FALSE;
2471   }
2472 }
2473
2474 /**
2475  * gst_rtsp_stream_leave_bin:
2476  * @stream: a #GstRTSPStream
2477  * @bin: (transfer none): a #GstBin
2478  * @rtpbin: (transfer none): a rtpbin #GstElement
2479  *
2480  * Remove the elements of @stream from @bin.
2481  *
2482  * Return: %TRUE on success.
2483  */
2484 gboolean
2485 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2486     GstElement * rtpbin)
2487 {
2488   GstRTSPStreamPrivate *priv;
2489   gint i;
2490   GList *l;
2491   gboolean is_tcp, is_udp;
2492
2493   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2494   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2495   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2496
2497   priv = stream->priv;
2498
2499   g_mutex_lock (&priv->lock);
2500   if (!priv->is_joined)
2501     goto was_not_joined;
2502
2503   /* all transports must be removed by now */
2504   if (priv->transports != NULL)
2505     goto transports_not_removed;
2506
2507   clear_tr_cache (priv, TRUE);
2508   clear_tr_cache (priv, FALSE);
2509
2510   GST_INFO ("stream %p leaving bin", stream);
2511
2512   if (priv->srcpad) {
2513     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2514
2515     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2516     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2517     gst_object_unref (priv->send_rtp_sink);
2518     priv->send_rtp_sink = NULL;
2519   } else if (priv->recv_rtp_src) {
2520     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2521     gst_object_unref (priv->recv_rtp_src);
2522     priv->recv_rtp_src = NULL;
2523   }
2524
2525   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2526
2527   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2528       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2529
2530
2531   for (i = 0; i < 2; i++) {
2532     if (priv->udpsink[i])
2533       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2534     if (priv->appsink[i])
2535       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2536     if (priv->appqueue[i])
2537       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2538     if (priv->udpqueue[i])
2539       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2540     if (priv->tee[i])
2541       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2542     if (priv->funnel[i])
2543       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2544     if (priv->appsrc[i])
2545       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2546
2547     if (priv->udpsrc_v4[i]) {
2548       if (priv->sinkpad || i == 1) {
2549         /* and set udpsrc to NULL now before removing */
2550         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2551         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2552         /* removing them should also nicely release the request
2553          * pads when they finalize */
2554         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2555       } else {
2556         /* we need to set the state to NULL before unref */
2557         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2558         gst_object_unref (priv->udpsrc_v4[i]);
2559       }
2560     }
2561
2562     if (priv->udpsrc_v6[i]) {
2563       if (priv->sinkpad || i == 1) {
2564         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2565         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2566         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2567       } else {
2568         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2569         gst_object_unref (priv->udpsrc_v6[i]);
2570       }
2571     }
2572
2573     for (l = priv->transport_sources; l; l = l->next) {
2574       GstRTSPMulticastTransportSource *s = l->data;
2575
2576       if (!s->udpsrc[i])
2577         continue;
2578
2579       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2580       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2581       gst_bin_remove (bin, s->udpsrc[i]);
2582     }
2583
2584     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
2585       gst_bin_remove (bin, priv->udpsink[i]);
2586     if (priv->appsrc[i] && (priv->sinkpad || i == 1))
2587       gst_bin_remove (bin, priv->appsrc[i]);
2588     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
2589       gst_bin_remove (bin, priv->appsink[i]);
2590     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2591       gst_bin_remove (bin, priv->appqueue[i]);
2592     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2593       gst_bin_remove (bin, priv->udpqueue[i]);
2594     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
2595       gst_bin_remove (bin, priv->tee[i]);
2596     if (priv->funnel[i] && (priv->sinkpad || i == 1))
2597       gst_bin_remove (bin, priv->funnel[i]);
2598
2599     if (priv->sinkpad || i == 1) {
2600       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2601       gst_object_unref (priv->recv_sink[i]);
2602       priv->recv_sink[i] = NULL;
2603     }
2604
2605     priv->udpsrc_v4[i] = NULL;
2606     priv->udpsrc_v6[i] = NULL;
2607     priv->udpsink[i] = NULL;
2608     priv->appsrc[i] = NULL;
2609     priv->appsink[i] = NULL;
2610     priv->appqueue[i] = NULL;
2611     priv->udpqueue[i] = NULL;
2612     priv->tee[i] = NULL;
2613     priv->funnel[i] = NULL;
2614   }
2615
2616   for (l = priv->transport_sources; l; l = l->next) {
2617     GstRTSPMulticastTransportSource *s = l->data;
2618     g_slice_free (GstRTSPMulticastTransportSource, s);
2619   }
2620   g_list_free (priv->transport_sources);
2621   priv->transport_sources = NULL;
2622
2623   if (priv->srcpad) {
2624     gst_object_unref (priv->send_src[0]);
2625     priv->send_src[0] = NULL;
2626   }
2627
2628   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2629   gst_object_unref (priv->send_src[1]);
2630   priv->send_src[1] = NULL;
2631
2632   g_object_unref (priv->session);
2633   priv->session = NULL;
2634   if (priv->caps)
2635     gst_caps_unref (priv->caps);
2636   priv->caps = NULL;
2637
2638   if (priv->srtpenc)
2639     gst_object_unref (priv->srtpenc);
2640   if (priv->srtpdec)
2641     gst_object_unref (priv->srtpdec);
2642
2643   priv->is_joined = FALSE;
2644   g_mutex_unlock (&priv->lock);
2645
2646   return TRUE;
2647
2648 was_not_joined:
2649   {
2650     g_mutex_unlock (&priv->lock);
2651     return TRUE;
2652   }
2653 transports_not_removed:
2654   {
2655     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2656     g_mutex_unlock (&priv->lock);
2657     return FALSE;
2658   }
2659 }
2660
2661 /**
2662  * gst_rtsp_stream_get_rtpinfo:
2663  * @stream: a #GstRTSPStream
2664  * @rtptime: (allow-none): result RTP timestamp
2665  * @seq: (allow-none): result RTP seqnum
2666  * @clock_rate: (allow-none): the clock rate
2667  * @running_time: (allow-none): result running-time
2668  *
2669  * Retrieve the current rtptime, seq and running-time. This is used to
2670  * construct a RTPInfo reply header.
2671  *
2672  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2673  */
2674 gboolean
2675 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2676     guint * rtptime, guint * seq, guint * clock_rate,
2677     GstClockTime * running_time)
2678 {
2679   GstRTSPStreamPrivate *priv;
2680   GstStructure *stats;
2681   GObjectClass *payobjclass;
2682
2683   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2684
2685   priv = stream->priv;
2686
2687   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2688
2689   g_mutex_lock (&priv->lock);
2690
2691   /* First try to extract the information from the last buffer on the sinks.
2692    * This will have a more accurate sequence number and timestamp, as between
2693    * the payloader and the sink there can be some queues
2694    */
2695   if (priv->udpsink[0] || priv->appsink[0]) {
2696     GstSample *last_sample;
2697
2698     if (priv->udpsink[0])
2699       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
2700     else
2701       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
2702
2703     if (last_sample) {
2704       GstCaps *caps;
2705       GstBuffer *buffer;
2706       GstSegment *segment;
2707       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
2708
2709       caps = gst_sample_get_caps (last_sample);
2710       buffer = gst_sample_get_buffer (last_sample);
2711       segment = gst_sample_get_segment (last_sample);
2712
2713       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
2714         if (seq) {
2715           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
2716         }
2717
2718         if (rtptime) {
2719           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
2720         }
2721
2722         gst_rtp_buffer_unmap (&rtp_buffer);
2723
2724         if (running_time) {
2725           *running_time =
2726               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
2727               GST_BUFFER_TIMESTAMP (buffer));
2728         }
2729
2730         if (clock_rate) {
2731           GstStructure *s = gst_caps_get_structure (caps, 0);
2732
2733           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
2734
2735           if (*clock_rate == 0 && running_time)
2736             *running_time = GST_CLOCK_TIME_NONE;
2737         }
2738         gst_sample_unref (last_sample);
2739
2740         goto done;
2741       } else {
2742         gst_sample_unref (last_sample);
2743       }
2744     }
2745   }
2746
2747   if (g_object_class_find_property (payobjclass, "stats")) {
2748     g_object_get (priv->payloader, "stats", &stats, NULL);
2749     if (stats == NULL)
2750       goto no_stats;
2751
2752     if (seq)
2753       gst_structure_get_uint (stats, "seqnum", seq);
2754
2755     if (rtptime)
2756       gst_structure_get_uint (stats, "timestamp", rtptime);
2757
2758     if (running_time)
2759       gst_structure_get_clock_time (stats, "running-time", running_time);
2760
2761     if (clock_rate) {
2762       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2763       if (*clock_rate == 0 && running_time)
2764         *running_time = GST_CLOCK_TIME_NONE;
2765     }
2766     gst_structure_free (stats);
2767   } else {
2768     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2769         !g_object_class_find_property (payobjclass, "timestamp"))
2770       goto no_stats;
2771
2772     if (seq)
2773       g_object_get (priv->payloader, "seqnum", seq, NULL);
2774
2775     if (rtptime)
2776       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2777
2778     if (running_time)
2779       *running_time = GST_CLOCK_TIME_NONE;
2780   }
2781
2782 done:
2783   g_mutex_unlock (&priv->lock);
2784
2785   return TRUE;
2786
2787   /* ERRORS */
2788 no_stats:
2789   {
2790     GST_WARNING ("Could not get payloader stats");
2791     g_mutex_unlock (&priv->lock);
2792     return FALSE;
2793   }
2794 }
2795
2796 /**
2797  * gst_rtsp_stream_get_caps:
2798  * @stream: a #GstRTSPStream
2799  *
2800  * Retrieve the current caps of @stream.
2801  *
2802  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2803  * after usage.
2804  */
2805 GstCaps *
2806 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2807 {
2808   GstRTSPStreamPrivate *priv;
2809   GstCaps *result;
2810
2811   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2812
2813   priv = stream->priv;
2814
2815   g_mutex_lock (&priv->lock);
2816   if ((result = priv->caps))
2817     gst_caps_ref (result);
2818   g_mutex_unlock (&priv->lock);
2819
2820   return result;
2821 }
2822
2823 /**
2824  * gst_rtsp_stream_recv_rtp:
2825  * @stream: a #GstRTSPStream
2826  * @buffer: (transfer full): a #GstBuffer
2827  *
2828  * Handle an RTP buffer for the stream. This method is usually called when a
2829  * message has been received from a client using the TCP transport.
2830  *
2831  * This function takes ownership of @buffer.
2832  *
2833  * Returns: a GstFlowReturn.
2834  */
2835 GstFlowReturn
2836 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2837 {
2838   GstRTSPStreamPrivate *priv;
2839   GstFlowReturn ret;
2840   GstElement *element;
2841
2842   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2843   priv = stream->priv;
2844   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2845   g_return_val_if_fail (priv->is_joined, FALSE);
2846
2847   g_mutex_lock (&priv->lock);
2848   if (priv->appsrc[0])
2849     element = gst_object_ref (priv->appsrc[0]);
2850   else
2851     element = NULL;
2852   g_mutex_unlock (&priv->lock);
2853
2854   if (element) {
2855     if (priv->appsrc_base_time[0] == -1) {
2856       /* Take current running_time. This timestamp will be put on
2857        * the first buffer of each stream because we are a live source and so we
2858        * timestamp with the running_time. When we are dealing with TCP, we also
2859        * only timestamp the first buffer (using the DISCONT flag) because a server
2860        * typically bursts data, for which we don't want to compensate by speeding
2861        * up the media. The other timestamps will be interpollated from this one
2862        * using the RTP timestamps. */
2863       GST_OBJECT_LOCK (element);
2864       if (GST_ELEMENT_CLOCK (element)) {
2865         GstClockTime now;
2866         GstClockTime base_time;
2867
2868         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2869         base_time = GST_ELEMENT_CAST (element)->base_time;
2870
2871         priv->appsrc_base_time[0] = now - base_time;
2872         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2873         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2874             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2875             GST_TIME_ARGS (base_time));
2876       }
2877       GST_OBJECT_UNLOCK (element);
2878     }
2879
2880     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2881     gst_object_unref (element);
2882   } else {
2883     ret = GST_FLOW_OK;
2884   }
2885   return ret;
2886 }
2887
2888 /**
2889  * gst_rtsp_stream_recv_rtcp:
2890  * @stream: a #GstRTSPStream
2891  * @buffer: (transfer full): a #GstBuffer
2892  *
2893  * Handle an RTCP buffer for the stream. This method is usually called when a
2894  * message has been received from a client using the TCP transport.
2895  *
2896  * This function takes ownership of @buffer.
2897  *
2898  * Returns: a GstFlowReturn.
2899  */
2900 GstFlowReturn
2901 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2902 {
2903   GstRTSPStreamPrivate *priv;
2904   GstFlowReturn ret;
2905   GstElement *element;
2906
2907   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2908   priv = stream->priv;
2909   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2910
2911   if (!priv->is_joined) {
2912     gst_buffer_unref (buffer);
2913     return GST_FLOW_NOT_LINKED;
2914   }
2915   g_mutex_lock (&priv->lock);
2916   if (priv->appsrc[1])
2917     element = gst_object_ref (priv->appsrc[1]);
2918   else
2919     element = NULL;
2920   g_mutex_unlock (&priv->lock);
2921
2922   if (element) {
2923     if (priv->appsrc_base_time[1] == -1) {
2924       /* Take current running_time. This timestamp will be put on
2925        * the first buffer of each stream because we are a live source and so we
2926        * timestamp with the running_time. When we are dealing with TCP, we also
2927        * only timestamp the first buffer (using the DISCONT flag) because a server
2928        * typically bursts data, for which we don't want to compensate by speeding
2929        * up the media. The other timestamps will be interpollated from this one
2930        * using the RTP timestamps. */
2931       GST_OBJECT_LOCK (element);
2932       if (GST_ELEMENT_CLOCK (element)) {
2933         GstClockTime now;
2934         GstClockTime base_time;
2935
2936         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2937         base_time = GST_ELEMENT_CAST (element)->base_time;
2938
2939         priv->appsrc_base_time[1] = now - base_time;
2940         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2941         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2942             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2943             GST_TIME_ARGS (base_time));
2944       }
2945       GST_OBJECT_UNLOCK (element);
2946     }
2947
2948     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2949     gst_object_unref (element);
2950   } else {
2951     ret = GST_FLOW_OK;
2952     gst_buffer_unref (buffer);
2953   }
2954   return ret;
2955 }
2956
2957 /* must be called with lock */
2958 static gboolean
2959 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2960     gboolean add)
2961 {
2962   GstRTSPStreamPrivate *priv = stream->priv;
2963   const GstRTSPTransport *tr;
2964
2965   tr = gst_rtsp_stream_transport_get_transport (trans);
2966
2967   switch (tr->lower_transport) {
2968     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2969     {
2970       GstRTSPMulticastTransportSource *source;
2971       GstBin *bin;
2972
2973       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
2974
2975       if (add) {
2976         gchar *host;
2977         gint i;
2978         GstPad *selpad, *pad;
2979
2980         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2981         source->transport = trans;
2982
2983         for (i = 0; i < 2; i++) {
2984           host =
2985               g_strdup_printf ("udp://%s:%d", tr->destination,
2986               (i == 0) ? tr->port.min : tr->port.max);
2987           source->udpsrc[i] =
2988               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2989           g_free (host);
2990           g_object_set (source->udpsrc[i], "loop", FALSE, NULL);
2991
2992           if (priv->srcpad) {
2993             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2994              * values. This is only relevant for PLAY pipelines */
2995             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2996             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2997           }
2998           /* add udpsrc */
2999           gst_bin_add (bin, source->udpsrc[i]);
3000
3001           /* and link to the funnel v4 */
3002           if (priv->sinkpad || i == 1) {
3003             source->selpad[i] = selpad =
3004                 gst_element_get_request_pad (priv->funnel[i], "sink_%u");
3005             pad = gst_element_get_static_pad (source->udpsrc[i], "src");
3006             gst_pad_link (pad, selpad);
3007             gst_object_unref (pad);
3008             gst_object_unref (selpad);
3009           }
3010         }
3011
3012         priv->transport_sources =
3013             g_list_prepend (priv->transport_sources, source);
3014       } else {
3015         GList *l;
3016
3017         for (l = priv->transport_sources; l; l = l->next) {
3018           source = l->data;
3019
3020           if (source->transport == trans) {
3021             priv->transport_sources =
3022                 g_list_delete_link (priv->transport_sources, l);
3023             break;
3024           }
3025         }
3026
3027         if (l != NULL) {
3028           gint i;
3029
3030           for (i = 0; i < 2; i++) {
3031             /* Will automatically unlink everything */
3032             gst_bin_remove (bin,
3033                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
3034
3035             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
3036             gst_object_unref (source->udpsrc[i]);
3037
3038             if (priv->sinkpad || i == 1) {
3039               gst_element_release_request_pad (priv->funnel[i],
3040                   source->selpad[i]);
3041             }
3042           }
3043
3044           g_slice_free (GstRTSPMulticastTransportSource, source);
3045         }
3046       }
3047
3048       gst_object_unref (bin);
3049
3050       /* fall through for the generic case */
3051     }
3052     case GST_RTSP_LOWER_TRANS_UDP:
3053     {
3054       gchar *dest;
3055       gint min, max;
3056       guint ttl = 0;
3057
3058       dest = tr->destination;
3059       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3060         min = tr->port.min;
3061         max = tr->port.max;
3062         ttl = tr->ttl;
3063       } else if (priv->client_side) {
3064         /* In client side mode the 'destination' is the RTSP server, so send
3065          * to those ports */
3066         min = tr->server_port.min;
3067         max = tr->server_port.max;
3068       } else {
3069         min = tr->client_port.min;
3070         max = tr->client_port.max;
3071       }
3072
3073       if (add) {
3074         if (ttl > 0) {
3075           GST_INFO ("setting ttl-mc %d", ttl);
3076           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3077           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3078         }
3079         GST_INFO ("adding %s:%d-%d", dest, min, max);
3080         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3081         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3082         priv->transports = g_list_prepend (priv->transports, trans);
3083       } else {
3084         GST_INFO ("removing %s:%d-%d", dest, min, max);
3085         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3086         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3087         priv->transports = g_list_remove (priv->transports, trans);
3088       }
3089       priv->transports_cookie++;
3090       break;
3091     }
3092     case GST_RTSP_LOWER_TRANS_TCP:
3093       if (add) {
3094         GST_INFO ("adding TCP %s", tr->destination);
3095         priv->transports = g_list_prepend (priv->transports, trans);
3096       } else {
3097         GST_INFO ("removing TCP %s", tr->destination);
3098         priv->transports = g_list_remove (priv->transports, trans);
3099       }
3100       priv->transports_cookie++;
3101       break;
3102     default:
3103       goto unknown_transport;
3104   }
3105   return TRUE;
3106
3107   /* ERRORS */
3108 unknown_transport:
3109   {
3110     GST_INFO ("Unknown transport %d", tr->lower_transport);
3111     return FALSE;
3112   }
3113 }
3114
3115
3116 /**
3117  * gst_rtsp_stream_add_transport:
3118  * @stream: a #GstRTSPStream
3119  * @trans: (transfer none): a #GstRTSPStreamTransport
3120  *
3121  * Add the transport in @trans to @stream. The media of @stream will
3122  * then also be send to the values configured in @trans.
3123  *
3124  * @stream must be joined to a bin.
3125  *
3126  * @trans must contain a valid #GstRTSPTransport.
3127  *
3128  * Returns: %TRUE if @trans was added
3129  */
3130 gboolean
3131 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3132     GstRTSPStreamTransport * trans)
3133 {
3134   GstRTSPStreamPrivate *priv;
3135   gboolean res;
3136
3137   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3138   priv = stream->priv;
3139   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3140   g_return_val_if_fail (priv->is_joined, FALSE);
3141
3142   g_mutex_lock (&priv->lock);
3143   res = update_transport (stream, trans, TRUE);
3144   g_mutex_unlock (&priv->lock);
3145
3146   return res;
3147 }
3148
3149 /**
3150  * gst_rtsp_stream_remove_transport:
3151  * @stream: a #GstRTSPStream
3152  * @trans: (transfer none): a #GstRTSPStreamTransport
3153  *
3154  * Remove the transport in @trans from @stream. The media of @stream will
3155  * not be sent to the values configured in @trans.
3156  *
3157  * @stream must be joined to a bin.
3158  *
3159  * @trans must contain a valid #GstRTSPTransport.
3160  *
3161  * Returns: %TRUE if @trans was removed
3162  */
3163 gboolean
3164 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3165     GstRTSPStreamTransport * trans)
3166 {
3167   GstRTSPStreamPrivate *priv;
3168   gboolean res;
3169
3170   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3171   priv = stream->priv;
3172   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3173   g_return_val_if_fail (priv->is_joined, FALSE);
3174
3175   g_mutex_lock (&priv->lock);
3176   res = update_transport (stream, trans, FALSE);
3177   g_mutex_unlock (&priv->lock);
3178
3179   return res;
3180 }
3181
3182 /**
3183  * gst_rtsp_stream_update_crypto:
3184  * @stream: a #GstRTSPStream
3185  * @ssrc: the SSRC
3186  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3187  *
3188  * Update the new crypto information for @ssrc in @stream. If information
3189  * for @ssrc did not exist, it will be added. If information
3190  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3191  * be removed from @stream.
3192  *
3193  * Returns: %TRUE if @crypto could be updated
3194  */
3195 gboolean
3196 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3197     guint ssrc, GstCaps * crypto)
3198 {
3199   GstRTSPStreamPrivate *priv;
3200
3201   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3202   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3203
3204   priv = stream->priv;
3205
3206   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3207
3208   g_mutex_lock (&priv->lock);
3209   if (crypto)
3210     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3211         gst_caps_ref (crypto));
3212   else
3213     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3214   g_mutex_unlock (&priv->lock);
3215
3216   return TRUE;
3217 }
3218
3219 /**
3220  * gst_rtsp_stream_get_rtp_socket:
3221  * @stream: a #GstRTSPStream
3222  * @family: the socket family
3223  *
3224  * Get the RTP socket from @stream for a @family.
3225  *
3226  * @stream must be joined to a bin.
3227  *
3228  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3229  * socket could be allocated for @family. Unref after usage
3230  */
3231 GSocket *
3232 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3233 {
3234   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3235   GSocket *socket;
3236   const gchar *name;
3237
3238   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3239   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3240       family == G_SOCKET_FAMILY_IPV6, NULL);
3241   g_return_val_if_fail (priv->udpsink[0], NULL);
3242
3243   if (family == G_SOCKET_FAMILY_IPV6)
3244     name = "socket-v6";
3245   else
3246     name = "socket";
3247
3248   g_object_get (priv->udpsink[0], name, &socket, NULL);
3249
3250   return socket;
3251 }
3252
3253 /**
3254  * gst_rtsp_stream_get_rtcp_socket:
3255  * @stream: a #GstRTSPStream
3256  * @family: the socket family
3257  *
3258  * Get the RTCP socket from @stream for a @family.
3259  *
3260  * @stream must be joined to a bin.
3261  *
3262  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3263  * socket could be allocated for @family. Unref after usage
3264  */
3265 GSocket *
3266 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3267 {
3268   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3269   GSocket *socket;
3270   const gchar *name;
3271
3272   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3273   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3274       family == G_SOCKET_FAMILY_IPV6, NULL);
3275   g_return_val_if_fail (priv->udpsink[1], NULL);
3276
3277   if (family == G_SOCKET_FAMILY_IPV6)
3278     name = "socket-v6";
3279   else
3280     name = "socket";
3281
3282   g_object_get (priv->udpsink[1], name, &socket, NULL);
3283
3284   return socket;
3285 }
3286
3287 /**
3288  * gst_rtsp_stream_set_seqnum:
3289  * @stream: a #GstRTSPStream
3290  * @seqnum: a new sequence number
3291  *
3292  * Configure the sequence number in the payloader of @stream to @seqnum.
3293  */
3294 void
3295 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3296 {
3297   GstRTSPStreamPrivate *priv;
3298
3299   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3300
3301   priv = stream->priv;
3302
3303   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3304 }
3305
3306 /**
3307  * gst_rtsp_stream_get_seqnum:
3308  * @stream: a #GstRTSPStream
3309  *
3310  * Get the configured sequence number in the payloader of @stream.
3311  *
3312  * Returns: the sequence number of the payloader.
3313  */
3314 guint16
3315 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3316 {
3317   GstRTSPStreamPrivate *priv;
3318   guint seqnum;
3319
3320   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3321
3322   priv = stream->priv;
3323
3324   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3325
3326   return seqnum;
3327 }
3328
3329 /**
3330  * gst_rtsp_stream_transport_filter:
3331  * @stream: a #GstRTSPStream
3332  * @func: (scope call) (allow-none): a callback
3333  * @user_data: (closure): user data passed to @func
3334  *
3335  * Call @func for each transport managed by @stream. The result value of @func
3336  * determines what happens to the transport. @func will be called with @stream
3337  * locked so no further actions on @stream can be performed from @func.
3338  *
3339  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3340  * @stream.
3341  *
3342  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3343  *
3344  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3345  * will also be added with an additional ref to the result #GList of this
3346  * function..
3347  *
3348  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3349  *
3350  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3351  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3352  * element in the #GList should be unreffed before the list is freed.
3353  */
3354 GList *
3355 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3356     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3357 {
3358   GstRTSPStreamPrivate *priv;
3359   GList *result, *walk, *next;
3360   GHashTable *visited = NULL;
3361   guint cookie;
3362
3363   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3364
3365   priv = stream->priv;
3366
3367   result = NULL;
3368   if (func)
3369     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3370
3371   g_mutex_lock (&priv->lock);
3372 restart:
3373   cookie = priv->transports_cookie;
3374   for (walk = priv->transports; walk; walk = next) {
3375     GstRTSPStreamTransport *trans = walk->data;
3376     GstRTSPFilterResult res;
3377     gboolean changed;
3378
3379     next = g_list_next (walk);
3380
3381     if (func) {
3382       /* only visit each transport once */
3383       if (g_hash_table_contains (visited, trans))
3384         continue;
3385
3386       g_hash_table_add (visited, g_object_ref (trans));
3387       g_mutex_unlock (&priv->lock);
3388
3389       res = func (stream, trans, user_data);
3390
3391       g_mutex_lock (&priv->lock);
3392     } else
3393       res = GST_RTSP_FILTER_REF;
3394
3395     changed = (cookie != priv->transports_cookie);
3396
3397     switch (res) {
3398       case GST_RTSP_FILTER_REMOVE:
3399         update_transport (stream, trans, FALSE);
3400         break;
3401       case GST_RTSP_FILTER_REF:
3402         result = g_list_prepend (result, g_object_ref (trans));
3403         break;
3404       case GST_RTSP_FILTER_KEEP:
3405       default:
3406         break;
3407     }
3408     if (changed)
3409       goto restart;
3410   }
3411   g_mutex_unlock (&priv->lock);
3412
3413   if (func)
3414     g_hash_table_unref (visited);
3415
3416   return result;
3417 }
3418
3419 static GstPadProbeReturn
3420 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3421 {
3422   GstRTSPStreamPrivate *priv;
3423   GstRTSPStream *stream;
3424
3425   stream = user_data;
3426   priv = stream->priv;
3427
3428   GST_DEBUG_OBJECT (pad, "now blocking");
3429
3430   g_mutex_lock (&priv->lock);
3431   priv->blocking = TRUE;
3432   g_mutex_unlock (&priv->lock);
3433
3434   gst_element_post_message (priv->payloader,
3435       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3436           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3437
3438   return GST_PAD_PROBE_OK;
3439 }
3440
3441 /**
3442  * gst_rtsp_stream_set_blocked:
3443  * @stream: a #GstRTSPStream
3444  * @blocked: boolean indicating we should block or unblock
3445  *
3446  * Blocks or unblocks the dataflow on @stream.
3447  *
3448  * Returns: %TRUE on success
3449  */
3450 gboolean
3451 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3452 {
3453   GstRTSPStreamPrivate *priv;
3454
3455   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3456
3457   priv = stream->priv;
3458
3459   g_mutex_lock (&priv->lock);
3460   if (blocked) {
3461     priv->blocking = FALSE;
3462     if (priv->blocked_id == 0) {
3463       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3464           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3465           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3466           g_object_ref (stream), g_object_unref);
3467     }
3468   } else {
3469     if (priv->blocked_id != 0) {
3470       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3471       priv->blocked_id = 0;
3472       priv->blocking = FALSE;
3473     }
3474   }
3475   g_mutex_unlock (&priv->lock);
3476
3477   return TRUE;
3478 }
3479
3480 /**
3481  * gst_rtsp_stream_is_blocking:
3482  * @stream: a #GstRTSPStream
3483  *
3484  * Check if @stream is blocking on a #GstBuffer.
3485  *
3486  * Returns: %TRUE if @stream is blocking
3487  */
3488 gboolean
3489 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3490 {
3491   GstRTSPStreamPrivate *priv;
3492   gboolean result;
3493
3494   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3495
3496   priv = stream->priv;
3497
3498   g_mutex_lock (&priv->lock);
3499   result = priv->blocking;
3500   g_mutex_unlock (&priv->lock);
3501
3502   return result;
3503 }
3504
3505 /**
3506  * gst_rtsp_stream_query_position:
3507  * @stream: a #GstRTSPStream
3508  *
3509  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3510  * the RTP parts of the pipeline and not the RTCP parts.
3511  *
3512  * Returns: %TRUE if the position could be queried
3513  */
3514 gboolean
3515 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3516 {
3517   GstRTSPStreamPrivate *priv;
3518   GstElement *sink;
3519   gboolean ret;
3520
3521   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3522
3523   priv = stream->priv;
3524
3525   g_mutex_lock (&priv->lock);
3526   /* depending on the transport type, it should query corresponding sink */
3527   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3528       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3529     sink = priv->udpsink[0];
3530   else
3531     sink = priv->appsink[0];
3532
3533   if (sink)
3534     gst_object_ref (sink);
3535   g_mutex_unlock (&priv->lock);
3536
3537   if (!sink)
3538     return FALSE;
3539
3540   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3541   gst_object_unref (sink);
3542
3543   return ret;
3544 }
3545
3546 /**
3547  * gst_rtsp_stream_query_stop:
3548  * @stream: a #GstRTSPStream
3549  *
3550  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3551  * the RTP parts of the pipeline and not the RTCP parts.
3552  *
3553  * Returns: %TRUE if the stop could be queried
3554  */
3555 gboolean
3556 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3557 {
3558   GstRTSPStreamPrivate *priv;
3559   GstElement *sink;
3560   GstQuery *query;
3561   gboolean ret;
3562
3563   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3564
3565   priv = stream->priv;
3566
3567   g_mutex_lock (&priv->lock);
3568   /* depending on the transport type, it should query corresponding sink */
3569   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3570       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3571     sink = priv->udpsink[0];
3572   else
3573     sink = priv->appsink[0];
3574
3575   if (sink)
3576     gst_object_ref (sink);
3577   g_mutex_unlock (&priv->lock);
3578
3579   if (!sink)
3580     return FALSE;
3581
3582   query = gst_query_new_segment (GST_FORMAT_TIME);
3583   if ((ret = gst_element_query (sink, query))) {
3584     GstFormat format;
3585
3586     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3587     if (format != GST_FORMAT_TIME)
3588       *stop = -1;
3589   }
3590   gst_query_unref (query);
3591   gst_object_unref (sink);
3592
3593   return ret;
3594
3595 }