stream: small fix in error code path
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include <gst/rtp/gstrtpbuffer.h>
59
60 #include "rtsp-stream.h"
61
62 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
63      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
64
65 struct _GstRTSPStreamPrivate
66 {
67   GMutex lock;
68   guint idx;
69   /* Only one pad is ever set */
70   GstPad *srcpad, *sinkpad;
71   GstElement *payloader;
72   guint buffer_size;
73   gboolean is_joined;
74   GstBin *joined_bin;
75
76   /* TRUE if this stream is running on
77    * the client side of an RTSP link (for RECORD) */
78   gboolean client_side;
79   gchar *control;
80
81   GstRTSPProfile profiles;
82   GstRTSPLowerTrans protocols;
83
84   /* pads on the rtpbin */
85   GstPad *send_rtp_sink;
86   GstPad *recv_rtp_src;
87   GstPad *recv_sink[2];
88   GstPad *send_src[2];
89
90   /* the RTPSession object */
91   GObject *session;
92
93   /* SRTP encoder/decoder */
94   GstElement *srtpenc;
95   GstElement *srtpdec;
96   GHashTable *keys;
97
98   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
99    * sockets */
100   GstElement *udpsrc_v4[2];
101   /* UDP sources for UDP multicast transports */
102   GstElement *udpsrc_mcast_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107   /* UDP sources for UDP multicast transports */
108   GstElement *udpsrc_mcast_v6[2];
109
110   GstElement *udpqueue[2];
111   GstElement *udpsink[2];
112
113   /* for TCP transport */
114   GstElement *appsrc[2];
115   GstClockTime appsrc_base_time[2];
116   GstElement *appqueue[2];
117   GstElement *appsink[2];
118
119   GstElement *tee[2];
120   GstElement *funnel[2];
121
122   /* retransmission */
123   GstElement *rtxsend;
124   guint rtx_pt;
125   GstClockTime rtx_time;
126
127   /* server ports for sending/receiving over ipv4 */
128   GstRTSPRange server_port_v4;
129   GstRTSPAddress *server_addr_v4;
130   gboolean have_ipv4;
131
132   /* server ports for sending/receiving over ipv6 */
133   GstRTSPRange server_port_v6;
134   GstRTSPAddress *server_addr_v6;
135   gboolean have_ipv6;
136
137   /* multicast addresses */
138   GstRTSPAddressPool *pool;
139   GstRTSPAddress *addr_v4;
140   GstRTSPAddress *addr_v6;
141   gboolean have_ipv4_mcast;
142   gboolean have_ipv6_mcast;
143
144   gchar *multicast_iface;
145
146   /* the caps of the stream */
147   gulong caps_sig;
148   GstCaps *caps;
149
150   /* transports we stream to */
151   guint n_active;
152   GList *transports;
153   guint transports_cookie;
154   GList *tr_cache_rtp;
155   GList *tr_cache_rtcp;
156   guint tr_cache_cookie_rtp;
157   guint tr_cache_cookie_rtcp;
158
159
160   gint dscp_qos;
161
162   /* stream blocking */
163   gulong blocked_id;
164   gboolean blocking;
165
166   /* pt->caps map for RECORD streams */
167   GHashTable *ptmap;
168
169   GstRTSPPublishClockMode publish_clock_mode;
170 };
171
172 #define DEFAULT_CONTROL         NULL
173 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
174 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
175                                         GST_RTSP_LOWER_TRANS_TCP
176
177 enum
178 {
179   PROP_0,
180   PROP_CONTROL,
181   PROP_PROFILES,
182   PROP_PROTOCOLS,
183   PROP_LAST
184 };
185
186 enum
187 {
188   SIGNAL_NEW_RTP_ENCODER,
189   SIGNAL_NEW_RTCP_ENCODER,
190   SIGNAL_LAST
191 };
192
193 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
194 #define GST_CAT_DEFAULT rtsp_stream_debug
195
196 static GQuark ssrc_stream_map_key;
197
198 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
199     GValue * value, GParamSpec * pspec);
200 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
201     const GValue * value, GParamSpec * pspec);
202
203 static void gst_rtsp_stream_finalize (GObject * obj);
204
205 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
206
207 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
208
209 static void
210 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
211 {
212   GObjectClass *gobject_class;
213
214   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
215
216   gobject_class = G_OBJECT_CLASS (klass);
217
218   gobject_class->get_property = gst_rtsp_stream_get_property;
219   gobject_class->set_property = gst_rtsp_stream_set_property;
220   gobject_class->finalize = gst_rtsp_stream_finalize;
221
222   g_object_class_install_property (gobject_class, PROP_CONTROL,
223       g_param_spec_string ("control", "Control",
224           "The control string for this stream", DEFAULT_CONTROL,
225           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROFILES,
228       g_param_spec_flags ("profiles", "Profiles",
229           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
230           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
233       g_param_spec_flags ("protocols", "Protocols",
234           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
235           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
238       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
243       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
244       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
245       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
246
247   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
248
249   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
250 }
251
252 static void
253 gst_rtsp_stream_init (GstRTSPStream * stream)
254 {
255   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
256
257   GST_DEBUG ("new stream %p", stream);
258
259   stream->priv = priv;
260
261   priv->dscp_qos = -1;
262   priv->control = g_strdup (DEFAULT_CONTROL);
263   priv->profiles = DEFAULT_PROFILES;
264   priv->protocols = DEFAULT_PROTOCOLS;
265   priv->publish_clock_mode = GST_RTSP_PUBLISH_CLOCK_MODE_CLOCK;
266
267   g_mutex_init (&priv->lock);
268
269   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
270       NULL, (GDestroyNotify) gst_caps_unref);
271   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
272       (GDestroyNotify) gst_caps_unref);
273 }
274
275 static void
276 gst_rtsp_stream_finalize (GObject * obj)
277 {
278   GstRTSPStream *stream;
279   GstRTSPStreamPrivate *priv;
280
281   stream = GST_RTSP_STREAM (obj);
282   priv = stream->priv;
283
284   GST_DEBUG ("finalize stream %p", stream);
285
286   /* we really need to be unjoined now */
287   g_return_if_fail (!priv->is_joined);
288
289   if (priv->addr_v4)
290     gst_rtsp_address_free (priv->addr_v4);
291   if (priv->addr_v6)
292     gst_rtsp_address_free (priv->addr_v6);
293   if (priv->server_addr_v4)
294     gst_rtsp_address_free (priv->server_addr_v4);
295   if (priv->server_addr_v6)
296     gst_rtsp_address_free (priv->server_addr_v6);
297   if (priv->pool)
298     g_object_unref (priv->pool);
299   if (priv->rtxsend)
300     g_object_unref (priv->rtxsend);
301
302   g_free (priv->multicast_iface);
303
304   gst_object_unref (priv->payloader);
305   if (priv->srcpad)
306     gst_object_unref (priv->srcpad);
307   if (priv->sinkpad)
308     gst_object_unref (priv->sinkpad);
309   g_free (priv->control);
310   g_mutex_clear (&priv->lock);
311
312   g_hash_table_unref (priv->keys);
313   g_hash_table_destroy (priv->ptmap);
314
315   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
316 }
317
318 static void
319 gst_rtsp_stream_get_property (GObject * object, guint propid,
320     GValue * value, GParamSpec * pspec)
321 {
322   GstRTSPStream *stream = GST_RTSP_STREAM (object);
323
324   switch (propid) {
325     case PROP_CONTROL:
326       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
327       break;
328     case PROP_PROFILES:
329       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
330       break;
331     case PROP_PROTOCOLS:
332       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
333       break;
334     default:
335       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
336   }
337 }
338
339 static void
340 gst_rtsp_stream_set_property (GObject * object, guint propid,
341     const GValue * value, GParamSpec * pspec)
342 {
343   GstRTSPStream *stream = GST_RTSP_STREAM (object);
344
345   switch (propid) {
346     case PROP_CONTROL:
347       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
348       break;
349     case PROP_PROFILES:
350       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
351       break;
352     case PROP_PROTOCOLS:
353       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
354       break;
355     default:
356       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
357   }
358 }
359
360 /**
361  * gst_rtsp_stream_new:
362  * @idx: an index
363  * @pad: a #GstPad
364  * @payloader: a #GstElement
365  *
366  * Create a new media stream with index @idx that handles RTP data on
367  * @pad and has a payloader element @payloader if @pad is a source pad
368  * or a depayloader element @payloader if @pad is a sink pad.
369  *
370  * Returns: (transfer full): a new #GstRTSPStream
371  */
372 GstRTSPStream *
373 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
374 {
375   GstRTSPStreamPrivate *priv;
376   GstRTSPStream *stream;
377
378   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
379   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
380
381   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
382   priv = stream->priv;
383   priv->idx = idx;
384   priv->payloader = gst_object_ref (payloader);
385   if (GST_PAD_IS_SRC (pad))
386     priv->srcpad = gst_object_ref (pad);
387   else
388     priv->sinkpad = gst_object_ref (pad);
389
390   return stream;
391 }
392
393 /**
394  * gst_rtsp_stream_get_index:
395  * @stream: a #GstRTSPStream
396  *
397  * Get the stream index.
398  *
399  * Return: the stream index.
400  */
401 guint
402 gst_rtsp_stream_get_index (GstRTSPStream * stream)
403 {
404   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
405
406   return stream->priv->idx;
407 }
408
409 /**
410  * gst_rtsp_stream_get_pt:
411  * @stream: a #GstRTSPStream
412  *
413  * Get the stream payload type.
414  *
415  * Return: the stream payload type.
416  */
417 guint
418 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
419 {
420   GstRTSPStreamPrivate *priv;
421   guint pt;
422
423   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
424
425   priv = stream->priv;
426
427   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
428
429   return pt;
430 }
431
432 /**
433  * gst_rtsp_stream_get_srcpad:
434  * @stream: a #GstRTSPStream
435  *
436  * Get the srcpad associated with @stream.
437  *
438  * Returns: (transfer full): the srcpad. Unref after usage.
439  */
440 GstPad *
441 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
442 {
443   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
444
445   if (!stream->priv->srcpad)
446     return NULL;
447
448   return gst_object_ref (stream->priv->srcpad);
449 }
450
451 /**
452  * gst_rtsp_stream_get_sinkpad:
453  * @stream: a #GstRTSPStream
454  *
455  * Get the sinkpad associated with @stream.
456  *
457  * Returns: (transfer full): the sinkpad. Unref after usage.
458  */
459 GstPad *
460 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
461 {
462   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
463
464   if (!stream->priv->sinkpad)
465     return NULL;
466
467   return gst_object_ref (stream->priv->sinkpad);
468 }
469
470 /**
471  * gst_rtsp_stream_get_control:
472  * @stream: a #GstRTSPStream
473  *
474  * Get the control string to identify this stream.
475  *
476  * Returns: (transfer full): the control string. g_free() after usage.
477  */
478 gchar *
479 gst_rtsp_stream_get_control (GstRTSPStream * stream)
480 {
481   GstRTSPStreamPrivate *priv;
482   gchar *result;
483
484   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
485
486   priv = stream->priv;
487
488   g_mutex_lock (&priv->lock);
489   if ((result = g_strdup (priv->control)) == NULL)
490     result = g_strdup_printf ("stream=%u", priv->idx);
491   g_mutex_unlock (&priv->lock);
492
493   return result;
494 }
495
496 /**
497  * gst_rtsp_stream_set_control:
498  * @stream: a #GstRTSPStream
499  * @control: a control string
500  *
501  * Set the control string in @stream.
502  */
503 void
504 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
505 {
506   GstRTSPStreamPrivate *priv;
507
508   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
509
510   priv = stream->priv;
511
512   g_mutex_lock (&priv->lock);
513   g_free (priv->control);
514   priv->control = g_strdup (control);
515   g_mutex_unlock (&priv->lock);
516 }
517
518 /**
519  * gst_rtsp_stream_has_control:
520  * @stream: a #GstRTSPStream
521  * @control: a control string
522  *
523  * Check if @stream has the control string @control.
524  *
525  * Returns: %TRUE is @stream has @control as the control string
526  */
527 gboolean
528 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
529 {
530   GstRTSPStreamPrivate *priv;
531   gboolean res;
532
533   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
534
535   priv = stream->priv;
536
537   g_mutex_lock (&priv->lock);
538   if (priv->control)
539     res = (g_strcmp0 (priv->control, control) == 0);
540   else {
541     guint streamid;
542
543     if (sscanf (control, "stream=%u", &streamid) > 0)
544       res = (streamid == priv->idx);
545     else
546       res = FALSE;
547   }
548   g_mutex_unlock (&priv->lock);
549
550   return res;
551 }
552
553 /**
554  * gst_rtsp_stream_set_mtu:
555  * @stream: a #GstRTSPStream
556  * @mtu: a new MTU
557  *
558  * Configure the mtu in the payloader of @stream to @mtu.
559  */
560 void
561 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
562 {
563   GstRTSPStreamPrivate *priv;
564
565   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
566
567   priv = stream->priv;
568
569   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
570
571   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
572 }
573
574 /**
575  * gst_rtsp_stream_get_mtu:
576  * @stream: a #GstRTSPStream
577  *
578  * Get the configured MTU in the payloader of @stream.
579  *
580  * Returns: the MTU of the payloader.
581  */
582 guint
583 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
584 {
585   GstRTSPStreamPrivate *priv;
586   guint mtu;
587
588   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
589
590   priv = stream->priv;
591
592   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
593
594   return mtu;
595 }
596
597 /* Update the dscp qos property on the udp sinks */
598 static void
599 update_dscp_qos (GstRTSPStream * stream)
600 {
601   GstRTSPStreamPrivate *priv;
602
603   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
604
605   priv = stream->priv;
606
607   if (priv->udpsink[0]) {
608     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
609         NULL);
610   }
611
612   if (priv->udpsink[1]) {
613     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
614         NULL);
615   }
616 }
617
618 /**
619  * gst_rtsp_stream_set_dscp_qos:
620  * @stream: a #GstRTSPStream
621  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
622  *
623  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
624  */
625 void
626 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
627 {
628   GstRTSPStreamPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
631
632   priv = stream->priv;
633
634   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
635
636   if (dscp_qos < -1 || dscp_qos > 63) {
637     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
638     return;
639   }
640
641   priv->dscp_qos = dscp_qos;
642
643   update_dscp_qos (stream);
644 }
645
646 /**
647  * gst_rtsp_stream_get_dscp_qos:
648  * @stream: a #GstRTSPStream
649  *
650  * Get the configured DSCP QoS in of the outgoing sockets.
651  *
652  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
653  */
654 gint
655 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
656 {
657   GstRTSPStreamPrivate *priv;
658
659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
660
661   priv = stream->priv;
662
663   return priv->dscp_qos;
664 }
665
666 /**
667  * gst_rtsp_stream_is_transport_supported:
668  * @stream: a #GstRTSPStream
669  * @transport: (transfer none): a #GstRTSPTransport
670  *
671  * Check if @transport can be handled by stream
672  *
673  * Returns: %TRUE if @transport can be handled by @stream.
674  */
675 gboolean
676 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
677     GstRTSPTransport * transport)
678 {
679   GstRTSPStreamPrivate *priv;
680
681   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
682
683   priv = stream->priv;
684
685   g_mutex_lock (&priv->lock);
686   if (transport->trans != GST_RTSP_TRANS_RTP)
687     goto unsupported_transmode;
688
689   if (!(transport->profile & priv->profiles))
690     goto unsupported_profile;
691
692   if (!(transport->lower_transport & priv->protocols))
693     goto unsupported_ltrans;
694
695   g_mutex_unlock (&priv->lock);
696
697   return TRUE;
698
699   /* ERRORS */
700 unsupported_transmode:
701   {
702     GST_DEBUG ("unsupported transport mode %d", transport->trans);
703     g_mutex_unlock (&priv->lock);
704     return FALSE;
705   }
706 unsupported_profile:
707   {
708     GST_DEBUG ("unsupported profile %d", transport->profile);
709     g_mutex_unlock (&priv->lock);
710     return FALSE;
711   }
712 unsupported_ltrans:
713   {
714     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
715     g_mutex_unlock (&priv->lock);
716     return FALSE;
717   }
718 }
719
720 /**
721  * gst_rtsp_stream_set_profiles:
722  * @stream: a #GstRTSPStream
723  * @profiles: the new profiles
724  *
725  * Configure the allowed profiles for @stream.
726  */
727 void
728 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
729 {
730   GstRTSPStreamPrivate *priv;
731
732   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
733
734   priv = stream->priv;
735
736   g_mutex_lock (&priv->lock);
737   priv->profiles = profiles;
738   g_mutex_unlock (&priv->lock);
739 }
740
741 /**
742  * gst_rtsp_stream_get_profiles:
743  * @stream: a #GstRTSPStream
744  *
745  * Get the allowed profiles of @stream.
746  *
747  * Returns: a #GstRTSPProfile
748  */
749 GstRTSPProfile
750 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
751 {
752   GstRTSPStreamPrivate *priv;
753   GstRTSPProfile res;
754
755   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
756
757   priv = stream->priv;
758
759   g_mutex_lock (&priv->lock);
760   res = priv->profiles;
761   g_mutex_unlock (&priv->lock);
762
763   return res;
764 }
765
766 /**
767  * gst_rtsp_stream_set_protocols:
768  * @stream: a #GstRTSPStream
769  * @protocols: the new flags
770  *
771  * Configure the allowed lower transport for @stream.
772  */
773 void
774 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
775     GstRTSPLowerTrans protocols)
776 {
777   GstRTSPStreamPrivate *priv;
778
779   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
780
781   priv = stream->priv;
782
783   g_mutex_lock (&priv->lock);
784   priv->protocols = protocols;
785   g_mutex_unlock (&priv->lock);
786 }
787
788 /**
789  * gst_rtsp_stream_get_protocols:
790  * @stream: a #GstRTSPStream
791  *
792  * Get the allowed protocols of @stream.
793  *
794  * Returns: a #GstRTSPLowerTrans
795  */
796 GstRTSPLowerTrans
797 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
798 {
799   GstRTSPStreamPrivate *priv;
800   GstRTSPLowerTrans res;
801
802   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
803       GST_RTSP_LOWER_TRANS_UNKNOWN);
804
805   priv = stream->priv;
806
807   g_mutex_lock (&priv->lock);
808   res = priv->protocols;
809   g_mutex_unlock (&priv->lock);
810
811   return res;
812 }
813
814 /**
815  * gst_rtsp_stream_set_address_pool:
816  * @stream: a #GstRTSPStream
817  * @pool: (transfer none): a #GstRTSPAddressPool
818  *
819  * configure @pool to be used as the address pool of @stream.
820  */
821 void
822 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
823     GstRTSPAddressPool * pool)
824 {
825   GstRTSPStreamPrivate *priv;
826   GstRTSPAddressPool *old;
827
828   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
829
830   priv = stream->priv;
831
832   GST_LOG_OBJECT (stream, "set address pool %p", pool);
833
834   g_mutex_lock (&priv->lock);
835   if ((old = priv->pool) != pool)
836     priv->pool = pool ? g_object_ref (pool) : NULL;
837   else
838     old = NULL;
839   g_mutex_unlock (&priv->lock);
840
841   if (old)
842     g_object_unref (old);
843 }
844
845 /**
846  * gst_rtsp_stream_get_address_pool:
847  * @stream: a #GstRTSPStream
848  *
849  * Get the #GstRTSPAddressPool used as the address pool of @stream.
850  *
851  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
852  * usage.
853  */
854 GstRTSPAddressPool *
855 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
856 {
857   GstRTSPStreamPrivate *priv;
858   GstRTSPAddressPool *result;
859
860   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
861
862   priv = stream->priv;
863
864   g_mutex_lock (&priv->lock);
865   if ((result = priv->pool))
866     g_object_ref (result);
867   g_mutex_unlock (&priv->lock);
868
869   return result;
870 }
871
872 /**
873  * gst_rtsp_stream_set_multicast_iface:
874  * @stream: a #GstRTSPStream
875  * @multicast_iface: (transfer none): a multicast interface
876  *
877  * configure @multicast_iface to be used for @stream.
878  */
879 void
880 gst_rtsp_stream_set_multicast_iface (GstRTSPStream * stream,
881     const gchar * multicast_iface)
882 {
883   GstRTSPStreamPrivate *priv;
884   gchar *old;
885
886   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
887
888   priv = stream->priv;
889
890   GST_LOG_OBJECT (stream, "set multicast iface %s",
891       GST_STR_NULL (multicast_iface));
892
893   g_mutex_lock (&priv->lock);
894   if ((old = priv->multicast_iface) != multicast_iface)
895     priv->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
896   else
897     old = NULL;
898   g_mutex_unlock (&priv->lock);
899
900   if (old)
901     g_free (old);
902 }
903
904 /**
905  * gst_rtsp_stream_get_multicast_iface:
906  * @stream: a #GstRTSPStream
907  *
908  * Get the multicast interface used for @stream.
909  *
910  * Returns: (transfer full): the multicast interface for @stream. g_free() after
911  * usage.
912  */
913 gchar *
914 gst_rtsp_stream_get_multicast_iface (GstRTSPStream * stream)
915 {
916   GstRTSPStreamPrivate *priv;
917   gchar *result;
918
919   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
920
921   priv = stream->priv;
922
923   g_mutex_lock (&priv->lock);
924   if ((result = priv->multicast_iface))
925     result = g_strdup (result);
926   g_mutex_unlock (&priv->lock);
927
928   return result;
929 }
930
931 /**
932  * gst_rtsp_stream_get_multicast_address:
933  * @stream: a #GstRTSPStream
934  * @family: the #GSocketFamily
935  *
936  * Get the multicast address of @stream for @family.
937  *
938  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
939  * or %NULL when no address could be allocated. gst_rtsp_address_free()
940  * after usage.
941  */
942 GstRTSPAddress *
943 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
944     GSocketFamily family)
945 {
946   GstRTSPStreamPrivate *priv;
947   GstRTSPAddress *result;
948   GstRTSPAddress **addrp;
949   GstRTSPAddressFlags flags;
950
951   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
952
953   priv = stream->priv;
954
955   if (family == G_SOCKET_FAMILY_IPV6) {
956     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
957     addrp = &priv->addr_v6;
958   } else {
959     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
960     addrp = &priv->addr_v4;
961   }
962
963   g_mutex_lock (&priv->lock);
964   if (*addrp == NULL) {
965     if (priv->pool == NULL)
966       goto no_pool;
967
968     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
969
970     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
971     if (*addrp == NULL)
972       goto no_address;
973   }
974   result = gst_rtsp_address_copy (*addrp);
975   g_mutex_unlock (&priv->lock);
976
977   return result;
978
979   /* ERRORS */
980 no_pool:
981   {
982     GST_ERROR_OBJECT (stream, "no address pool specified");
983     g_mutex_unlock (&priv->lock);
984     return NULL;
985   }
986 no_address:
987   {
988     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
989     g_mutex_unlock (&priv->lock);
990     return NULL;
991   }
992 }
993
994 /**
995  * gst_rtsp_stream_reserve_address:
996  * @stream: a #GstRTSPStream
997  * @address: an address
998  * @port: a port
999  * @n_ports: n_ports
1000  * @ttl: a TTL
1001  *
1002  * Reserve @address and @port as the address and port of @stream.
1003  *
1004  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
1005  * the address could be reserved. gst_rtsp_address_free() after usage.
1006  */
1007 GstRTSPAddress *
1008 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
1009     const gchar * address, guint port, guint n_ports, guint ttl)
1010 {
1011   GstRTSPStreamPrivate *priv;
1012   GstRTSPAddress *result;
1013   GInetAddress *addr;
1014   GSocketFamily family;
1015   GstRTSPAddress **addrp;
1016
1017   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1018   g_return_val_if_fail (address != NULL, NULL);
1019   g_return_val_if_fail (port > 0, NULL);
1020   g_return_val_if_fail (n_ports > 0, NULL);
1021   g_return_val_if_fail (ttl > 0, NULL);
1022
1023   priv = stream->priv;
1024
1025   addr = g_inet_address_new_from_string (address);
1026   if (!addr) {
1027     GST_ERROR ("failed to get inet addr from %s", address);
1028     family = G_SOCKET_FAMILY_IPV4;
1029   } else {
1030     family = g_inet_address_get_family (addr);
1031     g_object_unref (addr);
1032   }
1033
1034   if (family == G_SOCKET_FAMILY_IPV6)
1035     addrp = &priv->addr_v6;
1036   else
1037     addrp = &priv->addr_v4;
1038
1039   g_mutex_lock (&priv->lock);
1040   if (*addrp == NULL) {
1041     GstRTSPAddressPoolResult res;
1042
1043     if (priv->pool == NULL)
1044       goto no_pool;
1045
1046     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
1047         port, n_ports, ttl, addrp);
1048     if (res != GST_RTSP_ADDRESS_POOL_OK)
1049       goto no_address;
1050   } else {
1051     if (strcmp ((*addrp)->address, address) ||
1052         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
1053         (*addrp)->ttl != ttl)
1054       goto different_address;
1055   }
1056   result = gst_rtsp_address_copy (*addrp);
1057   g_mutex_unlock (&priv->lock);
1058
1059   return result;
1060
1061   /* ERRORS */
1062 no_pool:
1063   {
1064     GST_ERROR_OBJECT (stream, "no address pool specified");
1065     g_mutex_unlock (&priv->lock);
1066     return NULL;
1067   }
1068 no_address:
1069   {
1070     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1071         address);
1072     g_mutex_unlock (&priv->lock);
1073     return NULL;
1074   }
1075 different_address:
1076   {
1077     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1078         " reserved", address);
1079     g_mutex_unlock (&priv->lock);
1080     return NULL;
1081   }
1082 }
1083
1084 /* must be called with lock */
1085 static void
1086 set_sockets_for_udpsinks (GstRTSPStream * stream, GSocket * rtp_socket,
1087     GSocket * rtcp_socket, GSocketFamily family)
1088 {
1089   GstRTSPStreamPrivate *priv = stream->priv;
1090   const gchar *multisink_socket;
1091
1092   if (family == G_SOCKET_FAMILY_IPV6)
1093     multisink_socket = "socket-v6";
1094   else
1095     multisink_socket = "socket";
1096
1097   g_object_set (G_OBJECT (priv->udpsink[0]), multisink_socket, rtp_socket,
1098       NULL);
1099   g_object_set (G_OBJECT (priv->udpsink[1]), multisink_socket, rtcp_socket,
1100       NULL);
1101 }
1102
1103 /* must be called with lock */
1104 static gboolean
1105 create_and_configure_udpsinks (GstRTSPStream * stream)
1106 {
1107   GstRTSPStreamPrivate *priv = stream->priv;
1108   GstElement *udpsink0, *udpsink1;
1109
1110   udpsink0 = NULL;
1111   udpsink1 = NULL;
1112
1113   if (priv->udpsink[0])
1114     udpsink0 = priv->udpsink[0];
1115   else
1116     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1117
1118   if (!udpsink0)
1119     goto no_udp_protocol;
1120
1121   if (priv->udpsink[1])
1122     udpsink1 = priv->udpsink[1];
1123   else
1124     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1125
1126   if (!udpsink1)
1127     goto no_udp_protocol;
1128
1129   /* configure sinks */
1130
1131   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1132   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1133
1134   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1135   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1136
1137   g_object_set (G_OBJECT (udpsink0), "buffer-size", priv->buffer_size, NULL);
1138
1139   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1140   /* Needs to be async for RECORD streams, otherwise we will never go to
1141    * PLAYING because the sinks will wait for data while the udpsrc can't
1142    * provide data with timestamps in PAUSED. */
1143   if (priv->sinkpad)
1144     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1145   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1146
1147   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1148   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1149
1150   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1151   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1152
1153   /* update the dscp qos field in the sinks */
1154   update_dscp_qos (stream);
1155
1156   priv->udpsink[0] = udpsink0;
1157   priv->udpsink[1] = udpsink1;
1158
1159   return TRUE;
1160
1161   /* ERRORS */
1162 no_udp_protocol:
1163   {
1164     return FALSE;
1165   }
1166 }
1167
1168 /* must be called with lock */
1169 static void
1170 play_udpsources_one_family (GstRTSPStream * stream, GstElement * udpsrc_out[2],
1171     GSocketFamily family)
1172 {
1173   GstRTSPStreamPrivate *priv;
1174   GstPad *pad, *selpad;
1175   guint i;
1176   GstBin *bin;
1177
1178   priv = stream->priv;
1179   bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[1])));
1180
1181   for (i = 0; i < 2; i++) {
1182     if (priv->sinkpad || i == 1) {
1183       if (priv->srcpad) {
1184         /* we set and keep these to playing so that they don't cause NO_PREROLL return
1185          * values. This is only relevant for PLAY pipelines */
1186         gst_element_set_state (udpsrc_out[i], GST_STATE_PLAYING);
1187         gst_element_set_locked_state (udpsrc_out[i], TRUE);
1188       }
1189       /* add udpsrc */
1190       gst_bin_add (bin, udpsrc_out[i]);
1191
1192       /* and link to the funnel */
1193       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1194       pad = gst_element_get_static_pad (udpsrc_out[i], "src");
1195       gst_pad_link (pad, selpad);
1196       gst_object_unref (pad);
1197       gst_object_unref (selpad);
1198
1199       /* otherwise sync state with parent in case it's running already
1200        * at this point */
1201       if (!priv->srcpad) {
1202         gst_element_sync_state_with_parent (udpsrc_out[i]);
1203       }
1204     }
1205   }
1206
1207   gst_object_unref (bin);
1208 }
1209
1210 /* must be called with lock */
1211 static gboolean
1212 create_and_configure_udpsources_one_family (GstElement * udpsrc_out[2],
1213     GSocket * rtp_socket, GSocket * rtcp_socket, GSocketFamily family,
1214     const gchar * address, gint rtpport, gint rtcpport,
1215     const gchar * multicast_iface, GstRTSPLowerTrans transport)
1216 {
1217   GstStateChangeReturn ret;
1218
1219   udpsrc_out[0] = gst_element_factory_make ("udpsrc", NULL);
1220   udpsrc_out[1] = gst_element_factory_make ("udpsrc", NULL);
1221
1222   if (udpsrc_out[0] == NULL || udpsrc_out[1] == NULL)
1223     goto error;
1224
1225   if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1226     g_object_set (G_OBJECT (udpsrc_out[0]), "address", address, NULL);
1227     g_object_set (G_OBJECT (udpsrc_out[1]), "address", address, NULL);
1228     g_object_set (G_OBJECT (udpsrc_out[0]), "port", rtpport, NULL);
1229     g_object_set (G_OBJECT (udpsrc_out[1]), "port", rtcpport, NULL);
1230     g_object_set (G_OBJECT (udpsrc_out[0]), "multicast-iface", multicast_iface,
1231         NULL);
1232     g_object_set (G_OBJECT (udpsrc_out[1]), "multicast-iface", multicast_iface,
1233         NULL);
1234     g_object_set (G_OBJECT (udpsrc_out[0]), "loop", FALSE, NULL);
1235     g_object_set (G_OBJECT (udpsrc_out[1]), "loop", FALSE, NULL);
1236   }
1237
1238   g_object_set (G_OBJECT (udpsrc_out[0]), "socket", rtp_socket, NULL);
1239   g_object_set (G_OBJECT (udpsrc_out[1]), "socket", rtcp_socket, NULL);
1240
1241   ret = gst_element_set_state (udpsrc_out[0], GST_STATE_READY);
1242   if (ret == GST_STATE_CHANGE_FAILURE)
1243     goto error;
1244   ret = gst_element_set_state (udpsrc_out[1], GST_STATE_READY);
1245   if (ret == GST_STATE_CHANGE_FAILURE)
1246     goto error;
1247
1248   return TRUE;
1249   return TRUE;
1250
1251   /* ERRORS */
1252 error:
1253   {
1254     if (udpsrc_out[0]) {
1255       gst_element_set_state (udpsrc_out[0], GST_STATE_NULL);
1256       g_clear_object (&udpsrc_out[0]);
1257     }
1258     if (udpsrc_out[1]) {
1259       gst_element_set_state (udpsrc_out[1], GST_STATE_NULL);
1260       g_clear_object (&udpsrc_out[1]);
1261     }
1262     return FALSE;
1263   }
1264 }
1265
1266 static gboolean
1267 alloc_ports_one_family (GstRTSPStream * stream, GSocketFamily family,
1268     GstElement * udpsrc_out[2], GstRTSPRange * server_port_out,
1269     GstRTSPTransport * ct, GstRTSPAddress ** server_addr_out,
1270     gboolean use_client_settings)
1271 {
1272   GstRTSPStreamPrivate *priv = stream->priv;
1273   GSocket *rtp_socket = NULL;
1274   GSocket *rtcp_socket;
1275   gint tmp_rtp, tmp_rtcp;
1276   guint count;
1277   gint rtpport, rtcpport;
1278   GList *rejected_addresses = NULL;
1279   GstRTSPAddress *addr = NULL;
1280   GInetAddress *inetaddr = NULL;
1281   gchar *addr_str;
1282   GSocketAddress *rtp_sockaddr = NULL;
1283   GSocketAddress *rtcp_sockaddr = NULL;
1284   GstRTSPAddressPool *pool;
1285   GstRTSPLowerTrans transport;
1286   const gchar *multicast_iface = priv->multicast_iface;
1287
1288   pool = priv->pool;
1289   count = 0;
1290   transport = ct->lower_transport;
1291
1292   /* Start with random port */
1293   tmp_rtp = 0;
1294
1295   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1296       G_SOCKET_PROTOCOL_UDP, NULL);
1297   if (!rtcp_socket)
1298     goto no_udp_protocol;
1299   g_socket_set_multicast_loopback (rtcp_socket, FALSE);
1300
1301   if (*server_addr_out)
1302     gst_rtsp_address_free (*server_addr_out);
1303
1304   /* try to allocate 2 UDP ports, the RTP port should be an even
1305    * number and the RTCP port should be the next (uneven) port */
1306 again:
1307
1308   if (rtp_socket == NULL) {
1309     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1310         G_SOCKET_PROTOCOL_UDP, NULL);
1311     if (!rtp_socket)
1312       goto no_udp_protocol;
1313     g_socket_set_multicast_loopback (rtp_socket, FALSE);
1314   }
1315
1316   if (pool && ((transport == GST_RTSP_LOWER_TRANS_UDP &&
1317               gst_rtsp_address_pool_has_unicast_addresses (pool))
1318           || transport == GST_RTSP_LOWER_TRANS_UDP_MCAST)) {
1319     GstRTSPAddressFlags flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT;
1320
1321     if (transport == GST_RTSP_LOWER_TRANS_UDP)
1322       flags |= GST_RTSP_ADDRESS_FLAG_UNICAST;
1323     else
1324       flags |= GST_RTSP_ADDRESS_FLAG_MULTICAST;
1325
1326     if (addr)
1327       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1328
1329     if (family == G_SOCKET_FAMILY_IPV6)
1330       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1331     else
1332       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1333
1334     if (ct->destination && transport == GST_RTSP_LOWER_TRANS_UDP_MCAST
1335         && use_client_settings)
1336       gst_rtsp_address_pool_reserve_address (pool, ct->destination,
1337           ct->port.min, 2, ct->ttl, &addr);
1338     else
1339       addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1340
1341     if (addr == NULL)
1342       goto no_ports;
1343
1344     tmp_rtp = addr->port;
1345
1346     g_clear_object (&inetaddr);
1347     inetaddr = g_inet_address_new_from_string (addr->address);
1348
1349     /* If we're supposed to bind to a multicast address, instead bind
1350      * to ANY and let udpsrc later join the relevant multicast group
1351      */
1352     if (g_inet_address_get_is_multicast (inetaddr)) {
1353       g_object_unref (inetaddr);
1354       inetaddr = g_inet_address_new_any (family);
1355     }
1356   } else {
1357     if (tmp_rtp != 0) {
1358       tmp_rtp += 2;
1359       if (++count > 20)
1360         goto no_ports;
1361     }
1362
1363     if (inetaddr == NULL)
1364       inetaddr = g_inet_address_new_any (family);
1365   }
1366
1367   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1368   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1369     g_object_unref (rtp_sockaddr);
1370     goto again;
1371   }
1372   g_object_unref (rtp_sockaddr);
1373
1374   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1375   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1376     g_clear_object (&rtp_sockaddr);
1377     goto socket_error;
1378   }
1379
1380   tmp_rtp =
1381       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1382   g_object_unref (rtp_sockaddr);
1383
1384   /* check if port is even */
1385   if ((tmp_rtp & 1) != 0) {
1386     /* port not even, close and allocate another */
1387     tmp_rtp++;
1388     g_clear_object (&rtp_socket);
1389     goto again;
1390   }
1391
1392   /* set port */
1393   tmp_rtcp = tmp_rtp + 1;
1394
1395   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1396   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1397     g_object_unref (rtcp_sockaddr);
1398     g_clear_object (&rtp_socket);
1399     goto again;
1400   }
1401   g_object_unref (rtcp_sockaddr);
1402
1403   if (addr == NULL)
1404     addr_str = g_inet_address_to_string (inetaddr);
1405   else
1406     addr_str = addr->address;
1407   g_clear_object (&inetaddr);
1408
1409   if (!create_and_configure_udpsources_one_family (udpsrc_out, rtp_socket,
1410           rtcp_socket, family, addr_str, tmp_rtp, tmp_rtcp, multicast_iface,
1411           transport)) {
1412     if (addr == NULL)
1413       g_free (addr_str);
1414     goto no_udp_protocol;
1415   }
1416
1417   if (addr == NULL)
1418     g_free (addr_str);
1419
1420   play_udpsources_one_family (stream, udpsrc_out, family);
1421
1422   g_object_get (G_OBJECT (udpsrc_out[0]), "port", &rtpport, NULL);
1423   g_object_get (G_OBJECT (udpsrc_out[1]), "port", &rtcpport, NULL);
1424
1425   /* this should not happen... */
1426   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1427     goto port_error;
1428
1429   /* set RTP and RTCP sockets */
1430   set_sockets_for_udpsinks (stream, rtp_socket, rtcp_socket, family);
1431
1432   server_port_out->min = rtpport;
1433   server_port_out->max = rtcpport;
1434
1435   *server_addr_out = addr;
1436   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1437
1438   g_object_unref (rtp_socket);
1439   g_object_unref (rtcp_socket);
1440
1441   return TRUE;
1442
1443   /* ERRORS */
1444 no_udp_protocol:
1445   {
1446     goto cleanup;
1447   }
1448 no_ports:
1449   {
1450     goto cleanup;
1451   }
1452 port_error:
1453   {
1454     goto cleanup;
1455   }
1456 socket_error:
1457   {
1458     goto cleanup;
1459   }
1460 cleanup:
1461   {
1462     if (inetaddr)
1463       g_object_unref (inetaddr);
1464     g_list_free_full (rejected_addresses,
1465         (GDestroyNotify) gst_rtsp_address_free);
1466     if (addr)
1467       gst_rtsp_address_free (addr);
1468     if (rtp_socket)
1469       g_object_unref (rtp_socket);
1470     if (rtcp_socket)
1471       g_object_unref (rtcp_socket);
1472     return FALSE;
1473   }
1474 }
1475
1476 /**
1477  * gst_rtsp_stream_allocate_udp_sockets:
1478  * @stream: a #GstRTSPStream
1479  * @family: protocol family
1480  * @transport_method: transport method
1481  *
1482  * Allocates RTP and RTCP ports.
1483  *
1484  * Returns: %TRUE if the RTP and RTCP sockets have been succeccully allocated.
1485  */
1486 gboolean
1487 gst_rtsp_stream_allocate_udp_sockets (GstRTSPStream * stream,
1488     GSocketFamily family, GstRTSPTransport * ct, gboolean use_client_settings)
1489 {
1490   GstRTSPStreamPrivate *priv;
1491   gboolean result = FALSE;
1492   GstRTSPLowerTrans transport = ct->lower_transport;
1493
1494   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1495   priv = stream->priv;
1496   g_return_val_if_fail (priv->is_joined, FALSE);
1497
1498   g_mutex_lock (&priv->lock);
1499
1500   if (family == G_SOCKET_FAMILY_IPV4) {
1501     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1502       if (priv->have_ipv4_mcast)
1503         goto done;
1504       priv->have_ipv4_mcast =
1505           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4,
1506           priv->udpsrc_mcast_v4, &priv->server_port_v4, ct, &priv->addr_v4,
1507           use_client_settings);
1508     } else {
1509       priv->have_ipv4 =
1510           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4,
1511           &priv->server_port_v4, ct, &priv->server_addr_v4,
1512           use_client_settings);
1513     }
1514   } else {
1515     if (transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1516       if (priv->have_ipv6_mcast)
1517         goto done;
1518       priv->have_ipv6_mcast =
1519           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6,
1520           priv->udpsrc_mcast_v6, &priv->server_port_v6, ct, &priv->addr_v6,
1521           use_client_settings);
1522     } else {
1523       if (priv->have_ipv6)
1524         goto done;
1525       priv->have_ipv6 =
1526           alloc_ports_one_family (stream, G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6,
1527           &priv->server_port_v6, ct, &priv->server_addr_v6,
1528           use_client_settings);
1529     }
1530   }
1531
1532 done:
1533   result = priv->have_ipv4 || priv->have_ipv4_mcast || priv->have_ipv6 ||
1534       priv->have_ipv6_mcast;
1535
1536   g_mutex_unlock (&priv->lock);
1537
1538   return result;
1539 }
1540
1541 /**
1542  * gst_rtsp_stream_set_client_side:
1543  * @stream: a #GstRTSPStream
1544  * @client_side: TRUE if this #GstRTSPStream is running on the 'client' side of
1545  * an RTSP connection.
1546  *
1547  * Sets the #GstRTSPStream as a 'client side' stream - used for sending
1548  * streams to an RTSP server via RECORD. This has the practical effect
1549  * of changing which UDP port numbers are used when setting up the local
1550  * side of the stream sending to be either the 'server' or 'client' pair
1551  * of a configured UDP transport.
1552  */
1553 void
1554 gst_rtsp_stream_set_client_side (GstRTSPStream * stream, gboolean client_side)
1555 {
1556   GstRTSPStreamPrivate *priv;
1557
1558   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1559   priv = stream->priv;
1560   g_mutex_lock (&priv->lock);
1561   priv->client_side = client_side;
1562   g_mutex_unlock (&priv->lock);
1563 }
1564
1565 /**
1566  * gst_rtsp_stream_is_client_side:
1567  * @stream: a #GstRTSPStream
1568  *
1569  * See gst_rtsp_stream_set_client_side()
1570  *
1571  * Returns: TRUE if this #GstRTSPStream is client-side.
1572  */
1573 gboolean
1574 gst_rtsp_stream_is_client_side (GstRTSPStream * stream)
1575 {
1576   GstRTSPStreamPrivate *priv;
1577   gboolean ret;
1578
1579   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1580
1581   priv = stream->priv;
1582   g_mutex_lock (&priv->lock);
1583   ret = priv->client_side;
1584   g_mutex_unlock (&priv->lock);
1585
1586   return ret;
1587 }
1588
1589 /**
1590  * gst_rtsp_stream_get_server_port:
1591  * @stream: a #GstRTSPStream
1592  * @server_port: (out): result server port
1593  * @family: the port family to get
1594  *
1595  * Fill @server_port with the port pair used by the server. This function can
1596  * only be called when @stream has been joined.
1597  */
1598 void
1599 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1600     GstRTSPRange * server_port, GSocketFamily family)
1601 {
1602   GstRTSPStreamPrivate *priv;
1603
1604   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1605   priv = stream->priv;
1606   g_return_if_fail (priv->is_joined);
1607
1608   g_mutex_lock (&priv->lock);
1609   if (family == G_SOCKET_FAMILY_IPV4) {
1610     if (server_port)
1611       *server_port = priv->server_port_v4;
1612   } else {
1613     if (server_port)
1614       *server_port = priv->server_port_v6;
1615   }
1616   g_mutex_unlock (&priv->lock);
1617 }
1618
1619 /**
1620  * gst_rtsp_stream_get_rtpsession:
1621  * @stream: a #GstRTSPStream
1622  *
1623  * Get the RTP session of this stream.
1624  *
1625  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1626  */
1627 GObject *
1628 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1629 {
1630   GstRTSPStreamPrivate *priv;
1631   GObject *session;
1632
1633   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1634
1635   priv = stream->priv;
1636
1637   g_mutex_lock (&priv->lock);
1638   if ((session = priv->session))
1639     g_object_ref (session);
1640   g_mutex_unlock (&priv->lock);
1641
1642   return session;
1643 }
1644
1645 /**
1646  * gst_rtsp_stream_get_encoder:
1647  * @stream: a #GstRTSPStream
1648  *
1649  * Get the SRTP encoder for this stream.
1650  *
1651  * Returns: (transfer full): The SRTP encoder for this stream. Unref after usage.
1652  */
1653 GstElement *
1654 gst_rtsp_stream_get_srtp_encoder (GstRTSPStream * stream)
1655 {
1656   GstRTSPStreamPrivate *priv;
1657   GstElement *encoder;
1658
1659   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1660
1661   priv = stream->priv;
1662
1663   g_mutex_lock (&priv->lock);
1664   if ((encoder = priv->srtpenc))
1665     g_object_ref (encoder);
1666   g_mutex_unlock (&priv->lock);
1667
1668   return encoder;
1669 }
1670
1671 /**
1672  * gst_rtsp_stream_get_ssrc:
1673  * @stream: a #GstRTSPStream
1674  * @ssrc: (out): result ssrc
1675  *
1676  * Get the SSRC used by the RTP session of this stream. This function can only
1677  * be called when @stream has been joined.
1678  */
1679 void
1680 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1681 {
1682   GstRTSPStreamPrivate *priv;
1683
1684   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1685   priv = stream->priv;
1686   g_return_if_fail (priv->is_joined);
1687
1688   g_mutex_lock (&priv->lock);
1689   if (ssrc && priv->session)
1690     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1691   g_mutex_unlock (&priv->lock);
1692 }
1693
1694 /**
1695  * gst_rtsp_stream_set_retransmission_time:
1696  * @stream: a #GstRTSPStream
1697  * @time: a #GstClockTime
1698  *
1699  * Set the amount of time to store retransmission packets.
1700  */
1701 void
1702 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1703     GstClockTime time)
1704 {
1705   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1706
1707   g_mutex_lock (&stream->priv->lock);
1708   stream->priv->rtx_time = time;
1709   if (stream->priv->rtxsend)
1710     g_object_set (stream->priv->rtxsend, "max-size-time",
1711         GST_TIME_AS_MSECONDS (time), NULL);
1712   g_mutex_unlock (&stream->priv->lock);
1713 }
1714
1715 /**
1716  * gst_rtsp_stream_get_retransmission_time:
1717  * @stream: a #GstRTSPStream
1718  *
1719  * Get the amount of time to store retransmission data.
1720  *
1721  * Returns: the amount of time to store retransmission data.
1722  */
1723 GstClockTime
1724 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1725 {
1726   GstClockTime ret;
1727
1728   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1729
1730   g_mutex_lock (&stream->priv->lock);
1731   ret = stream->priv->rtx_time;
1732   g_mutex_unlock (&stream->priv->lock);
1733
1734   return ret;
1735 }
1736
1737 /**
1738  * gst_rtsp_stream_set_retransmission_pt:
1739  * @stream: a #GstRTSPStream
1740  * @rtx_pt: a #guint
1741  *
1742  * Set the payload type (pt) for retransmission of this stream.
1743  */
1744 void
1745 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1746 {
1747   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1748
1749   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1750
1751   g_mutex_lock (&stream->priv->lock);
1752   stream->priv->rtx_pt = rtx_pt;
1753   if (stream->priv->rtxsend) {
1754     guint pt = gst_rtsp_stream_get_pt (stream);
1755     gchar *pt_s = g_strdup_printf ("%d", pt);
1756     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1757         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1758     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1759     g_free (pt_s);
1760     gst_structure_free (rtx_pt_map);
1761   }
1762   g_mutex_unlock (&stream->priv->lock);
1763 }
1764
1765 /**
1766  * gst_rtsp_stream_get_retransmission_pt:
1767  * @stream: a #GstRTSPStream
1768  *
1769  * Get the payload-type used for retransmission of this stream
1770  *
1771  * Returns: The retransmission PT.
1772  */
1773 guint
1774 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1775 {
1776   guint rtx_pt;
1777
1778   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1779
1780   g_mutex_lock (&stream->priv->lock);
1781   rtx_pt = stream->priv->rtx_pt;
1782   g_mutex_unlock (&stream->priv->lock);
1783
1784   return rtx_pt;
1785 }
1786
1787 /**
1788  * gst_rtsp_stream_set_buffer_size:
1789  * @stream: a #GstRTSPStream
1790  * @size: the buffer size
1791  *
1792  * Set the size of the UDP transmission buffer (in bytes)
1793  * Needs to be set before the stream is joined to a bin.
1794  *
1795  * Since: 1.6
1796  */
1797 void
1798 gst_rtsp_stream_set_buffer_size (GstRTSPStream * stream, guint size)
1799 {
1800   g_mutex_lock (&stream->priv->lock);
1801   stream->priv->buffer_size = size;
1802   g_mutex_unlock (&stream->priv->lock);
1803 }
1804
1805 /**
1806  * gst_rtsp_stream_get_buffer_size:
1807  * @stream: a #GstRTSPStream
1808  *
1809  * Get the size of the UDP transmission buffer (in bytes)
1810  *
1811  * Returns: the size of the UDP TX buffer
1812  *
1813  * Since: 1.6
1814  */
1815 guint
1816 gst_rtsp_stream_get_buffer_size (GstRTSPStream * stream)
1817 {
1818   guint buffer_size;
1819
1820   g_mutex_lock (&stream->priv->lock);
1821   buffer_size = stream->priv->buffer_size;
1822   g_mutex_unlock (&stream->priv->lock);
1823
1824   return buffer_size;
1825 }
1826
1827 /* executed from streaming thread */
1828 static void
1829 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1830 {
1831   GstRTSPStreamPrivate *priv = stream->priv;
1832   GstCaps *newcaps, *oldcaps;
1833
1834   newcaps = gst_pad_get_current_caps (pad);
1835
1836   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1837       newcaps);
1838
1839   g_mutex_lock (&priv->lock);
1840   oldcaps = priv->caps;
1841   priv->caps = newcaps;
1842   g_mutex_unlock (&priv->lock);
1843
1844   if (oldcaps)
1845     gst_caps_unref (oldcaps);
1846 }
1847
1848 static void
1849 dump_structure (const GstStructure * s)
1850 {
1851   gchar *sstr;
1852
1853   sstr = gst_structure_to_string (s);
1854   GST_INFO ("structure: %s", sstr);
1855   g_free (sstr);
1856 }
1857
1858 static GstRTSPStreamTransport *
1859 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1860 {
1861   GstRTSPStreamPrivate *priv = stream->priv;
1862   GList *walk;
1863   GstRTSPStreamTransport *result = NULL;
1864   const gchar *tmp;
1865   gchar *dest;
1866   guint port;
1867
1868   if (rtcp_from == NULL)
1869     return NULL;
1870
1871   tmp = g_strrstr (rtcp_from, ":");
1872   if (tmp == NULL)
1873     return NULL;
1874
1875   port = atoi (tmp + 1);
1876   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1877
1878   g_mutex_lock (&priv->lock);
1879   GST_INFO ("finding %s:%d in %d transports", dest, port,
1880       g_list_length (priv->transports));
1881
1882   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1883     GstRTSPStreamTransport *trans = walk->data;
1884     const GstRTSPTransport *tr;
1885     gint min, max;
1886
1887     tr = gst_rtsp_stream_transport_get_transport (trans);
1888
1889     if (priv->client_side) {
1890       /* In client side mode the 'destination' is the RTSP server, so send
1891        * to those ports */
1892       min = tr->server_port.min;
1893       max = tr->server_port.max;
1894     } else {
1895       min = tr->client_port.min;
1896       max = tr->client_port.max;
1897     }
1898
1899     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1900       result = trans;
1901       break;
1902     }
1903   }
1904   if (result)
1905     g_object_ref (result);
1906   g_mutex_unlock (&priv->lock);
1907
1908   g_free (dest);
1909
1910   return result;
1911 }
1912
1913 static GstRTSPStreamTransport *
1914 check_transport (GObject * source, GstRTSPStream * stream)
1915 {
1916   GstStructure *stats;
1917   GstRTSPStreamTransport *trans;
1918
1919   /* see if we have a stream to match with the origin of the RTCP packet */
1920   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1921   if (trans == NULL) {
1922     g_object_get (source, "stats", &stats, NULL);
1923     if (stats) {
1924       const gchar *rtcp_from;
1925
1926       dump_structure (stats);
1927
1928       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1929       if ((trans = find_transport (stream, rtcp_from))) {
1930         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1931             source);
1932         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1933             g_object_unref);
1934       }
1935       gst_structure_free (stats);
1936     }
1937   }
1938   return trans;
1939 }
1940
1941
1942 static void
1943 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1944 {
1945   GstRTSPStreamTransport *trans;
1946
1947   GST_INFO ("%p: new source %p", stream, source);
1948
1949   trans = check_transport (source, stream);
1950
1951   if (trans)
1952     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1953 }
1954
1955 static void
1956 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1957 {
1958   GST_INFO ("%p: new SDES %p", stream, source);
1959 }
1960
1961 static void
1962 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1963 {
1964   GstRTSPStreamTransport *trans;
1965
1966   trans = check_transport (source, stream);
1967
1968   if (trans) {
1969     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1970     gst_rtsp_stream_transport_keep_alive (trans);
1971   }
1972 #ifdef DUMP_STATS
1973   {
1974     GstStructure *stats;
1975     g_object_get (source, "stats", &stats, NULL);
1976     if (stats) {
1977       dump_structure (stats);
1978       gst_structure_free (stats);
1979     }
1980   }
1981 #endif
1982 }
1983
1984 static void
1985 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1986 {
1987   GST_INFO ("%p: source %p bye", stream, source);
1988 }
1989
1990 static void
1991 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1992 {
1993   GstRTSPStreamTransport *trans;
1994
1995   GST_INFO ("%p: source %p bye timeout", stream, source);
1996
1997   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1998     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1999     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2000   }
2001 }
2002
2003 static void
2004 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
2005 {
2006   GstRTSPStreamTransport *trans;
2007
2008   GST_INFO ("%p: source %p timeout", stream, source);
2009
2010   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
2011     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
2012     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
2013   }
2014 }
2015
2016 static void
2017 on_new_sender_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
2018 {
2019   GST_INFO ("%p: new sender source %p", stream, source);
2020 #ifndef DUMP_STATS
2021   {
2022     GstStructure *stats;
2023     g_object_get (source, "stats", &stats, NULL);
2024     if (stats) {
2025       dump_structure (stats);
2026       gst_structure_free (stats);
2027     }
2028   }
2029 #endif
2030 }
2031
2032 static void
2033 on_sender_ssrc_active (GObject * session, GObject * source,
2034     GstRTSPStream * stream)
2035 {
2036 #ifndef DUMP_STATS
2037   {
2038     GstStructure *stats;
2039     g_object_get (source, "stats", &stats, NULL);
2040     if (stats) {
2041       dump_structure (stats);
2042       gst_structure_free (stats);
2043     }
2044   }
2045 #endif
2046 }
2047
2048 static void
2049 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
2050 {
2051   if (is_rtp) {
2052     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
2053     g_list_free (priv->tr_cache_rtp);
2054     priv->tr_cache_rtp = NULL;
2055   } else {
2056     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
2057     g_list_free (priv->tr_cache_rtcp);
2058     priv->tr_cache_rtcp = NULL;
2059   }
2060 }
2061
2062 static GstFlowReturn
2063 handle_new_sample (GstAppSink * sink, gpointer user_data)
2064 {
2065   GstRTSPStreamPrivate *priv;
2066   GList *walk;
2067   GstSample *sample;
2068   GstBuffer *buffer;
2069   GstRTSPStream *stream;
2070   gboolean is_rtp;
2071
2072   sample = gst_app_sink_pull_sample (sink);
2073   if (!sample)
2074     return GST_FLOW_OK;
2075
2076   stream = (GstRTSPStream *) user_data;
2077   priv = stream->priv;
2078   buffer = gst_sample_get_buffer (sample);
2079
2080   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
2081
2082   g_mutex_lock (&priv->lock);
2083   if (is_rtp) {
2084     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
2085       clear_tr_cache (priv, is_rtp);
2086       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2087         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2088         priv->tr_cache_rtp =
2089             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
2090       }
2091       priv->tr_cache_cookie_rtp = priv->transports_cookie;
2092     }
2093   } else {
2094     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
2095       clear_tr_cache (priv, is_rtp);
2096       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
2097         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2098         priv->tr_cache_rtcp =
2099             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
2100       }
2101       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
2102     }
2103   }
2104   g_mutex_unlock (&priv->lock);
2105
2106   if (is_rtp) {
2107     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
2108       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2109       gst_rtsp_stream_transport_send_rtp (tr, buffer);
2110     }
2111   } else {
2112     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
2113       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
2114       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
2115     }
2116   }
2117   gst_sample_unref (sample);
2118
2119   return GST_FLOW_OK;
2120 }
2121
2122 static GstAppSinkCallbacks sink_cb = {
2123   NULL,                         /* not interested in EOS */
2124   NULL,                         /* not interested in preroll samples */
2125   handle_new_sample,
2126 };
2127
2128 static GstElement *
2129 get_rtp_encoder (GstRTSPStream * stream, guint session)
2130 {
2131   GstRTSPStreamPrivate *priv = stream->priv;
2132
2133   if (priv->srtpenc == NULL) {
2134     gchar *name;
2135
2136     name = g_strdup_printf ("srtpenc_%u", session);
2137     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
2138     g_free (name);
2139
2140     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
2141   }
2142   return gst_object_ref (priv->srtpenc);
2143 }
2144
2145 static GstElement *
2146 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
2147 {
2148   GstRTSPStreamPrivate *priv = stream->priv;
2149   GstElement *oldenc, *enc;
2150   GstPad *pad;
2151   gchar *name;
2152
2153   if (priv->idx != session)
2154     return NULL;
2155
2156   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
2157
2158   oldenc = priv->srtpenc;
2159   enc = get_rtp_encoder (stream, session);
2160   name = g_strdup_printf ("rtp_sink_%d", session);
2161   pad = gst_element_get_request_pad (enc, name);
2162   g_free (name);
2163   gst_object_unref (pad);
2164
2165   if (oldenc == NULL)
2166     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
2167         enc);
2168
2169   return enc;
2170 }
2171
2172 static GstElement *
2173 request_rtcp_encoder (GstElement * rtpbin, guint session,
2174     GstRTSPStream * stream)
2175 {
2176   GstRTSPStreamPrivate *priv = stream->priv;
2177   GstElement *oldenc, *enc;
2178   GstPad *pad;
2179   gchar *name;
2180
2181   if (priv->idx != session)
2182     return NULL;
2183
2184   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
2185
2186   oldenc = priv->srtpenc;
2187   enc = get_rtp_encoder (stream, session);
2188   name = g_strdup_printf ("rtcp_sink_%d", session);
2189   pad = gst_element_get_request_pad (enc, name);
2190   g_free (name);
2191   gst_object_unref (pad);
2192
2193   if (oldenc == NULL)
2194     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
2195         enc);
2196
2197   return enc;
2198 }
2199
2200 static GstCaps *
2201 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
2202 {
2203   GstRTSPStreamPrivate *priv = stream->priv;
2204   GstCaps *caps;
2205
2206   GST_DEBUG ("request key %08x", ssrc);
2207
2208   g_mutex_lock (&priv->lock);
2209   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
2210     gst_caps_ref (caps);
2211   g_mutex_unlock (&priv->lock);
2212
2213   return caps;
2214 }
2215
2216 static GstElement *
2217 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
2218     GstRTSPStream * stream)
2219 {
2220   GstRTSPStreamPrivate *priv = stream->priv;
2221
2222   if (priv->idx != session)
2223     return NULL;
2224
2225   if (priv->srtpdec == NULL) {
2226     gchar *name;
2227
2228     name = g_strdup_printf ("srtpdec_%u", session);
2229     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
2230     g_free (name);
2231
2232     g_signal_connect (priv->srtpdec, "request-key",
2233         (GCallback) request_key, stream);
2234   }
2235   return gst_object_ref (priv->srtpdec);
2236 }
2237
2238 /**
2239  * gst_rtsp_stream_request_aux_sender:
2240  * @stream: a #GstRTSPStream
2241  * @sessid: the session id
2242  *
2243  * Creating a rtxsend bin
2244  *
2245  * Returns: (transfer full): a #GstElement.
2246  *
2247  * Since: 1.6
2248  */
2249 GstElement *
2250 gst_rtsp_stream_request_aux_sender (GstRTSPStream * stream, guint sessid)
2251 {
2252   GstElement *bin;
2253   GstPad *pad;
2254   GstStructure *pt_map;
2255   gchar *name;
2256   guint pt, rtx_pt;
2257   gchar *pt_s;
2258
2259   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2260
2261   pt = gst_rtsp_stream_get_pt (stream);
2262   pt_s = g_strdup_printf ("%u", pt);
2263   rtx_pt = stream->priv->rtx_pt;
2264
2265   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
2266
2267   bin = gst_bin_new (NULL);
2268   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
2269   pt_map = gst_structure_new ("application/x-rtp-pt-map",
2270       pt_s, G_TYPE_UINT, rtx_pt, NULL);
2271   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
2272       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
2273   g_free (pt_s);
2274   gst_structure_free (pt_map);
2275   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
2276
2277   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
2278   name = g_strdup_printf ("src_%u", sessid);
2279   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2280   g_free (name);
2281   gst_object_unref (pad);
2282
2283   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
2284   name = g_strdup_printf ("sink_%u", sessid);
2285   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
2286   g_free (name);
2287   gst_object_unref (pad);
2288
2289   return bin;
2290 }
2291
2292 /**
2293  * gst_rtsp_stream_set_pt_map:
2294  * @stream: a #GstRTSPStream
2295  * @pt: the pt
2296  * @caps: a #GstCaps
2297  *
2298  * Configure a pt map between @pt and @caps.
2299  */
2300 void
2301 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
2302 {
2303   GstRTSPStreamPrivate *priv = stream->priv;
2304
2305   g_mutex_lock (&priv->lock);
2306   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
2307   g_mutex_unlock (&priv->lock);
2308 }
2309
2310 /**
2311  * gst_rtsp_stream_set_publish_clock_mode:
2312  * @stream: a #GstRTSPStream
2313  * @mode: the clock publish mode
2314  *
2315  * Sets if and how the stream clock should be published according to RFC7273.
2316  *
2317  * Since: 1.8
2318  */
2319 void
2320 gst_rtsp_stream_set_publish_clock_mode (GstRTSPStream * stream,
2321     GstRTSPPublishClockMode mode)
2322 {
2323   GstRTSPStreamPrivate *priv;
2324
2325   priv = stream->priv;
2326   g_mutex_lock (&priv->lock);
2327   priv->publish_clock_mode = mode;
2328   g_mutex_unlock (&priv->lock);
2329 }
2330
2331 /**
2332  * gst_rtsp_stream_get_publish_clock_mode:
2333  * @factory: a #GstRTSPStream
2334  *
2335  * Gets if and how the stream clock should be published according to RFC7273.
2336  *
2337  * Returns: The GstRTSPPublishClockMode
2338  *
2339  * Since: 1.8
2340  */
2341 GstRTSPPublishClockMode
2342 gst_rtsp_stream_get_publish_clock_mode (GstRTSPStream * stream)
2343 {
2344   GstRTSPStreamPrivate *priv;
2345   GstRTSPPublishClockMode ret;
2346
2347   priv = stream->priv;
2348   g_mutex_lock (&priv->lock);
2349   ret = priv->publish_clock_mode;
2350   g_mutex_unlock (&priv->lock);
2351
2352   return ret;
2353 }
2354
2355 static GstCaps *
2356 request_pt_map (GstElement * rtpbin, guint session, guint pt,
2357     GstRTSPStream * stream)
2358 {
2359   GstRTSPStreamPrivate *priv = stream->priv;
2360   GstCaps *caps = NULL;
2361
2362   g_mutex_lock (&priv->lock);
2363
2364   if (priv->idx == session) {
2365     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
2366     if (caps) {
2367       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
2368       gst_caps_ref (caps);
2369     } else {
2370       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
2371     }
2372   }
2373
2374   g_mutex_unlock (&priv->lock);
2375
2376   return caps;
2377 }
2378
2379 static void
2380 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
2381 {
2382   GstRTSPStreamPrivate *priv = stream->priv;
2383   gchar *name;
2384   GstPadLinkReturn ret;
2385   guint sessid;
2386
2387   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
2388       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2389
2390   name = gst_pad_get_name (pad);
2391   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
2392     g_free (name);
2393     return;
2394   }
2395   g_free (name);
2396
2397   if (priv->idx != sessid)
2398     return;
2399
2400   if (gst_pad_is_linked (priv->sinkpad)) {
2401     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
2402         GST_DEBUG_PAD_NAME (priv->sinkpad));
2403     return;
2404   }
2405
2406   /* link the RTP pad to the session manager, it should not really fail unless
2407    * this is not really an RTP pad */
2408   ret = gst_pad_link (pad, priv->sinkpad);
2409   if (ret != GST_PAD_LINK_OK)
2410     goto link_failed;
2411   priv->recv_rtp_src = gst_object_ref (pad);
2412
2413   return;
2414
2415 /* ERRORS */
2416 link_failed:
2417   {
2418     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
2419         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
2420   }
2421 }
2422
2423 static void
2424 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
2425     GstRTSPStream * stream)
2426 {
2427   /* TODO: What to do here other than this? */
2428   GST_DEBUG ("Stream %p: Got EOS", stream);
2429   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
2430 }
2431
2432 /* must be called with lock */
2433 static gboolean
2434 create_sender_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2435 {
2436   GstRTSPStreamPrivate *priv;
2437   GstPad *pad, *sinkpad = NULL;
2438   gboolean is_tcp = FALSE, is_udp = FALSE;
2439   gint i;
2440
2441   priv = stream->priv;
2442
2443   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2444   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2445       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2446
2447   if (is_udp && !create_and_configure_udpsinks (stream))
2448     goto no_udp_protocol;
2449
2450   for (i = 0; i < 2; i++) {
2451     GstPad *teepad, *queuepad;
2452     /* For the sender we create this bit of pipeline for both
2453      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2454      * we need to add a queue before appsink and udpsink to make
2455      * the pipeline not block. For the TCP case, we want to pump
2456      * client as fast as possible anyway. This pipeline is used
2457      * when both TCP and UDP are present.
2458      *
2459      * .--------.      .-----.    .---------.    .---------.
2460      * | rtpbin |      | tee |    |  queue  |    | udpsink |
2461      * |       send->sink   src->sink      src->sink       |
2462      * '--------'      |     |    '---------'    '---------'
2463      *                 |     |    .---------.    .---------.
2464      *                 |     |    |  queue  |    | appsink |
2465      *                 |    src->sink      src->sink       |
2466      *                 '-----'    '---------'    '---------'
2467      *
2468      * When only UDP or only TCP is allowed, we skip the tee and queue
2469      * and link the udpsink (for UDP) or appsink (for TCP) directly to
2470      * the session.
2471      */
2472     /* Only link the RTP send src if we're going to send RTP, link
2473      * the RTCP send src always */
2474     if (priv->srcpad || i == 1) {
2475       if (is_udp) {
2476         /* add udpsink */
2477         gst_bin_add (bin, priv->udpsink[i]);
2478         sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2479       }
2480
2481       if (is_tcp) {
2482         /* make appsink */
2483         priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2484         g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2485         gst_bin_add (bin, priv->appsink[i]);
2486         gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2487             &sink_cb, stream, NULL);
2488       }
2489
2490       if (is_udp && is_tcp) {
2491         g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2492
2493         /* make tee for RTP/RTCP */
2494         priv->tee[i] = gst_element_factory_make ("tee", NULL);
2495         gst_bin_add (bin, priv->tee[i]);
2496
2497         /* and link to rtpbin send pad */
2498         pad = gst_element_get_static_pad (priv->tee[i], "sink");
2499         gst_pad_link (priv->send_src[i], pad);
2500         gst_object_unref (pad);
2501
2502         priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2503         g_object_set (priv->udpqueue[i], "max-size-buffers",
2504             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2505             NULL);
2506         gst_bin_add (bin, priv->udpqueue[i]);
2507         /* link tee to udpqueue */
2508         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2509         pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2510         gst_pad_link (teepad, pad);
2511         gst_object_unref (pad);
2512         gst_object_unref (teepad);
2513
2514         /* link udpqueue to udpsink */
2515         queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2516         gst_pad_link (queuepad, sinkpad);
2517         gst_object_unref (queuepad);
2518         gst_object_unref (sinkpad);
2519
2520         /* make appqueue */
2521         priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2522         g_object_set (priv->appqueue[i], "max-size-buffers",
2523             1, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT (0),
2524             NULL);
2525         gst_bin_add (bin, priv->appqueue[i]);
2526         /* and link tee to appqueue */
2527         teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2528         pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2529         gst_pad_link (teepad, pad);
2530         gst_object_unref (pad);
2531         gst_object_unref (teepad);
2532
2533         /* and link appqueue to appsink */
2534         queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2535         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2536         gst_pad_link (queuepad, pad);
2537         gst_object_unref (pad);
2538         gst_object_unref (queuepad);
2539       } else if (is_tcp) {
2540         /* only appsink needed, link it to the session */
2541         pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2542         gst_pad_link (priv->send_src[i], pad);
2543         gst_object_unref (pad);
2544
2545         /* when its only TCP, we need to set sync and preroll to FALSE
2546          * for the sink to avoid deadlock. And this is only needed for
2547          * sink used for RTCP data, not the RTP data. */
2548         if (i == 1)
2549           g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2550       } else {
2551         /* else only udpsink needed, link it to the session */
2552         gst_pad_link (priv->send_src[i], sinkpad);
2553         gst_object_unref (sinkpad);
2554       }
2555     }
2556
2557     /* check if we need to set to a special state */
2558     if (state != GST_STATE_NULL) {
2559       if (priv->udpsink[i] && (priv->srcpad || i == 1))
2560         gst_element_set_state (priv->udpsink[i], state);
2561       if (priv->appsink[i] && (priv->srcpad || i == 1))
2562         gst_element_set_state (priv->appsink[i], state);
2563       if (priv->appqueue[i] && (priv->srcpad || i == 1))
2564         gst_element_set_state (priv->appqueue[i], state);
2565       if (priv->udpqueue[i] && (priv->srcpad || i == 1))
2566         gst_element_set_state (priv->udpqueue[i], state);
2567       if (priv->tee[i] && (priv->srcpad || i == 1))
2568         gst_element_set_state (priv->tee[i], state);
2569     }
2570   }
2571
2572   return TRUE;
2573
2574   /* ERRORS */
2575 no_udp_protocol:
2576   {
2577     return FALSE;
2578   }
2579 }
2580
2581 /* must be called with lock */
2582 static void
2583 create_receiver_part (GstRTSPStream * stream, GstBin * bin, GstState state)
2584 {
2585   GstRTSPStreamPrivate *priv;
2586   GstPad *pad, *selpad;
2587   gboolean is_tcp;
2588   gint i;
2589
2590   priv = stream->priv;
2591
2592   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2593
2594   for (i = 0; i < 2; i++) {
2595     /* Only connect recv RTP sink if we expect to receive RTP. Connect recv
2596      * RTCP sink always */
2597     if (priv->sinkpad || i == 1) {
2598       /* For the receiver we create this bit of pipeline for both
2599        * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2600        * and it is all funneled into the rtpbin receive pad.
2601        *
2602        * .--------.     .--------.    .--------.
2603        * | udpsrc |     | funnel |    | rtpbin |
2604        * |       src->sink      src->sink      |
2605        * '--------'     |        |    '--------'
2606        * .--------.     |        |
2607        * | appsrc |     |        |
2608        * |       src->sink       |
2609        * '--------'     '--------'
2610        */
2611       /* make funnel for the RTP/RTCP receivers */
2612       priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2613       gst_bin_add (bin, priv->funnel[i]);
2614
2615       pad = gst_element_get_static_pad (priv->funnel[i], "src");
2616       gst_pad_link (pad, priv->recv_sink[i]);
2617       gst_object_unref (pad);
2618
2619       if (priv->udpsrc_v4[i]) {
2620         if (priv->srcpad) {
2621           /* we set and keep these to playing so that they don't cause NO_PREROLL return
2622            * values. This is only relevant for PLAY pipelines */
2623           gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2624           gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2625         }
2626         /* add udpsrc */
2627         gst_bin_add (bin, priv->udpsrc_v4[i]);
2628
2629         /* and link to the funnel v4 */
2630         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2631         pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2632         gst_pad_link (pad, selpad);
2633         gst_object_unref (pad);
2634         gst_object_unref (selpad);
2635       }
2636
2637       if (priv->udpsrc_v6[i]) {
2638         if (priv->srcpad) {
2639           gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2640           gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2641         }
2642         gst_bin_add (bin, priv->udpsrc_v6[i]);
2643
2644         /* and link to the funnel v6 */
2645         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2646         pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2647         gst_pad_link (pad, selpad);
2648         gst_object_unref (pad);
2649         gst_object_unref (selpad);
2650       }
2651
2652       if (is_tcp) {
2653         /* make and add appsrc */
2654         priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2655         priv->appsrc_base_time[i] = -1;
2656         if (priv->srcpad) {
2657           gst_element_set_state (priv->appsrc[i], GST_STATE_PLAYING);
2658           gst_element_set_locked_state (priv->appsrc[i], TRUE);
2659         }
2660         g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, "is-live",
2661             TRUE, NULL);
2662         gst_bin_add (bin, priv->appsrc[i]);
2663         /* and link to the funnel */
2664         selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2665         pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2666         gst_pad_link (pad, selpad);
2667         gst_object_unref (pad);
2668         gst_object_unref (selpad);
2669       }
2670     }
2671
2672     /* check if we need to set to a special state */
2673     if (state != GST_STATE_NULL) {
2674       if (priv->funnel[i] && (priv->sinkpad || i == 1))
2675         gst_element_set_state (priv->funnel[i], state);
2676     }
2677   }
2678 }
2679
2680 /**
2681  * gst_rtsp_stream_join_bin:
2682  * @stream: a #GstRTSPStream
2683  * @bin: (transfer none): a #GstBin to join
2684  * @rtpbin: (transfer none): a rtpbin element in @bin
2685  * @state: the target state of the new elements
2686  *
2687  * Join the #GstBin @bin that contains the element @rtpbin.
2688  *
2689  * @stream will link to @rtpbin, which must be inside @bin. The elements
2690  * added to @bin will be set to the state given in @state.
2691  *
2692  * Returns: %TRUE on success.
2693  */
2694 gboolean
2695 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
2696     GstElement * rtpbin, GstState state)
2697 {
2698   GstRTSPStreamPrivate *priv;
2699   guint idx;
2700   gchar *name;
2701   GstPadLinkReturn ret;
2702
2703   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2704   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2705   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2706
2707   priv = stream->priv;
2708
2709   g_mutex_lock (&priv->lock);
2710   if (priv->is_joined)
2711     goto was_joined;
2712
2713   /* create a session with the same index as the stream */
2714   idx = priv->idx;
2715
2716   GST_INFO ("stream %p joining bin as session %u", stream, idx);
2717
2718   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2719       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2720     /* For SRTP */
2721     g_signal_connect (rtpbin, "request-rtp-encoder",
2722         (GCallback) request_rtp_encoder, stream);
2723     g_signal_connect (rtpbin, "request-rtcp-encoder",
2724         (GCallback) request_rtcp_encoder, stream);
2725     g_signal_connect (rtpbin, "request-rtp-decoder",
2726         (GCallback) request_rtp_rtcp_decoder, stream);
2727     g_signal_connect (rtpbin, "request-rtcp-decoder",
2728         (GCallback) request_rtp_rtcp_decoder, stream);
2729   }
2730
2731   if (priv->sinkpad) {
2732     g_signal_connect (rtpbin, "request-pt-map",
2733         (GCallback) request_pt_map, stream);
2734   }
2735
2736   /* get pads from the RTP session element for sending and receiving
2737    * RTP/RTCP*/
2738   if (priv->srcpad) {
2739     /* get a pad for sending RTP */
2740     name = g_strdup_printf ("send_rtp_sink_%u", idx);
2741     priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2742     g_free (name);
2743
2744     /* link the RTP pad to the session manager, it should not really fail unless
2745      * this is not really an RTP pad */
2746     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2747     if (ret != GST_PAD_LINK_OK)
2748       goto link_failed;
2749
2750     name = g_strdup_printf ("send_rtp_src_%u", idx);
2751     priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2752     g_free (name);
2753   } else {
2754     /* Need to connect our sinkpad from here */
2755     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2756     /* EOS */
2757     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2758
2759     name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2760     priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2761     g_free (name);
2762   }
2763
2764   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2765   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2766   g_free (name);
2767   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2768   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2769   g_free (name);
2770
2771   /* get the session */
2772   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2773
2774   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2775       stream);
2776   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2777       stream);
2778   g_signal_connect (priv->session, "on-ssrc-active",
2779       (GCallback) on_ssrc_active, stream);
2780   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2781       stream);
2782   g_signal_connect (priv->session, "on-bye-timeout",
2783       (GCallback) on_bye_timeout, stream);
2784   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2785       stream);
2786
2787   /* signal for sender ssrc */
2788   g_signal_connect (priv->session, "on-new-sender-ssrc",
2789       (GCallback) on_new_sender_ssrc, stream);
2790   g_signal_connect (priv->session, "on-sender-ssrc-active",
2791       (GCallback) on_sender_ssrc_active, stream);
2792
2793   if (!create_sender_part (stream, bin, state))
2794     goto no_udp_protocol;
2795
2796   create_receiver_part (stream, bin, state);
2797
2798   if (priv->srcpad) {
2799     /* be notified of caps changes */
2800     priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2801         (GCallback) caps_notify, stream);
2802   }
2803
2804   priv->joined_bin = bin;
2805   priv->is_joined = TRUE;
2806   g_mutex_unlock (&priv->lock);
2807
2808   return TRUE;
2809
2810   /* ERRORS */
2811 was_joined:
2812   {
2813     g_mutex_unlock (&priv->lock);
2814     return TRUE;
2815   }
2816 link_failed:
2817   {
2818     GST_WARNING ("failed to link stream %u", idx);
2819     gst_object_unref (priv->send_rtp_sink);
2820     priv->send_rtp_sink = NULL;
2821     g_mutex_unlock (&priv->lock);
2822     return FALSE;
2823   }
2824 no_udp_protocol:
2825   {
2826     GST_WARNING ("failed to allocate ports %u", idx);
2827     gst_object_unref (priv->send_rtp_sink);
2828     priv->send_rtp_sink = NULL;
2829     gst_object_unref (priv->send_src[0]);
2830     priv->send_src[0] = NULL;
2831     gst_object_unref (priv->send_src[1]);
2832     priv->send_src[1] = NULL;
2833     gst_object_unref (priv->recv_sink[0]);
2834     priv->recv_sink[0] = NULL;
2835     gst_object_unref (priv->recv_sink[1]);
2836     priv->recv_sink[1] = NULL;
2837     if (priv->udpsink[0])
2838       gst_element_set_state (priv->udpsink[0], GST_STATE_NULL);
2839     if (priv->udpsink[1])
2840       gst_element_set_state (priv->udpsink[1], GST_STATE_NULL);
2841     if (priv->udpsrc_v4[0]) {
2842       gst_element_set_state (priv->udpsrc_v4[0], GST_STATE_NULL);
2843       gst_object_unref (priv->udpsrc_v4[0]);
2844       priv->udpsrc_v4[0] = NULL;
2845     }
2846     if (priv->udpsrc_v4[1]) {
2847       gst_element_set_state (priv->udpsrc_v4[1], GST_STATE_NULL);
2848       gst_object_unref (priv->udpsrc_v4[1]);
2849       priv->udpsrc_v4[1] = NULL;
2850     }
2851     if (priv->udpsrc_mcast_v4[0]) {
2852       gst_element_set_state (priv->udpsrc_mcast_v4[0], GST_STATE_NULL);
2853       gst_object_unref (priv->udpsrc_mcast_v4[0]);
2854       priv->udpsrc_mcast_v4[0] = NULL;
2855     }
2856     if (priv->udpsrc_mcast_v4[1]) {
2857       gst_element_set_state (priv->udpsrc_mcast_v4[1], GST_STATE_NULL);
2858       gst_object_unref (priv->udpsrc_mcast_v4[1]);
2859       priv->udpsrc_mcast_v4[1] = NULL;
2860     }
2861     if (priv->udpsrc_v6[0]) {
2862       gst_element_set_state (priv->udpsrc_v6[0], GST_STATE_NULL);
2863       gst_object_unref (priv->udpsrc_v6[0]);
2864       priv->udpsrc_v6[0] = NULL;
2865     }
2866     if (priv->udpsrc_v6[1]) {
2867       gst_element_set_state (priv->udpsrc_v6[1], GST_STATE_NULL);
2868       gst_object_unref (priv->udpsrc_v6[1]);
2869       priv->udpsrc_v6[1] = NULL;
2870     }
2871     if (priv->udpsrc_mcast_v6[0]) {
2872       gst_element_set_state (priv->udpsrc_mcast_v6[0], GST_STATE_NULL);
2873       gst_object_unref (priv->udpsrc_mcast_v6[0]);
2874       priv->udpsrc_mcast_v6[0] = NULL;
2875     }
2876     if (priv->udpsrc_mcast_v6[1]) {
2877       gst_element_set_state (priv->udpsrc_mcast_v6[1], GST_STATE_NULL);
2878       gst_object_unref (priv->udpsrc_mcast_v6[1]);
2879       priv->udpsrc_mcast_v6[1] = NULL;
2880     }
2881     g_mutex_unlock (&priv->lock);
2882     return FALSE;
2883   }
2884 }
2885
2886 /**
2887  * gst_rtsp_stream_leave_bin:
2888  * @stream: a #GstRTSPStream
2889  * @bin: (transfer none): a #GstBin
2890  * @rtpbin: (transfer none): a rtpbin #GstElement
2891  *
2892  * Remove the elements of @stream from @bin.
2893  *
2894  * Return: %TRUE on success.
2895  */
2896 gboolean
2897 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2898     GstElement * rtpbin)
2899 {
2900   GstRTSPStreamPrivate *priv;
2901   gint i;
2902   gboolean is_tcp, is_udp;
2903
2904   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2905   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2906   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2907
2908   priv = stream->priv;
2909
2910   g_mutex_lock (&priv->lock);
2911   if (!priv->is_joined)
2912     goto was_not_joined;
2913
2914   priv->joined_bin = NULL;
2915
2916   /* all transports must be removed by now */
2917   if (priv->transports != NULL)
2918     goto transports_not_removed;
2919
2920   clear_tr_cache (priv, TRUE);
2921   clear_tr_cache (priv, FALSE);
2922
2923   GST_INFO ("stream %p leaving bin", stream);
2924
2925   if (priv->srcpad) {
2926     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2927
2928     g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2929     gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2930     gst_object_unref (priv->send_rtp_sink);
2931     priv->send_rtp_sink = NULL;
2932   } else if (priv->recv_rtp_src) {
2933     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2934     gst_object_unref (priv->recv_rtp_src);
2935     priv->recv_rtp_src = NULL;
2936   }
2937
2938   is_tcp = priv->protocols & GST_RTSP_LOWER_TRANS_TCP;
2939
2940   is_udp = ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
2941       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST));
2942
2943
2944   for (i = 0; i < 2; i++) {
2945     if (priv->udpsink[i])
2946       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2947     if (priv->appsink[i])
2948       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2949     if (priv->appqueue[i])
2950       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2951     if (priv->udpqueue[i])
2952       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2953     if (priv->tee[i])
2954       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2955     if (priv->funnel[i])
2956       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2957     if (priv->appsrc[i])
2958       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2959
2960     if (priv->udpsrc_v4[i]) {
2961       if (priv->sinkpad || i == 1) {
2962         /* and set udpsrc to NULL now before removing */
2963         gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2964         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2965         /* removing them should also nicely release the request
2966          * pads when they finalize */
2967         gst_bin_remove (bin, priv->udpsrc_v4[i]);
2968       } else {
2969         /* we need to set the state to NULL before unref */
2970         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2971         gst_object_unref (priv->udpsrc_v4[i]);
2972       }
2973     }
2974
2975     if (priv->udpsrc_mcast_v4[i]) {
2976       if (priv->sinkpad || i == 1) {
2977         /* and set udpsrc to NULL now before removing */
2978         gst_element_set_locked_state (priv->udpsrc_mcast_v4[i], FALSE);
2979         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2980         /* removing them should also nicely release the request
2981          * pads when they finalize */
2982         gst_bin_remove (bin, priv->udpsrc_mcast_v4[i]);
2983       } else {
2984         gst_element_set_state (priv->udpsrc_mcast_v4[i], GST_STATE_NULL);
2985         gst_object_unref (priv->udpsrc_mcast_v4[i]);
2986       }
2987     }
2988
2989     if (priv->udpsrc_v6[i]) {
2990       if (priv->sinkpad || i == 1) {
2991         gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2992         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2993         gst_bin_remove (bin, priv->udpsrc_v6[i]);
2994       } else {
2995         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2996         gst_object_unref (priv->udpsrc_v6[i]);
2997       }
2998     }
2999     if (priv->udpsrc_mcast_v6[i]) {
3000       if (priv->sinkpad || i == 1) {
3001         gst_element_set_locked_state (priv->udpsrc_mcast_v6[i], FALSE);
3002         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
3003         gst_bin_remove (bin, priv->udpsrc_mcast_v6[i]);
3004       } else {
3005         gst_element_set_state (priv->udpsrc_mcast_v6[i], GST_STATE_NULL);
3006         gst_object_unref (priv->udpsrc_mcast_v6[i]);
3007       }
3008     }
3009
3010     if (priv->udpsink[i] && is_udp && (priv->srcpad || i == 1))
3011       gst_bin_remove (bin, priv->udpsink[i]);
3012     if (priv->appsrc[i]) {
3013       if (priv->sinkpad || i == 1) {
3014         gst_element_set_locked_state (priv->appsrc[i], FALSE);
3015         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
3016         gst_bin_remove (bin, priv->appsrc[i]);
3017       } else {
3018         gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
3019         gst_object_unref (priv->appsrc[i]);
3020       }
3021     }
3022     if (priv->appsink[i] && is_tcp && (priv->srcpad || i == 1))
3023       gst_bin_remove (bin, priv->appsink[i]);
3024     if (priv->appqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3025       gst_bin_remove (bin, priv->appqueue[i]);
3026     if (priv->udpqueue[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3027       gst_bin_remove (bin, priv->udpqueue[i]);
3028     if (priv->tee[i] && is_tcp && is_udp && (priv->srcpad || i == 1))
3029       gst_bin_remove (bin, priv->tee[i]);
3030     if (priv->funnel[i] && (priv->sinkpad || i == 1))
3031       gst_bin_remove (bin, priv->funnel[i]);
3032
3033     if (priv->sinkpad || i == 1) {
3034       gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
3035       gst_object_unref (priv->recv_sink[i]);
3036       priv->recv_sink[i] = NULL;
3037     }
3038
3039     priv->udpsrc_v4[i] = NULL;
3040     priv->udpsrc_v6[i] = NULL;
3041     priv->udpsrc_mcast_v4[i] = NULL;
3042     priv->udpsrc_mcast_v6[i] = NULL;
3043     priv->udpsink[i] = NULL;
3044     priv->appsrc[i] = NULL;
3045     priv->appsink[i] = NULL;
3046     priv->appqueue[i] = NULL;
3047     priv->udpqueue[i] = NULL;
3048     priv->tee[i] = NULL;
3049     priv->funnel[i] = NULL;
3050   }
3051
3052   if (priv->srcpad) {
3053     gst_object_unref (priv->send_src[0]);
3054     priv->send_src[0] = NULL;
3055   }
3056
3057   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
3058   gst_object_unref (priv->send_src[1]);
3059   priv->send_src[1] = NULL;
3060
3061   g_object_unref (priv->session);
3062   priv->session = NULL;
3063   if (priv->caps)
3064     gst_caps_unref (priv->caps);
3065   priv->caps = NULL;
3066
3067   if (priv->srtpenc)
3068     gst_object_unref (priv->srtpenc);
3069   if (priv->srtpdec)
3070     gst_object_unref (priv->srtpdec);
3071
3072   priv->is_joined = FALSE;
3073   g_mutex_unlock (&priv->lock);
3074
3075   return TRUE;
3076
3077 was_not_joined:
3078   {
3079     g_mutex_unlock (&priv->lock);
3080     return TRUE;
3081   }
3082 transports_not_removed:
3083   {
3084     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
3085     g_mutex_unlock (&priv->lock);
3086     return FALSE;
3087   }
3088 }
3089
3090 /**
3091  * gst_rtsp_stream_get_joined_bin:
3092  * @stream: a #GstRTSPStream
3093  *
3094  * Get the previous joined bin with gst_rtsp_stream_join_bin() or NULL.
3095  *
3096  * Return: (transfer full): the joined bin or NULL.
3097  */
3098 GstBin *
3099 gst_rtsp_stream_get_joined_bin (GstRTSPStream * stream)
3100 {
3101   GstRTSPStreamPrivate *priv;
3102   GstBin *bin = NULL;
3103
3104   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3105
3106   priv = stream->priv;
3107
3108   g_mutex_lock (&priv->lock);
3109   bin = priv->joined_bin ? gst_object_ref (priv->joined_bin) : NULL;
3110   g_mutex_unlock (&priv->lock);
3111
3112   return bin;
3113 }
3114
3115 /**
3116  * gst_rtsp_stream_get_rtpinfo:
3117  * @stream: a #GstRTSPStream
3118  * @rtptime: (allow-none): result RTP timestamp
3119  * @seq: (allow-none): result RTP seqnum
3120  * @clock_rate: (allow-none): the clock rate
3121  * @running_time: (allow-none): result running-time
3122  *
3123  * Retrieve the current rtptime, seq and running-time. This is used to
3124  * construct a RTPInfo reply header.
3125  *
3126  * Returns: %TRUE when rtptime, seq and running-time could be determined.
3127  */
3128 gboolean
3129 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
3130     guint * rtptime, guint * seq, guint * clock_rate,
3131     GstClockTime * running_time)
3132 {
3133   GstRTSPStreamPrivate *priv;
3134   GstStructure *stats;
3135   GObjectClass *payobjclass;
3136
3137   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3138
3139   priv = stream->priv;
3140
3141   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
3142
3143   g_mutex_lock (&priv->lock);
3144
3145   /* First try to extract the information from the last buffer on the sinks.
3146    * This will have a more accurate sequence number and timestamp, as between
3147    * the payloader and the sink there can be some queues
3148    */
3149   if (priv->udpsink[0] || priv->appsink[0]) {
3150     GstSample *last_sample;
3151
3152     if (priv->udpsink[0])
3153       g_object_get (priv->udpsink[0], "last-sample", &last_sample, NULL);
3154     else
3155       g_object_get (priv->appsink[0], "last-sample", &last_sample, NULL);
3156
3157     if (last_sample) {
3158       GstCaps *caps;
3159       GstBuffer *buffer;
3160       GstSegment *segment;
3161       GstRTPBuffer rtp_buffer = GST_RTP_BUFFER_INIT;
3162
3163       caps = gst_sample_get_caps (last_sample);
3164       buffer = gst_sample_get_buffer (last_sample);
3165       segment = gst_sample_get_segment (last_sample);
3166
3167       if (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp_buffer)) {
3168         if (seq) {
3169           *seq = gst_rtp_buffer_get_seq (&rtp_buffer);
3170         }
3171
3172         if (rtptime) {
3173           *rtptime = gst_rtp_buffer_get_timestamp (&rtp_buffer);
3174         }
3175
3176         gst_rtp_buffer_unmap (&rtp_buffer);
3177
3178         if (running_time) {
3179           *running_time =
3180               gst_segment_to_running_time (segment, GST_FORMAT_TIME,
3181               GST_BUFFER_TIMESTAMP (buffer));
3182         }
3183
3184         if (clock_rate) {
3185           GstStructure *s = gst_caps_get_structure (caps, 0);
3186
3187           gst_structure_get_int (s, "clock-rate", (gint *) clock_rate);
3188
3189           if (*clock_rate == 0 && running_time)
3190             *running_time = GST_CLOCK_TIME_NONE;
3191         }
3192         gst_sample_unref (last_sample);
3193
3194         goto done;
3195       } else {
3196         gst_sample_unref (last_sample);
3197       }
3198     }
3199   }
3200
3201   if (g_object_class_find_property (payobjclass, "stats")) {
3202     g_object_get (priv->payloader, "stats", &stats, NULL);
3203     if (stats == NULL)
3204       goto no_stats;
3205
3206     if (seq)
3207       gst_structure_get_uint (stats, "seqnum", seq);
3208
3209     if (rtptime)
3210       gst_structure_get_uint (stats, "timestamp", rtptime);
3211
3212     if (running_time)
3213       gst_structure_get_clock_time (stats, "running-time", running_time);
3214
3215     if (clock_rate) {
3216       gst_structure_get_uint (stats, "clock-rate", clock_rate);
3217       if (*clock_rate == 0 && running_time)
3218         *running_time = GST_CLOCK_TIME_NONE;
3219     }
3220     gst_structure_free (stats);
3221   } else {
3222     if (!g_object_class_find_property (payobjclass, "seqnum") ||
3223         !g_object_class_find_property (payobjclass, "timestamp"))
3224       goto no_stats;
3225
3226     if (seq)
3227       g_object_get (priv->payloader, "seqnum", seq, NULL);
3228
3229     if (rtptime)
3230       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
3231
3232     if (running_time)
3233       *running_time = GST_CLOCK_TIME_NONE;
3234   }
3235
3236 done:
3237   g_mutex_unlock (&priv->lock);
3238
3239   return TRUE;
3240
3241   /* ERRORS */
3242 no_stats:
3243   {
3244     GST_WARNING ("Could not get payloader stats");
3245     g_mutex_unlock (&priv->lock);
3246     return FALSE;
3247   }
3248 }
3249
3250 /**
3251  * gst_rtsp_stream_get_caps:
3252  * @stream: a #GstRTSPStream
3253  *
3254  * Retrieve the current caps of @stream.
3255  *
3256  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
3257  * after usage.
3258  */
3259 GstCaps *
3260 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
3261 {
3262   GstRTSPStreamPrivate *priv;
3263   GstCaps *result;
3264
3265   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3266
3267   priv = stream->priv;
3268
3269   g_mutex_lock (&priv->lock);
3270   if ((result = priv->caps))
3271     gst_caps_ref (result);
3272   g_mutex_unlock (&priv->lock);
3273
3274   return result;
3275 }
3276
3277 /**
3278  * gst_rtsp_stream_recv_rtp:
3279  * @stream: a #GstRTSPStream
3280  * @buffer: (transfer full): a #GstBuffer
3281  *
3282  * Handle an RTP buffer for the stream. This method is usually called when a
3283  * message has been received from a client using the TCP transport.
3284  *
3285  * This function takes ownership of @buffer.
3286  *
3287  * Returns: a GstFlowReturn.
3288  */
3289 GstFlowReturn
3290 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
3291 {
3292   GstRTSPStreamPrivate *priv;
3293   GstFlowReturn ret;
3294   GstElement *element;
3295
3296   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3297   priv = stream->priv;
3298   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3299   g_return_val_if_fail (priv->is_joined, FALSE);
3300
3301   g_mutex_lock (&priv->lock);
3302   if (priv->appsrc[0])
3303     element = gst_object_ref (priv->appsrc[0]);
3304   else
3305     element = NULL;
3306   g_mutex_unlock (&priv->lock);
3307
3308   if (element) {
3309     if (priv->appsrc_base_time[0] == -1) {
3310       /* Take current running_time. This timestamp will be put on
3311        * the first buffer of each stream because we are a live source and so we
3312        * timestamp with the running_time. When we are dealing with TCP, we also
3313        * only timestamp the first buffer (using the DISCONT flag) because a server
3314        * typically bursts data, for which we don't want to compensate by speeding
3315        * up the media. The other timestamps will be interpollated from this one
3316        * using the RTP timestamps. */
3317       GST_OBJECT_LOCK (element);
3318       if (GST_ELEMENT_CLOCK (element)) {
3319         GstClockTime now;
3320         GstClockTime base_time;
3321
3322         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3323         base_time = GST_ELEMENT_CAST (element)->base_time;
3324
3325         priv->appsrc_base_time[0] = now - base_time;
3326         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
3327         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3328             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3329             GST_TIME_ARGS (base_time));
3330       }
3331       GST_OBJECT_UNLOCK (element);
3332     }
3333
3334     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3335     gst_object_unref (element);
3336   } else {
3337     ret = GST_FLOW_OK;
3338   }
3339   return ret;
3340 }
3341
3342 /**
3343  * gst_rtsp_stream_recv_rtcp:
3344  * @stream: a #GstRTSPStream
3345  * @buffer: (transfer full): a #GstBuffer
3346  *
3347  * Handle an RTCP buffer for the stream. This method is usually called when a
3348  * message has been received from a client using the TCP transport.
3349  *
3350  * This function takes ownership of @buffer.
3351  *
3352  * Returns: a GstFlowReturn.
3353  */
3354 GstFlowReturn
3355 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
3356 {
3357   GstRTSPStreamPrivate *priv;
3358   GstFlowReturn ret;
3359   GstElement *element;
3360
3361   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
3362   priv = stream->priv;
3363   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
3364
3365   if (!priv->is_joined) {
3366     gst_buffer_unref (buffer);
3367     return GST_FLOW_NOT_LINKED;
3368   }
3369   g_mutex_lock (&priv->lock);
3370   if (priv->appsrc[1])
3371     element = gst_object_ref (priv->appsrc[1]);
3372   else
3373     element = NULL;
3374   g_mutex_unlock (&priv->lock);
3375
3376   if (element) {
3377     if (priv->appsrc_base_time[1] == -1) {
3378       /* Take current running_time. This timestamp will be put on
3379        * the first buffer of each stream because we are a live source and so we
3380        * timestamp with the running_time. When we are dealing with TCP, we also
3381        * only timestamp the first buffer (using the DISCONT flag) because a server
3382        * typically bursts data, for which we don't want to compensate by speeding
3383        * up the media. The other timestamps will be interpollated from this one
3384        * using the RTP timestamps. */
3385       GST_OBJECT_LOCK (element);
3386       if (GST_ELEMENT_CLOCK (element)) {
3387         GstClockTime now;
3388         GstClockTime base_time;
3389
3390         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
3391         base_time = GST_ELEMENT_CAST (element)->base_time;
3392
3393         priv->appsrc_base_time[1] = now - base_time;
3394         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
3395         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
3396             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
3397             GST_TIME_ARGS (base_time));
3398       }
3399       GST_OBJECT_UNLOCK (element);
3400     }
3401
3402     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
3403     gst_object_unref (element);
3404   } else {
3405     ret = GST_FLOW_OK;
3406     gst_buffer_unref (buffer);
3407   }
3408   return ret;
3409 }
3410
3411 /* must be called with lock */
3412 static gboolean
3413 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
3414     gboolean add)
3415 {
3416   GstRTSPStreamPrivate *priv = stream->priv;
3417   const GstRTSPTransport *tr;
3418
3419   tr = gst_rtsp_stream_transport_get_transport (trans);
3420
3421   switch (tr->lower_transport) {
3422     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
3423     case GST_RTSP_LOWER_TRANS_UDP:
3424     {
3425       gchar *dest;
3426       gint min, max;
3427       guint ttl = 0;
3428
3429       dest = tr->destination;
3430       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
3431         min = tr->port.min;
3432         max = tr->port.max;
3433         ttl = tr->ttl;
3434       } else if (priv->client_side) {
3435         /* In client side mode the 'destination' is the RTSP server, so send
3436          * to those ports */
3437         min = tr->server_port.min;
3438         max = tr->server_port.max;
3439       } else {
3440         min = tr->client_port.min;
3441         max = tr->client_port.max;
3442       }
3443
3444       if (add) {
3445         if (ttl > 0) {
3446           GST_INFO ("setting ttl-mc %d", ttl);
3447           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
3448           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
3449         }
3450         GST_INFO ("adding %s:%d-%d", dest, min, max);
3451         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
3452         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
3453         priv->transports = g_list_prepend (priv->transports, trans);
3454       } else {
3455         GST_INFO ("removing %s:%d-%d", dest, min, max);
3456         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
3457         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
3458         priv->transports = g_list_remove (priv->transports, trans);
3459       }
3460       priv->transports_cookie++;
3461       break;
3462     }
3463     case GST_RTSP_LOWER_TRANS_TCP:
3464       if (add) {
3465         GST_INFO ("adding TCP %s", tr->destination);
3466         priv->transports = g_list_prepend (priv->transports, trans);
3467       } else {
3468         GST_INFO ("removing TCP %s", tr->destination);
3469         priv->transports = g_list_remove (priv->transports, trans);
3470       }
3471       priv->transports_cookie++;
3472       break;
3473     default:
3474       goto unknown_transport;
3475   }
3476   return TRUE;
3477
3478   /* ERRORS */
3479 unknown_transport:
3480   {
3481     GST_INFO ("Unknown transport %d", tr->lower_transport);
3482     return FALSE;
3483   }
3484 }
3485
3486
3487 /**
3488  * gst_rtsp_stream_add_transport:
3489  * @stream: a #GstRTSPStream
3490  * @trans: (transfer none): a #GstRTSPStreamTransport
3491  *
3492  * Add the transport in @trans to @stream. The media of @stream will
3493  * then also be send to the values configured in @trans.
3494  *
3495  * @stream must be joined to a bin.
3496  *
3497  * @trans must contain a valid #GstRTSPTransport.
3498  *
3499  * Returns: %TRUE if @trans was added
3500  */
3501 gboolean
3502 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
3503     GstRTSPStreamTransport * trans)
3504 {
3505   GstRTSPStreamPrivate *priv;
3506   gboolean res;
3507
3508   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3509   priv = stream->priv;
3510   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3511   g_return_val_if_fail (priv->is_joined, FALSE);
3512
3513   g_mutex_lock (&priv->lock);
3514   res = update_transport (stream, trans, TRUE);
3515   g_mutex_unlock (&priv->lock);
3516
3517   return res;
3518 }
3519
3520 /**
3521  * gst_rtsp_stream_remove_transport:
3522  * @stream: a #GstRTSPStream
3523  * @trans: (transfer none): a #GstRTSPStreamTransport
3524  *
3525  * Remove the transport in @trans from @stream. The media of @stream will
3526  * not be sent to the values configured in @trans.
3527  *
3528  * @stream must be joined to a bin.
3529  *
3530  * @trans must contain a valid #GstRTSPTransport.
3531  *
3532  * Returns: %TRUE if @trans was removed
3533  */
3534 gboolean
3535 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
3536     GstRTSPStreamTransport * trans)
3537 {
3538   GstRTSPStreamPrivate *priv;
3539   gboolean res;
3540
3541   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3542   priv = stream->priv;
3543   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
3544   g_return_val_if_fail (priv->is_joined, FALSE);
3545
3546   g_mutex_lock (&priv->lock);
3547   res = update_transport (stream, trans, FALSE);
3548   g_mutex_unlock (&priv->lock);
3549
3550   return res;
3551 }
3552
3553 /**
3554  * gst_rtsp_stream_update_crypto:
3555  * @stream: a #GstRTSPStream
3556  * @ssrc: the SSRC
3557  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
3558  *
3559  * Update the new crypto information for @ssrc in @stream. If information
3560  * for @ssrc did not exist, it will be added. If information
3561  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
3562  * be removed from @stream.
3563  *
3564  * Returns: %TRUE if @crypto could be updated
3565  */
3566 gboolean
3567 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
3568     guint ssrc, GstCaps * crypto)
3569 {
3570   GstRTSPStreamPrivate *priv;
3571
3572   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3573   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
3574
3575   priv = stream->priv;
3576
3577   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
3578
3579   g_mutex_lock (&priv->lock);
3580   if (crypto)
3581     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
3582         gst_caps_ref (crypto));
3583   else
3584     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
3585   g_mutex_unlock (&priv->lock);
3586
3587   return TRUE;
3588 }
3589
3590 /**
3591  * gst_rtsp_stream_get_rtp_socket:
3592  * @stream: a #GstRTSPStream
3593  * @family: the socket family
3594  *
3595  * Get the RTP socket from @stream for a @family.
3596  *
3597  * @stream must be joined to a bin.
3598  *
3599  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
3600  * socket could be allocated for @family. Unref after usage
3601  */
3602 GSocket *
3603 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
3604 {
3605   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3606   GSocket *socket;
3607   const gchar *name;
3608
3609   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3610   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3611       family == G_SOCKET_FAMILY_IPV6, NULL);
3612   g_return_val_if_fail (priv->udpsink[0], NULL);
3613
3614   if (family == G_SOCKET_FAMILY_IPV6)
3615     name = "socket-v6";
3616   else
3617     name = "socket";
3618
3619   g_object_get (priv->udpsink[0], name, &socket, NULL);
3620
3621   return socket;
3622 }
3623
3624 /**
3625  * gst_rtsp_stream_get_rtcp_socket:
3626  * @stream: a #GstRTSPStream
3627  * @family: the socket family
3628  *
3629  * Get the RTCP socket from @stream for a @family.
3630  *
3631  * @stream must be joined to a bin.
3632  *
3633  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
3634  * socket could be allocated for @family. Unref after usage
3635  */
3636 GSocket *
3637 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
3638 {
3639   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
3640   GSocket *socket;
3641   const gchar *name;
3642
3643   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3644   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
3645       family == G_SOCKET_FAMILY_IPV6, NULL);
3646   g_return_val_if_fail (priv->udpsink[1], NULL);
3647
3648   if (family == G_SOCKET_FAMILY_IPV6)
3649     name = "socket-v6";
3650   else
3651     name = "socket";
3652
3653   g_object_get (priv->udpsink[1], name, &socket, NULL);
3654
3655   return socket;
3656 }
3657
3658 /**
3659  * gst_rtsp_stream_set_seqnum:
3660  * @stream: a #GstRTSPStream
3661  * @seqnum: a new sequence number
3662  *
3663  * Configure the sequence number in the payloader of @stream to @seqnum.
3664  */
3665 void
3666 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
3667 {
3668   GstRTSPStreamPrivate *priv;
3669
3670   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3671
3672   priv = stream->priv;
3673
3674   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3675 }
3676
3677 /**
3678  * gst_rtsp_stream_get_seqnum:
3679  * @stream: a #GstRTSPStream
3680  *
3681  * Get the configured sequence number in the payloader of @stream.
3682  *
3683  * Returns: the sequence number of the payloader.
3684  */
3685 guint16
3686 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3687 {
3688   GstRTSPStreamPrivate *priv;
3689   guint seqnum;
3690
3691   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3692
3693   priv = stream->priv;
3694
3695   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3696
3697   return seqnum;
3698 }
3699
3700 /**
3701  * gst_rtsp_stream_transport_filter:
3702  * @stream: a #GstRTSPStream
3703  * @func: (scope call) (allow-none): a callback
3704  * @user_data: (closure): user data passed to @func
3705  *
3706  * Call @func for each transport managed by @stream. The result value of @func
3707  * determines what happens to the transport. @func will be called with @stream
3708  * locked so no further actions on @stream can be performed from @func.
3709  *
3710  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3711  * @stream.
3712  *
3713  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3714  *
3715  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3716  * will also be added with an additional ref to the result #GList of this
3717  * function..
3718  *
3719  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3720  *
3721  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3722  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3723  * element in the #GList should be unreffed before the list is freed.
3724  */
3725 GList *
3726 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3727     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3728 {
3729   GstRTSPStreamPrivate *priv;
3730   GList *result, *walk, *next;
3731   GHashTable *visited = NULL;
3732   guint cookie;
3733
3734   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3735
3736   priv = stream->priv;
3737
3738   result = NULL;
3739   if (func)
3740     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3741
3742   g_mutex_lock (&priv->lock);
3743 restart:
3744   cookie = priv->transports_cookie;
3745   for (walk = priv->transports; walk; walk = next) {
3746     GstRTSPStreamTransport *trans = walk->data;
3747     GstRTSPFilterResult res;
3748     gboolean changed;
3749
3750     next = g_list_next (walk);
3751
3752     if (func) {
3753       /* only visit each transport once */
3754       if (g_hash_table_contains (visited, trans))
3755         continue;
3756
3757       g_hash_table_add (visited, g_object_ref (trans));
3758       g_mutex_unlock (&priv->lock);
3759
3760       res = func (stream, trans, user_data);
3761
3762       g_mutex_lock (&priv->lock);
3763     } else
3764       res = GST_RTSP_FILTER_REF;
3765
3766     changed = (cookie != priv->transports_cookie);
3767
3768     switch (res) {
3769       case GST_RTSP_FILTER_REMOVE:
3770         update_transport (stream, trans, FALSE);
3771         break;
3772       case GST_RTSP_FILTER_REF:
3773         result = g_list_prepend (result, g_object_ref (trans));
3774         break;
3775       case GST_RTSP_FILTER_KEEP:
3776       default:
3777         break;
3778     }
3779     if (changed)
3780       goto restart;
3781   }
3782   g_mutex_unlock (&priv->lock);
3783
3784   if (func)
3785     g_hash_table_unref (visited);
3786
3787   return result;
3788 }
3789
3790 static GstPadProbeReturn
3791 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3792 {
3793   GstRTSPStreamPrivate *priv;
3794   GstRTSPStream *stream;
3795
3796   stream = user_data;
3797   priv = stream->priv;
3798
3799   GST_DEBUG_OBJECT (pad, "now blocking");
3800
3801   g_mutex_lock (&priv->lock);
3802   priv->blocking = TRUE;
3803   g_mutex_unlock (&priv->lock);
3804
3805   gst_element_post_message (priv->payloader,
3806       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3807           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3808
3809   return GST_PAD_PROBE_OK;
3810 }
3811
3812 /**
3813  * gst_rtsp_stream_set_blocked:
3814  * @stream: a #GstRTSPStream
3815  * @blocked: boolean indicating we should block or unblock
3816  *
3817  * Blocks or unblocks the dataflow on @stream.
3818  *
3819  * Returns: %TRUE on success
3820  */
3821 gboolean
3822 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3823 {
3824   GstRTSPStreamPrivate *priv;
3825
3826   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3827
3828   priv = stream->priv;
3829
3830   g_mutex_lock (&priv->lock);
3831   if (blocked) {
3832     priv->blocking = FALSE;
3833     if (priv->blocked_id == 0) {
3834       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3835           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3836           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3837           g_object_ref (stream), g_object_unref);
3838     }
3839   } else {
3840     if (priv->blocked_id != 0) {
3841       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3842       priv->blocked_id = 0;
3843       priv->blocking = FALSE;
3844     }
3845   }
3846   g_mutex_unlock (&priv->lock);
3847
3848   return TRUE;
3849 }
3850
3851 /**
3852  * gst_rtsp_stream_is_blocking:
3853  * @stream: a #GstRTSPStream
3854  *
3855  * Check if @stream is blocking on a #GstBuffer.
3856  *
3857  * Returns: %TRUE if @stream is blocking
3858  */
3859 gboolean
3860 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3861 {
3862   GstRTSPStreamPrivate *priv;
3863   gboolean result;
3864
3865   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3866
3867   priv = stream->priv;
3868
3869   g_mutex_lock (&priv->lock);
3870   result = priv->blocking;
3871   g_mutex_unlock (&priv->lock);
3872
3873   return result;
3874 }
3875
3876 /**
3877  * gst_rtsp_stream_query_position:
3878  * @stream: a #GstRTSPStream
3879  *
3880  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3881  * the RTP parts of the pipeline and not the RTCP parts.
3882  *
3883  * Returns: %TRUE if the position could be queried
3884  */
3885 gboolean
3886 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3887 {
3888   GstRTSPStreamPrivate *priv;
3889   GstElement *sink;
3890   gboolean ret;
3891
3892   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3893
3894   priv = stream->priv;
3895
3896   g_mutex_lock (&priv->lock);
3897   /* depending on the transport type, it should query corresponding sink */
3898   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3899       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3900     sink = priv->udpsink[0];
3901   else
3902     sink = priv->appsink[0];
3903
3904   if (sink)
3905     gst_object_ref (sink);
3906   g_mutex_unlock (&priv->lock);
3907
3908   if (!sink)
3909     return FALSE;
3910
3911   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3912   gst_object_unref (sink);
3913
3914   return ret;
3915 }
3916
3917 /**
3918  * gst_rtsp_stream_query_stop:
3919  * @stream: a #GstRTSPStream
3920  *
3921  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3922  * the RTP parts of the pipeline and not the RTCP parts.
3923  *
3924  * Returns: %TRUE if the stop could be queried
3925  */
3926 gboolean
3927 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3928 {
3929   GstRTSPStreamPrivate *priv;
3930   GstElement *sink;
3931   GstQuery *query;
3932   gboolean ret;
3933
3934   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3935
3936   priv = stream->priv;
3937
3938   g_mutex_lock (&priv->lock);
3939   /* depending on the transport type, it should query corresponding sink */
3940   if ((priv->protocols & GST_RTSP_LOWER_TRANS_UDP) ||
3941       (priv->protocols & GST_RTSP_LOWER_TRANS_UDP_MCAST))
3942     sink = priv->udpsink[0];
3943   else
3944     sink = priv->appsink[0];
3945
3946   if (sink)
3947     gst_object_ref (sink);
3948   g_mutex_unlock (&priv->lock);
3949
3950   if (!sink)
3951     return FALSE;
3952
3953   query = gst_query_new_segment (GST_FORMAT_TIME);
3954   if ((ret = gst_element_query (sink, query))) {
3955     GstFormat format;
3956
3957     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3958     if (format != GST_FORMAT_TIME)
3959       *stop = -1;
3960   }
3961   gst_query_unref (query);
3962   gst_object_unref (sink);
3963
3964   return ret;
3965
3966 }