rtsp-stream: Add necessary queues between tee and multiudpsink
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  * Copyright (C) 2015 Centricular Ltd
4  *     Author: Sebastian Dröge <sebastian@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 /**
22  * SECTION:rtsp-stream
23  * @short_description: A media stream
24  * @see_also: #GstRTSPMedia
25  *
26  * The #GstRTSPStream object manages the data transport for one stream. It
27  * is created from a payloader element and a source pad that produce the RTP
28  * packets for the stream.
29  *
30  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
31  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
32  *
33  * The #GstRTSPStream will use the configured addresspool, as set with
34  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
35  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
36  * configured address.
37  *
38  * With gst_rtsp_stream_get_server_port () you can get the port that the server
39  * will use to receive RTCP. This is the part that the clients will use to send
40  * RTCP to.
41  *
42  * With gst_rtsp_stream_add_transport() destinations can be added where the
43  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
44  * the destination again.
45  *
46  * Last reviewed on 2013-07-16 (1.0.0)
47  */
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <string.h>
52
53 #include <gio/gio.h>
54
55 #include <gst/app/gstappsrc.h>
56 #include <gst/app/gstappsink.h>
57
58 #include "rtsp-stream.h"
59
60 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
61      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
62
63 typedef struct
64 {
65   GstRTSPStreamTransport *transport;
66
67   /* RTP and RTCP source */
68   GstElement *udpsrc[2];
69   GstPad *selpad[2];
70 } GstRTSPMulticastTransportSource;
71
72 struct _GstRTSPStreamPrivate
73 {
74   GMutex lock;
75   guint idx;
76   /* Only one pad is ever set */
77   GstPad *srcpad, *sinkpad;
78   GstElement *payloader;
79   guint buffer_size;
80   gboolean is_joined;
81   gchar *control;
82
83   GstRTSPProfile profiles;
84   GstRTSPLowerTrans protocols;
85
86   /* pads on the rtpbin */
87   GstPad *send_rtp_sink;
88   GstPad *recv_rtp_src;
89   GstPad *recv_sink[2];
90   GstPad *send_src[2];
91
92   /* the RTPSession object */
93   GObject *session;
94
95   /* SRTP encoder/decoder */
96   GstElement *srtpenc;
97   GstElement *srtpdec;
98   GHashTable *keys;
99
100   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
101    * sockets */
102   GstElement *udpsrc_v4[2];
103
104   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
105    * sockets */
106   GstElement *udpsrc_v6[2];
107
108   GstElement *udpqueue[2];
109   GstElement *udpsink[2];
110
111   /* for TCP transport */
112   GstElement *appsrc[2];
113   GstClockTime appsrc_base_time[2];
114   GstElement *appqueue[2];
115   GstElement *appsink[2];
116
117   GstElement *tee[2];
118   GstElement *funnel[2];
119
120   /* retransmission */
121   GstElement *rtxsend;
122   guint rtx_pt;
123   GstClockTime rtx_time;
124
125   /* server ports for sending/receiving over ipv4 */
126   GstRTSPRange server_port_v4;
127   GstRTSPAddress *server_addr_v4;
128   gboolean have_ipv4;
129
130   /* server ports for sending/receiving over ipv6 */
131   GstRTSPRange server_port_v6;
132   GstRTSPAddress *server_addr_v6;
133   gboolean have_ipv6;
134
135   /* multicast addresses */
136   GstRTSPAddressPool *pool;
137   GstRTSPAddress *addr_v4;
138   GstRTSPAddress *addr_v6;
139
140   /* the caps of the stream */
141   gulong caps_sig;
142   GstCaps *caps;
143
144   /* transports we stream to */
145   guint n_active;
146   GList *transports;
147   guint transports_cookie;
148   GList *tr_cache_rtp;
149   GList *tr_cache_rtcp;
150   guint tr_cache_cookie_rtp;
151   guint tr_cache_cookie_rtcp;
152
153
154   /* UDP sources for UDP multicast transports */
155   GList *transport_sources;
156
157   gint dscp_qos;
158
159   /* stream blocking */
160   gulong blocked_id;
161   gboolean blocking;
162
163   /* pt->caps map for RECORD streams */
164   GHashTable *ptmap;
165 };
166
167 #define DEFAULT_CONTROL         NULL
168 #define DEFAULT_PROFILES        GST_RTSP_PROFILE_AVP
169 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_UDP_MCAST | \
170                                         GST_RTSP_LOWER_TRANS_TCP
171
172 enum
173 {
174   PROP_0,
175   PROP_CONTROL,
176   PROP_PROFILES,
177   PROP_PROTOCOLS,
178   PROP_LAST
179 };
180
181 enum
182 {
183   SIGNAL_NEW_RTP_ENCODER,
184   SIGNAL_NEW_RTCP_ENCODER,
185   SIGNAL_LAST
186 };
187
188 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
189 #define GST_CAT_DEFAULT rtsp_stream_debug
190
191 static GQuark ssrc_stream_map_key;
192
193 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
194     GValue * value, GParamSpec * pspec);
195 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
196     const GValue * value, GParamSpec * pspec);
197
198 static void gst_rtsp_stream_finalize (GObject * obj);
199
200 static guint gst_rtsp_stream_signals[SIGNAL_LAST] = { 0 };
201
202 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
203
204 static void
205 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
206 {
207   GObjectClass *gobject_class;
208
209   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
210
211   gobject_class = G_OBJECT_CLASS (klass);
212
213   gobject_class->get_property = gst_rtsp_stream_get_property;
214   gobject_class->set_property = gst_rtsp_stream_set_property;
215   gobject_class->finalize = gst_rtsp_stream_finalize;
216
217   g_object_class_install_property (gobject_class, PROP_CONTROL,
218       g_param_spec_string ("control", "Control",
219           "The control string for this stream", DEFAULT_CONTROL,
220           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
221
222   g_object_class_install_property (gobject_class, PROP_PROFILES,
223       g_param_spec_flags ("profiles", "Profiles",
224           "Allowed transfer profiles", GST_TYPE_RTSP_PROFILE,
225           DEFAULT_PROFILES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226
227   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
228       g_param_spec_flags ("protocols", "Protocols",
229           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
230           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231
232   gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER] =
233       g_signal_new ("new-rtp-encoder", G_TYPE_FROM_CLASS (klass),
234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
235       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
236
237   gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER] =
238       g_signal_new ("new-rtcp-encoder", G_TYPE_FROM_CLASS (klass),
239       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
240       G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
241
242   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
243
244   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
245 }
246
247 static void
248 gst_rtsp_stream_init (GstRTSPStream * stream)
249 {
250   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
251
252   GST_DEBUG ("new stream %p", stream);
253
254   stream->priv = priv;
255
256   priv->dscp_qos = -1;
257   priv->control = g_strdup (DEFAULT_CONTROL);
258   priv->profiles = DEFAULT_PROFILES;
259   priv->protocols = DEFAULT_PROTOCOLS;
260
261   g_mutex_init (&priv->lock);
262
263   priv->keys = g_hash_table_new_full (g_direct_hash, g_direct_equal,
264       NULL, (GDestroyNotify) gst_caps_unref);
265   priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
266       (GDestroyNotify) gst_caps_unref);
267 }
268
269 static void
270 gst_rtsp_stream_finalize (GObject * obj)
271 {
272   GstRTSPStream *stream;
273   GstRTSPStreamPrivate *priv;
274
275   stream = GST_RTSP_STREAM (obj);
276   priv = stream->priv;
277
278   GST_DEBUG ("finalize stream %p", stream);
279
280   /* we really need to be unjoined now */
281   g_return_if_fail (!priv->is_joined);
282
283   if (priv->addr_v4)
284     gst_rtsp_address_free (priv->addr_v4);
285   if (priv->addr_v6)
286     gst_rtsp_address_free (priv->addr_v6);
287   if (priv->server_addr_v4)
288     gst_rtsp_address_free (priv->server_addr_v4);
289   if (priv->server_addr_v6)
290     gst_rtsp_address_free (priv->server_addr_v6);
291   if (priv->pool)
292     g_object_unref (priv->pool);
293   if (priv->rtxsend)
294     g_object_unref (priv->rtxsend);
295
296   gst_object_unref (priv->payloader);
297   if (priv->srcpad)
298     gst_object_unref (priv->srcpad);
299   if (priv->sinkpad)
300     gst_object_unref (priv->sinkpad);
301   g_free (priv->control);
302   g_mutex_clear (&priv->lock);
303
304   g_hash_table_unref (priv->keys);
305   g_hash_table_destroy (priv->ptmap);
306
307   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
308 }
309
310 static void
311 gst_rtsp_stream_get_property (GObject * object, guint propid,
312     GValue * value, GParamSpec * pspec)
313 {
314   GstRTSPStream *stream = GST_RTSP_STREAM (object);
315
316   switch (propid) {
317     case PROP_CONTROL:
318       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
319       break;
320     case PROP_PROFILES:
321       g_value_set_flags (value, gst_rtsp_stream_get_profiles (stream));
322       break;
323     case PROP_PROTOCOLS:
324       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
328   }
329 }
330
331 static void
332 gst_rtsp_stream_set_property (GObject * object, guint propid,
333     const GValue * value, GParamSpec * pspec)
334 {
335   GstRTSPStream *stream = GST_RTSP_STREAM (object);
336
337   switch (propid) {
338     case PROP_CONTROL:
339       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
340       break;
341     case PROP_PROFILES:
342       gst_rtsp_stream_set_profiles (stream, g_value_get_flags (value));
343       break;
344     case PROP_PROTOCOLS:
345       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
346       break;
347     default:
348       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
349   }
350 }
351
352 /**
353  * gst_rtsp_stream_new:
354  * @idx: an index
355  * @pad: a #GstPad
356  * @payloader: a #GstElement
357  *
358  * Create a new media stream with index @idx that handles RTP data on
359  * @pad and has a payloader element @payloader if @pad is a source pad
360  * or a depayloader element @payloader if @pad is a sink pad.
361  *
362  * Returns: (transfer full): a new #GstRTSPStream
363  */
364 GstRTSPStream *
365 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * pad)
366 {
367   GstRTSPStreamPrivate *priv;
368   GstRTSPStream *stream;
369
370   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
371   g_return_val_if_fail (GST_IS_PAD (pad), NULL);
372
373   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
374   priv = stream->priv;
375   priv->idx = idx;
376   priv->payloader = gst_object_ref (payloader);
377   if (GST_PAD_IS_SRC (pad))
378     priv->srcpad = gst_object_ref (pad);
379   else
380     priv->sinkpad = gst_object_ref (pad);
381
382   return stream;
383 }
384
385 /**
386  * gst_rtsp_stream_get_index:
387  * @stream: a #GstRTSPStream
388  *
389  * Get the stream index.
390  *
391  * Return: the stream index.
392  */
393 guint
394 gst_rtsp_stream_get_index (GstRTSPStream * stream)
395 {
396   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
397
398   return stream->priv->idx;
399 }
400
401 /**
402  * gst_rtsp_stream_get_pt:
403  * @stream: a #GstRTSPStream
404  *
405  * Get the stream payload type.
406  *
407  * Return: the stream payload type.
408  */
409 guint
410 gst_rtsp_stream_get_pt (GstRTSPStream * stream)
411 {
412   GstRTSPStreamPrivate *priv;
413   guint pt;
414
415   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
416
417   priv = stream->priv;
418
419   g_object_get (G_OBJECT (priv->payloader), "pt", &pt, NULL);
420
421   return pt;
422 }
423
424 /**
425  * gst_rtsp_stream_get_srcpad:
426  * @stream: a #GstRTSPStream
427  *
428  * Get the srcpad associated with @stream.
429  *
430  * Returns: (transfer full): the srcpad. Unref after usage.
431  */
432 GstPad *
433 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
434 {
435   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
436
437   if (!stream->priv->srcpad)
438     return NULL;
439
440   return gst_object_ref (stream->priv->srcpad);
441 }
442
443 /**
444  * gst_rtsp_stream_get_sinkpad:
445  * @stream: a #GstRTSPStream
446  *
447  * Get the sinkpad associated with @stream.
448  *
449  * Returns: (transfer full): the sinkpad. Unref after usage.
450  */
451 GstPad *
452 gst_rtsp_stream_get_sinkpad (GstRTSPStream * stream)
453 {
454   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
455
456   if (!stream->priv->sinkpad)
457     return NULL;
458
459   return gst_object_ref (stream->priv->sinkpad);
460 }
461
462 /**
463  * gst_rtsp_stream_get_control:
464  * @stream: a #GstRTSPStream
465  *
466  * Get the control string to identify this stream.
467  *
468  * Returns: (transfer full): the control string. g_free() after usage.
469  */
470 gchar *
471 gst_rtsp_stream_get_control (GstRTSPStream * stream)
472 {
473   GstRTSPStreamPrivate *priv;
474   gchar *result;
475
476   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
477
478   priv = stream->priv;
479
480   g_mutex_lock (&priv->lock);
481   if ((result = g_strdup (priv->control)) == NULL)
482     result = g_strdup_printf ("stream=%u", priv->idx);
483   g_mutex_unlock (&priv->lock);
484
485   return result;
486 }
487
488 /**
489  * gst_rtsp_stream_set_control:
490  * @stream: a #GstRTSPStream
491  * @control: a control string
492  *
493  * Set the control string in @stream.
494  */
495 void
496 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
497 {
498   GstRTSPStreamPrivate *priv;
499
500   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
501
502   priv = stream->priv;
503
504   g_mutex_lock (&priv->lock);
505   g_free (priv->control);
506   priv->control = g_strdup (control);
507   g_mutex_unlock (&priv->lock);
508 }
509
510 /**
511  * gst_rtsp_stream_has_control:
512  * @stream: a #GstRTSPStream
513  * @control: a control string
514  *
515  * Check if @stream has the control string @control.
516  *
517  * Returns: %TRUE is @stream has @control as the control string
518  */
519 gboolean
520 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
521 {
522   GstRTSPStreamPrivate *priv;
523   gboolean res;
524
525   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
526
527   priv = stream->priv;
528
529   g_mutex_lock (&priv->lock);
530   if (priv->control)
531     res = (g_strcmp0 (priv->control, control) == 0);
532   else {
533     guint streamid;
534
535     if (sscanf (control, "stream=%u", &streamid) > 0)
536       res = (streamid == priv->idx);
537     else
538       res = FALSE;
539   }
540   g_mutex_unlock (&priv->lock);
541
542   return res;
543 }
544
545 /**
546  * gst_rtsp_stream_set_mtu:
547  * @stream: a #GstRTSPStream
548  * @mtu: a new MTU
549  *
550  * Configure the mtu in the payloader of @stream to @mtu.
551  */
552 void
553 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
554 {
555   GstRTSPStreamPrivate *priv;
556
557   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
558
559   priv = stream->priv;
560
561   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
562
563   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
564 }
565
566 /**
567  * gst_rtsp_stream_get_mtu:
568  * @stream: a #GstRTSPStream
569  *
570  * Get the configured MTU in the payloader of @stream.
571  *
572  * Returns: the MTU of the payloader.
573  */
574 guint
575 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
576 {
577   GstRTSPStreamPrivate *priv;
578   guint mtu;
579
580   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
581
582   priv = stream->priv;
583
584   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
585
586   return mtu;
587 }
588
589 /* Update the dscp qos property on the udp sinks */
590 static void
591 update_dscp_qos (GstRTSPStream * stream)
592 {
593   GstRTSPStreamPrivate *priv;
594
595   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
596
597   priv = stream->priv;
598
599   if (priv->udpsink[0]) {
600     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
601         NULL);
602   }
603
604   if (priv->udpsink[1]) {
605     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
606         NULL);
607   }
608 }
609
610 /**
611  * gst_rtsp_stream_set_dscp_qos:
612  * @stream: a #GstRTSPStream
613  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
614  *
615  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
616  */
617 void
618 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
619 {
620   GstRTSPStreamPrivate *priv;
621
622   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
623
624   priv = stream->priv;
625
626   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
627
628   if (dscp_qos < -1 || dscp_qos > 63) {
629     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
630     return;
631   }
632
633   priv->dscp_qos = dscp_qos;
634
635   update_dscp_qos (stream);
636 }
637
638 /**
639  * gst_rtsp_stream_get_dscp_qos:
640  * @stream: a #GstRTSPStream
641  *
642  * Get the configured DSCP QoS in of the outgoing sockets.
643  *
644  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
645  */
646 gint
647 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
648 {
649   GstRTSPStreamPrivate *priv;
650
651   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
652
653   priv = stream->priv;
654
655   return priv->dscp_qos;
656 }
657
658 /**
659  * gst_rtsp_stream_is_transport_supported:
660  * @stream: a #GstRTSPStream
661  * @transport: (transfer none): a #GstRTSPTransport
662  *
663  * Check if @transport can be handled by stream
664  *
665  * Returns: %TRUE if @transport can be handled by @stream.
666  */
667 gboolean
668 gst_rtsp_stream_is_transport_supported (GstRTSPStream * stream,
669     GstRTSPTransport * transport)
670 {
671   GstRTSPStreamPrivate *priv;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674
675   priv = stream->priv;
676
677   g_mutex_lock (&priv->lock);
678   if (transport->trans != GST_RTSP_TRANS_RTP)
679     goto unsupported_transmode;
680
681   if (!(transport->profile & priv->profiles))
682     goto unsupported_profile;
683
684   if (!(transport->lower_transport & priv->protocols))
685     goto unsupported_ltrans;
686
687   g_mutex_unlock (&priv->lock);
688
689   return TRUE;
690
691   /* ERRORS */
692 unsupported_transmode:
693   {
694     GST_DEBUG ("unsupported transport mode %d", transport->trans);
695     g_mutex_unlock (&priv->lock);
696     return FALSE;
697   }
698 unsupported_profile:
699   {
700     GST_DEBUG ("unsupported profile %d", transport->profile);
701     g_mutex_unlock (&priv->lock);
702     return FALSE;
703   }
704 unsupported_ltrans:
705   {
706     GST_DEBUG ("unsupported lower transport %d", transport->lower_transport);
707     g_mutex_unlock (&priv->lock);
708     return FALSE;
709   }
710 }
711
712 /**
713  * gst_rtsp_stream_set_profiles:
714  * @stream: a #GstRTSPStream
715  * @profiles: the new profiles
716  *
717  * Configure the allowed profiles for @stream.
718  */
719 void
720 gst_rtsp_stream_set_profiles (GstRTSPStream * stream, GstRTSPProfile profiles)
721 {
722   GstRTSPStreamPrivate *priv;
723
724   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
725
726   priv = stream->priv;
727
728   g_mutex_lock (&priv->lock);
729   priv->profiles = profiles;
730   g_mutex_unlock (&priv->lock);
731 }
732
733 /**
734  * gst_rtsp_stream_get_profiles:
735  * @stream: a #GstRTSPStream
736  *
737  * Get the allowed profiles of @stream.
738  *
739  * Returns: a #GstRTSPProfile
740  */
741 GstRTSPProfile
742 gst_rtsp_stream_get_profiles (GstRTSPStream * stream)
743 {
744   GstRTSPStreamPrivate *priv;
745   GstRTSPProfile res;
746
747   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_RTSP_PROFILE_UNKNOWN);
748
749   priv = stream->priv;
750
751   g_mutex_lock (&priv->lock);
752   res = priv->profiles;
753   g_mutex_unlock (&priv->lock);
754
755   return res;
756 }
757
758 /**
759  * gst_rtsp_stream_set_protocols:
760  * @stream: a #GstRTSPStream
761  * @protocols: the new flags
762  *
763  * Configure the allowed lower transport for @stream.
764  */
765 void
766 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
767     GstRTSPLowerTrans protocols)
768 {
769   GstRTSPStreamPrivate *priv;
770
771   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
772
773   priv = stream->priv;
774
775   g_mutex_lock (&priv->lock);
776   priv->protocols = protocols;
777   g_mutex_unlock (&priv->lock);
778 }
779
780 /**
781  * gst_rtsp_stream_get_protocols:
782  * @stream: a #GstRTSPStream
783  *
784  * Get the allowed protocols of @stream.
785  *
786  * Returns: a #GstRTSPLowerTrans
787  */
788 GstRTSPLowerTrans
789 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
790 {
791   GstRTSPStreamPrivate *priv;
792   GstRTSPLowerTrans res;
793
794   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
795       GST_RTSP_LOWER_TRANS_UNKNOWN);
796
797   priv = stream->priv;
798
799   g_mutex_lock (&priv->lock);
800   res = priv->protocols;
801   g_mutex_unlock (&priv->lock);
802
803   return res;
804 }
805
806 /**
807  * gst_rtsp_stream_set_address_pool:
808  * @stream: a #GstRTSPStream
809  * @pool: (transfer none): a #GstRTSPAddressPool
810  *
811  * configure @pool to be used as the address pool of @stream.
812  */
813 void
814 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
815     GstRTSPAddressPool * pool)
816 {
817   GstRTSPStreamPrivate *priv;
818   GstRTSPAddressPool *old;
819
820   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
821
822   priv = stream->priv;
823
824   GST_LOG_OBJECT (stream, "set address pool %p", pool);
825
826   g_mutex_lock (&priv->lock);
827   if ((old = priv->pool) != pool)
828     priv->pool = pool ? g_object_ref (pool) : NULL;
829   else
830     old = NULL;
831   g_mutex_unlock (&priv->lock);
832
833   if (old)
834     g_object_unref (old);
835 }
836
837 /**
838  * gst_rtsp_stream_get_address_pool:
839  * @stream: a #GstRTSPStream
840  *
841  * Get the #GstRTSPAddressPool used as the address pool of @stream.
842  *
843  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
844  * usage.
845  */
846 GstRTSPAddressPool *
847 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
848 {
849   GstRTSPStreamPrivate *priv;
850   GstRTSPAddressPool *result;
851
852   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
853
854   priv = stream->priv;
855
856   g_mutex_lock (&priv->lock);
857   if ((result = priv->pool))
858     g_object_ref (result);
859   g_mutex_unlock (&priv->lock);
860
861   return result;
862 }
863
864 /**
865  * gst_rtsp_stream_get_multicast_address:
866  * @stream: a #GstRTSPStream
867  * @family: the #GSocketFamily
868  *
869  * Get the multicast address of @stream for @family.
870  *
871  * Returns: (transfer full) (nullable): the #GstRTSPAddress of @stream
872  * or %NULL when no address could be allocated. gst_rtsp_address_free()
873  * after usage.
874  */
875 GstRTSPAddress *
876 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
877     GSocketFamily family)
878 {
879   GstRTSPStreamPrivate *priv;
880   GstRTSPAddress *result;
881   GstRTSPAddress **addrp;
882   GstRTSPAddressFlags flags;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
885
886   priv = stream->priv;
887
888   if (family == G_SOCKET_FAMILY_IPV6) {
889     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
890     addrp = &priv->addr_v6;
891   } else {
892     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
893     addrp = &priv->addr_v4;
894   }
895
896   g_mutex_lock (&priv->lock);
897   if (*addrp == NULL) {
898     if (priv->pool == NULL)
899       goto no_pool;
900
901     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
902
903     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
904     if (*addrp == NULL)
905       goto no_address;
906   }
907   result = gst_rtsp_address_copy (*addrp);
908   g_mutex_unlock (&priv->lock);
909
910   return result;
911
912   /* ERRORS */
913 no_pool:
914   {
915     GST_ERROR_OBJECT (stream, "no address pool specified");
916     g_mutex_unlock (&priv->lock);
917     return NULL;
918   }
919 no_address:
920   {
921     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
922     g_mutex_unlock (&priv->lock);
923     return NULL;
924   }
925 }
926
927 /**
928  * gst_rtsp_stream_reserve_address:
929  * @stream: a #GstRTSPStream
930  * @address: an address
931  * @port: a port
932  * @n_ports: n_ports
933  * @ttl: a TTL
934  *
935  * Reserve @address and @port as the address and port of @stream.
936  *
937  * Returns: (nullable): the #GstRTSPAddress of @stream or %NULL when
938  * the address could be reserved. gst_rtsp_address_free() after usage.
939  */
940 GstRTSPAddress *
941 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
942     const gchar * address, guint port, guint n_ports, guint ttl)
943 {
944   GstRTSPStreamPrivate *priv;
945   GstRTSPAddress *result;
946   GInetAddress *addr;
947   GSocketFamily family;
948   GstRTSPAddress **addrp;
949
950   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
951   g_return_val_if_fail (address != NULL, NULL);
952   g_return_val_if_fail (port > 0, NULL);
953   g_return_val_if_fail (n_ports > 0, NULL);
954   g_return_val_if_fail (ttl > 0, NULL);
955
956   priv = stream->priv;
957
958   addr = g_inet_address_new_from_string (address);
959   if (!addr) {
960     GST_ERROR ("failed to get inet addr from %s", address);
961     family = G_SOCKET_FAMILY_IPV4;
962   } else {
963     family = g_inet_address_get_family (addr);
964     g_object_unref (addr);
965   }
966
967   if (family == G_SOCKET_FAMILY_IPV6)
968     addrp = &priv->addr_v6;
969   else
970     addrp = &priv->addr_v4;
971
972   g_mutex_lock (&priv->lock);
973   if (*addrp == NULL) {
974     GstRTSPAddressPoolResult res;
975
976     if (priv->pool == NULL)
977       goto no_pool;
978
979     res = gst_rtsp_address_pool_reserve_address (priv->pool, address,
980         port, n_ports, ttl, addrp);
981     if (res != GST_RTSP_ADDRESS_POOL_OK)
982       goto no_address;
983   } else {
984     if (strcmp ((*addrp)->address, address) ||
985         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
986         (*addrp)->ttl != ttl)
987       goto different_address;
988   }
989   result = gst_rtsp_address_copy (*addrp);
990   g_mutex_unlock (&priv->lock);
991
992   return result;
993
994   /* ERRORS */
995 no_pool:
996   {
997     GST_ERROR_OBJECT (stream, "no address pool specified");
998     g_mutex_unlock (&priv->lock);
999     return NULL;
1000   }
1001 no_address:
1002   {
1003     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
1004         address);
1005     g_mutex_unlock (&priv->lock);
1006     return NULL;
1007   }
1008 different_address:
1009   {
1010     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
1011         " reserved", address);
1012     g_mutex_unlock (&priv->lock);
1013     return NULL;
1014   }
1015 }
1016
1017 static gboolean
1018 alloc_ports_one_family (GstRTSPStream * stream, GstRTSPAddressPool * pool,
1019     gint buffer_size, GSocketFamily family, GstElement * udpsrc_out[2],
1020     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
1021     GstRTSPAddress ** server_addr_out)
1022 {
1023   GstRTSPStreamPrivate *priv = stream->priv;
1024   GstStateChangeReturn ret;
1025   GstElement *udpsrc0, *udpsrc1;
1026   GstElement *udpsink0, *udpsink1;
1027   GSocket *rtp_socket = NULL;
1028   GSocket *rtcp_socket;
1029   gint tmp_rtp, tmp_rtcp;
1030   guint count;
1031   gint rtpport, rtcpport;
1032   GList *rejected_addresses = NULL;
1033   GstRTSPAddress *addr = NULL;
1034   GInetAddress *inetaddr = NULL;
1035   GSocketAddress *rtp_sockaddr = NULL;
1036   GSocketAddress *rtcp_sockaddr = NULL;
1037   const gchar *multisink_socket;
1038
1039   if (family == G_SOCKET_FAMILY_IPV6)
1040     multisink_socket = "socket-v6";
1041   else
1042     multisink_socket = "socket";
1043
1044   udpsrc0 = NULL;
1045   udpsrc1 = NULL;
1046   udpsink0 = NULL;
1047   udpsink1 = NULL;
1048   count = 0;
1049
1050   /* Start with random port */
1051   tmp_rtp = 0;
1052
1053   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1054       G_SOCKET_PROTOCOL_UDP, NULL);
1055   if (!rtcp_socket)
1056     goto no_udp_protocol;
1057
1058   if (*server_addr_out)
1059     gst_rtsp_address_free (*server_addr_out);
1060
1061   /* try to allocate 2 UDP ports, the RTP port should be an even
1062    * number and the RTCP port should be the next (uneven) port */
1063 again:
1064
1065   if (rtp_socket == NULL) {
1066     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1067         G_SOCKET_PROTOCOL_UDP, NULL);
1068     if (!rtp_socket)
1069       goto no_udp_protocol;
1070   }
1071
1072   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
1073     GstRTSPAddressFlags flags;
1074
1075     if (addr)
1076       rejected_addresses = g_list_prepend (rejected_addresses, addr);
1077
1078     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
1079     if (family == G_SOCKET_FAMILY_IPV6)
1080       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
1081     else
1082       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
1083
1084     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
1085
1086     if (addr == NULL)
1087       goto no_ports;
1088
1089     tmp_rtp = addr->port;
1090
1091     g_clear_object (&inetaddr);
1092     inetaddr = g_inet_address_new_from_string (addr->address);
1093   } else {
1094     if (tmp_rtp != 0) {
1095       tmp_rtp += 2;
1096       if (++count > 20)
1097         goto no_ports;
1098     }
1099
1100     if (inetaddr == NULL)
1101       inetaddr = g_inet_address_new_any (family);
1102   }
1103
1104   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
1105   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
1106     g_object_unref (rtp_sockaddr);
1107     goto again;
1108   }
1109   g_object_unref (rtp_sockaddr);
1110
1111   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
1112   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
1113     g_clear_object (&rtp_sockaddr);
1114     goto socket_error;
1115   }
1116
1117   tmp_rtp =
1118       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
1119   g_object_unref (rtp_sockaddr);
1120
1121   /* check if port is even */
1122   if ((tmp_rtp & 1) != 0) {
1123     /* port not even, close and allocate another */
1124     tmp_rtp++;
1125     g_clear_object (&rtp_socket);
1126     goto again;
1127   }
1128
1129   /* set port */
1130   tmp_rtcp = tmp_rtp + 1;
1131
1132   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
1133   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
1134     g_object_unref (rtcp_sockaddr);
1135     g_clear_object (&rtp_socket);
1136     goto again;
1137   }
1138   g_object_unref (rtcp_sockaddr);
1139
1140   g_clear_object (&inetaddr);
1141
1142   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
1143   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
1144
1145   if (udpsrc0 == NULL || udpsrc1 == NULL)
1146     goto no_udp_protocol;
1147
1148   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
1149   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
1150
1151   ret = gst_element_set_state (udpsrc0, GST_STATE_READY);
1152   if (ret == GST_STATE_CHANGE_FAILURE)
1153     goto element_error;
1154   ret = gst_element_set_state (udpsrc1, GST_STATE_READY);
1155   if (ret == GST_STATE_CHANGE_FAILURE)
1156     goto element_error;
1157
1158   /* all fine, do port check */
1159   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
1160   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
1161
1162   /* this should not happen... */
1163   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
1164     goto port_error;
1165
1166   if (udpsink_out[0])
1167     udpsink0 = udpsink_out[0];
1168   else
1169     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
1170
1171   if (!udpsink0)
1172     goto no_udp_protocol;
1173
1174   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
1175   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
1176
1177   if (udpsink_out[1])
1178     udpsink1 = udpsink_out[1];
1179   else
1180     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
1181
1182   if (!udpsink1)
1183     goto no_udp_protocol;
1184
1185   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
1186   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
1187   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
1188
1189   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
1190   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
1191   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
1192   /* Needs to be async for RECORD streams, otherwise we will never go to
1193    * PLAYING because the sinks will wait for data while the udpsrc can't
1194    * provide data with timestamps in PAUSED. */
1195   if (priv->sinkpad)
1196     g_object_set (G_OBJECT (udpsink0), "async", FALSE, NULL);
1197   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
1198   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
1199   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
1200   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
1201   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
1202
1203   /* we keep these elements, we will further configure them when the
1204    * client told us to really use the UDP ports. */
1205   udpsrc_out[0] = udpsrc0;
1206   udpsrc_out[1] = udpsrc1;
1207   udpsink_out[0] = udpsink0;
1208   udpsink_out[1] = udpsink1;
1209
1210   server_port_out->min = rtpport;
1211   server_port_out->max = rtcpport;
1212
1213   *server_addr_out = addr;
1214   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
1215
1216   g_object_unref (rtp_socket);
1217   g_object_unref (rtcp_socket);
1218
1219   return TRUE;
1220
1221   /* ERRORS */
1222 no_udp_protocol:
1223   {
1224     goto cleanup;
1225   }
1226 no_ports:
1227   {
1228     goto cleanup;
1229   }
1230 port_error:
1231   {
1232     goto cleanup;
1233   }
1234 socket_error:
1235   {
1236     goto cleanup;
1237   }
1238 element_error:
1239   {
1240     goto cleanup;
1241   }
1242 cleanup:
1243   {
1244     if (udpsrc0) {
1245       gst_element_set_state (udpsrc0, GST_STATE_NULL);
1246       gst_object_unref (udpsrc0);
1247     }
1248     if (udpsrc1) {
1249       gst_element_set_state (udpsrc1, GST_STATE_NULL);
1250       gst_object_unref (udpsrc1);
1251     }
1252     if (udpsink0) {
1253       gst_element_set_state (udpsink0, GST_STATE_NULL);
1254       gst_object_unref (udpsink0);
1255     }
1256     if (inetaddr)
1257       g_object_unref (inetaddr);
1258     g_list_free_full (rejected_addresses,
1259         (GDestroyNotify) gst_rtsp_address_free);
1260     if (addr)
1261       gst_rtsp_address_free (addr);
1262     if (rtp_socket)
1263       g_object_unref (rtp_socket);
1264     if (rtcp_socket)
1265       g_object_unref (rtcp_socket);
1266     return FALSE;
1267   }
1268 }
1269
1270 /* must be called with lock */
1271 static gboolean
1272 alloc_ports (GstRTSPStream * stream)
1273 {
1274   GstRTSPStreamPrivate *priv = stream->priv;
1275
1276   priv->have_ipv4 =
1277       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1278       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1279       &priv->server_port_v4, &priv->server_addr_v4);
1280
1281   priv->have_ipv6 =
1282       alloc_ports_one_family (stream, priv->pool, priv->buffer_size,
1283       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1284       &priv->server_port_v6, &priv->server_addr_v6);
1285
1286   return priv->have_ipv4 || priv->have_ipv6;
1287 }
1288
1289 /**
1290  * gst_rtsp_stream_get_server_port:
1291  * @stream: a #GstRTSPStream
1292  * @server_port: (out): result server port
1293  * @family: the port family to get
1294  *
1295  * Fill @server_port with the port pair used by the server. This function can
1296  * only be called when @stream has been joined.
1297  */
1298 void
1299 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1300     GstRTSPRange * server_port, GSocketFamily family)
1301 {
1302   GstRTSPStreamPrivate *priv;
1303
1304   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1305   priv = stream->priv;
1306   g_return_if_fail (priv->is_joined);
1307
1308   g_mutex_lock (&priv->lock);
1309   if (family == G_SOCKET_FAMILY_IPV4) {
1310     if (server_port)
1311       *server_port = priv->server_port_v4;
1312   } else {
1313     if (server_port)
1314       *server_port = priv->server_port_v6;
1315   }
1316   g_mutex_unlock (&priv->lock);
1317 }
1318
1319 /**
1320  * gst_rtsp_stream_get_rtpsession:
1321  * @stream: a #GstRTSPStream
1322  *
1323  * Get the RTP session of this stream.
1324  *
1325  * Returns: (transfer full): The RTP session of this stream. Unref after usage.
1326  */
1327 GObject *
1328 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1329 {
1330   GstRTSPStreamPrivate *priv;
1331   GObject *session;
1332
1333   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1334
1335   priv = stream->priv;
1336
1337   g_mutex_lock (&priv->lock);
1338   if ((session = priv->session))
1339     g_object_ref (session);
1340   g_mutex_unlock (&priv->lock);
1341
1342   return session;
1343 }
1344
1345 /**
1346  * gst_rtsp_stream_get_ssrc:
1347  * @stream: a #GstRTSPStream
1348  * @ssrc: (out): result ssrc
1349  *
1350  * Get the SSRC used by the RTP session of this stream. This function can only
1351  * be called when @stream has been joined.
1352  */
1353 void
1354 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1355 {
1356   GstRTSPStreamPrivate *priv;
1357
1358   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1359   priv = stream->priv;
1360   g_return_if_fail (priv->is_joined);
1361
1362   g_mutex_lock (&priv->lock);
1363   if (ssrc && priv->session)
1364     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1365   g_mutex_unlock (&priv->lock);
1366 }
1367
1368 /**
1369  * gst_rtsp_stream_set_retransmission_time:
1370  * @stream: a #GstRTSPStream
1371  * @time: a #GstClockTime
1372  *
1373  * Set the amount of time to store retransmission packets.
1374  */
1375 void
1376 gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
1377     GstClockTime time)
1378 {
1379   GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
1380
1381   g_mutex_lock (&stream->priv->lock);
1382   stream->priv->rtx_time = time;
1383   if (stream->priv->rtxsend)
1384     g_object_set (stream->priv->rtxsend, "max-size-time",
1385         GST_TIME_AS_MSECONDS (time), NULL);
1386   g_mutex_unlock (&stream->priv->lock);
1387 }
1388
1389 /**
1390  * gst_rtsp_media_get_retransmission_time:
1391  * @media: a #GstRTSPMedia
1392  *
1393  * Get the amount of time to store retransmission data.
1394  *
1395  * Returns: the amount of time to store retransmission data.
1396  */
1397 GstClockTime
1398 gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
1399 {
1400   GstClockTime ret;
1401
1402   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1403
1404   g_mutex_lock (&stream->priv->lock);
1405   ret = stream->priv->rtx_time;
1406   g_mutex_unlock (&stream->priv->lock);
1407
1408   return ret;
1409 }
1410
1411 void
1412 gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
1413 {
1414   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1415
1416   GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
1417
1418   g_mutex_lock (&stream->priv->lock);
1419   stream->priv->rtx_pt = rtx_pt;
1420   if (stream->priv->rtxsend) {
1421     guint pt = gst_rtsp_stream_get_pt (stream);
1422     gchar *pt_s = g_strdup_printf ("%d", pt);
1423     GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
1424         pt_s, G_TYPE_UINT, rtx_pt, NULL);
1425     g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
1426     g_free (pt_s);
1427     gst_structure_free (rtx_pt_map);
1428   }
1429   g_mutex_unlock (&stream->priv->lock);
1430 }
1431
1432 guint
1433 gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
1434 {
1435   guint rtx_pt;
1436
1437   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
1438
1439   g_mutex_lock (&stream->priv->lock);
1440   rtx_pt = stream->priv->rtx_pt;
1441   g_mutex_unlock (&stream->priv->lock);
1442
1443   return rtx_pt;
1444 }
1445
1446 /* executed from streaming thread */
1447 static void
1448 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1449 {
1450   GstRTSPStreamPrivate *priv = stream->priv;
1451   GstCaps *newcaps, *oldcaps;
1452
1453   newcaps = gst_pad_get_current_caps (pad);
1454
1455   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1456       newcaps);
1457
1458   g_mutex_lock (&priv->lock);
1459   oldcaps = priv->caps;
1460   priv->caps = newcaps;
1461   g_mutex_unlock (&priv->lock);
1462
1463   if (oldcaps)
1464     gst_caps_unref (oldcaps);
1465 }
1466
1467 static void
1468 dump_structure (const GstStructure * s)
1469 {
1470   gchar *sstr;
1471
1472   sstr = gst_structure_to_string (s);
1473   GST_INFO ("structure: %s", sstr);
1474   g_free (sstr);
1475 }
1476
1477 static GstRTSPStreamTransport *
1478 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1479 {
1480   GstRTSPStreamPrivate *priv = stream->priv;
1481   GList *walk;
1482   GstRTSPStreamTransport *result = NULL;
1483   const gchar *tmp;
1484   gchar *dest;
1485   guint port;
1486
1487   if (rtcp_from == NULL)
1488     return NULL;
1489
1490   tmp = g_strrstr (rtcp_from, ":");
1491   if (tmp == NULL)
1492     return NULL;
1493
1494   port = atoi (tmp + 1);
1495   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1496
1497   g_mutex_lock (&priv->lock);
1498   GST_INFO ("finding %s:%d in %d transports", dest, port,
1499       g_list_length (priv->transports));
1500
1501   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1502     GstRTSPStreamTransport *trans = walk->data;
1503     const GstRTSPTransport *tr;
1504     gint min, max;
1505
1506     tr = gst_rtsp_stream_transport_get_transport (trans);
1507
1508     min = tr->client_port.min;
1509     max = tr->client_port.max;
1510
1511     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1512       result = trans;
1513       break;
1514     }
1515   }
1516   if (result)
1517     g_object_ref (result);
1518   g_mutex_unlock (&priv->lock);
1519
1520   g_free (dest);
1521
1522   return result;
1523 }
1524
1525 static GstRTSPStreamTransport *
1526 check_transport (GObject * source, GstRTSPStream * stream)
1527 {
1528   GstStructure *stats;
1529   GstRTSPStreamTransport *trans;
1530
1531   /* see if we have a stream to match with the origin of the RTCP packet */
1532   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1533   if (trans == NULL) {
1534     g_object_get (source, "stats", &stats, NULL);
1535     if (stats) {
1536       const gchar *rtcp_from;
1537
1538       dump_structure (stats);
1539
1540       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1541       if ((trans = find_transport (stream, rtcp_from))) {
1542         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1543             source);
1544         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1545             g_object_unref);
1546       }
1547       gst_structure_free (stats);
1548     }
1549   }
1550   return trans;
1551 }
1552
1553
1554 static void
1555 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1556 {
1557   GstRTSPStreamTransport *trans;
1558
1559   GST_INFO ("%p: new source %p", stream, source);
1560
1561   trans = check_transport (source, stream);
1562
1563   if (trans)
1564     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1565 }
1566
1567 static void
1568 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1569 {
1570   GST_INFO ("%p: new SDES %p", stream, source);
1571 }
1572
1573 static void
1574 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1575 {
1576   GstRTSPStreamTransport *trans;
1577
1578   trans = check_transport (source, stream);
1579
1580   if (trans) {
1581     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1582     gst_rtsp_stream_transport_keep_alive (trans);
1583   }
1584 #ifdef DUMP_STATS
1585   {
1586     GstStructure *stats;
1587     g_object_get (source, "stats", &stats, NULL);
1588     if (stats) {
1589       dump_structure (stats);
1590       gst_structure_free (stats);
1591     }
1592   }
1593 #endif
1594 }
1595
1596 static void
1597 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1598 {
1599   GST_INFO ("%p: source %p bye", stream, source);
1600 }
1601
1602 static void
1603 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1604 {
1605   GstRTSPStreamTransport *trans;
1606
1607   GST_INFO ("%p: source %p bye timeout", stream, source);
1608
1609   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1610     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1611     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1612   }
1613 }
1614
1615 static void
1616 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1617 {
1618   GstRTSPStreamTransport *trans;
1619
1620   GST_INFO ("%p: source %p timeout", stream, source);
1621
1622   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1623     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1624     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1625   }
1626 }
1627
1628 static void
1629 clear_tr_cache (GstRTSPStreamPrivate * priv, gboolean is_rtp)
1630 {
1631   if (is_rtp) {
1632     g_list_foreach (priv->tr_cache_rtp, (GFunc) g_object_unref, NULL);
1633     g_list_free (priv->tr_cache_rtp);
1634     priv->tr_cache_rtp = NULL;
1635   } else {
1636     g_list_foreach (priv->tr_cache_rtcp, (GFunc) g_object_unref, NULL);
1637     g_list_free (priv->tr_cache_rtcp);
1638     priv->tr_cache_rtcp = NULL;
1639   }
1640 }
1641
1642 static GstFlowReturn
1643 handle_new_sample (GstAppSink * sink, gpointer user_data)
1644 {
1645   GstRTSPStreamPrivate *priv;
1646   GList *walk;
1647   GstSample *sample;
1648   GstBuffer *buffer;
1649   GstRTSPStream *stream;
1650   gboolean is_rtp;
1651
1652   sample = gst_app_sink_pull_sample (sink);
1653   if (!sample)
1654     return GST_FLOW_OK;
1655
1656   stream = (GstRTSPStream *) user_data;
1657   priv = stream->priv;
1658   buffer = gst_sample_get_buffer (sample);
1659
1660   is_rtp = GST_ELEMENT_CAST (sink) == priv->appsink[0];
1661
1662   g_mutex_lock (&priv->lock);
1663   if (is_rtp) {
1664     if (priv->tr_cache_cookie_rtp != priv->transports_cookie) {
1665       clear_tr_cache (priv, is_rtp);
1666       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1667         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1668         priv->tr_cache_rtp =
1669             g_list_prepend (priv->tr_cache_rtp, g_object_ref (tr));
1670       }
1671       priv->tr_cache_cookie_rtp = priv->transports_cookie;
1672     }
1673   } else {
1674     if (priv->tr_cache_cookie_rtcp != priv->transports_cookie) {
1675       clear_tr_cache (priv, is_rtp);
1676       for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1677         GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1678         priv->tr_cache_rtcp =
1679             g_list_prepend (priv->tr_cache_rtcp, g_object_ref (tr));
1680       }
1681       priv->tr_cache_cookie_rtcp = priv->transports_cookie;
1682     }
1683   }
1684   g_mutex_unlock (&priv->lock);
1685
1686   if (is_rtp) {
1687     for (walk = priv->tr_cache_rtp; walk; walk = g_list_next (walk)) {
1688       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1689       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1690     }
1691   } else {
1692     for (walk = priv->tr_cache_rtcp; walk; walk = g_list_next (walk)) {
1693       GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1694       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1695     }
1696   }
1697   gst_sample_unref (sample);
1698
1699   return GST_FLOW_OK;
1700 }
1701
1702 static GstAppSinkCallbacks sink_cb = {
1703   NULL,                         /* not interested in EOS */
1704   NULL,                         /* not interested in preroll samples */
1705   handle_new_sample,
1706 };
1707
1708 static GstElement *
1709 get_rtp_encoder (GstRTSPStream * stream, guint session)
1710 {
1711   GstRTSPStreamPrivate *priv = stream->priv;
1712
1713   if (priv->srtpenc == NULL) {
1714     gchar *name;
1715
1716     name = g_strdup_printf ("srtpenc_%u", session);
1717     priv->srtpenc = gst_element_factory_make ("srtpenc", name);
1718     g_free (name);
1719
1720     g_object_set (priv->srtpenc, "random-key", TRUE, NULL);
1721   }
1722   return gst_object_ref (priv->srtpenc);
1723 }
1724
1725 static GstElement *
1726 request_rtp_encoder (GstElement * rtpbin, guint session, GstRTSPStream * stream)
1727 {
1728   GstRTSPStreamPrivate *priv = stream->priv;
1729   GstElement *oldenc, *enc;
1730   GstPad *pad;
1731   gchar *name;
1732
1733   if (priv->idx != session)
1734     return NULL;
1735
1736   GST_DEBUG_OBJECT (stream, "make RTP encoder for session %u", session);
1737
1738   oldenc = priv->srtpenc;
1739   enc = get_rtp_encoder (stream, session);
1740   name = g_strdup_printf ("rtp_sink_%d", session);
1741   pad = gst_element_get_request_pad (enc, name);
1742   g_free (name);
1743   gst_object_unref (pad);
1744
1745   if (oldenc == NULL)
1746     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTP_ENCODER], 0,
1747         enc);
1748
1749   return enc;
1750 }
1751
1752 static GstElement *
1753 request_rtcp_encoder (GstElement * rtpbin, guint session,
1754     GstRTSPStream * stream)
1755 {
1756   GstRTSPStreamPrivate *priv = stream->priv;
1757   GstElement *oldenc, *enc;
1758   GstPad *pad;
1759   gchar *name;
1760
1761   if (priv->idx != session)
1762     return NULL;
1763
1764   GST_DEBUG_OBJECT (stream, "make RTCP encoder for session %u", session);
1765
1766   oldenc = priv->srtpenc;
1767   enc = get_rtp_encoder (stream, session);
1768   name = g_strdup_printf ("rtcp_sink_%d", session);
1769   pad = gst_element_get_request_pad (enc, name);
1770   g_free (name);
1771   gst_object_unref (pad);
1772
1773   if (oldenc == NULL)
1774     g_signal_emit (stream, gst_rtsp_stream_signals[SIGNAL_NEW_RTCP_ENCODER], 0,
1775         enc);
1776
1777   return enc;
1778 }
1779
1780 static GstCaps *
1781 request_key (GstElement * srtpdec, guint ssrc, GstRTSPStream * stream)
1782 {
1783   GstRTSPStreamPrivate *priv = stream->priv;
1784   GstCaps *caps;
1785
1786   GST_DEBUG ("request key %08x", ssrc);
1787
1788   g_mutex_lock (&priv->lock);
1789   if ((caps = g_hash_table_lookup (priv->keys, GINT_TO_POINTER (ssrc))))
1790     gst_caps_ref (caps);
1791   g_mutex_unlock (&priv->lock);
1792
1793   return caps;
1794 }
1795
1796 static GstElement *
1797 request_rtp_rtcp_decoder (GstElement * rtpbin, guint session,
1798     GstRTSPStream * stream)
1799 {
1800   GstRTSPStreamPrivate *priv = stream->priv;
1801
1802   if (priv->idx != session)
1803     return NULL;
1804
1805   if (priv->srtpdec == NULL) {
1806     gchar *name;
1807
1808     name = g_strdup_printf ("srtpdec_%u", session);
1809     priv->srtpdec = gst_element_factory_make ("srtpdec", name);
1810     g_free (name);
1811
1812     g_signal_connect (priv->srtpdec, "request-key",
1813         (GCallback) request_key, stream);
1814   }
1815   return gst_object_ref (priv->srtpdec);
1816 }
1817
1818 static GstElement *
1819 request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
1820 {
1821   GstElement *bin;
1822   GstPad *pad;
1823   GstStructure *pt_map;
1824   gchar *name;
1825   guint pt, rtx_pt;
1826   gchar *pt_s;
1827
1828   pt = gst_rtsp_stream_get_pt (stream);
1829   pt_s = g_strdup_printf ("%u", pt);
1830   rtx_pt = stream->priv->rtx_pt;
1831
1832   GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
1833
1834   bin = gst_bin_new (NULL);
1835   stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
1836   pt_map = gst_structure_new ("application/x-rtp-pt-map",
1837       pt_s, G_TYPE_UINT, rtx_pt, NULL);
1838   g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
1839       "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
1840   g_free (pt_s);
1841   gst_structure_free (pt_map);
1842   gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
1843
1844   pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
1845   name = g_strdup_printf ("src_%u", sessid);
1846   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1847   g_free (name);
1848   gst_object_unref (pad);
1849
1850   pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
1851   name = g_strdup_printf ("sink_%u", sessid);
1852   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
1853   g_free (name);
1854   gst_object_unref (pad);
1855
1856   return bin;
1857 }
1858
1859 /**
1860  * gst_rtsp_stream_set_pt_map:
1861  * @stream: a #GstRTSPStream
1862  * @pt: the pt
1863  * @caps: a #GstCaps
1864  *
1865  * Configure a pt map between @pt and @caps.
1866  */
1867 void
1868 gst_rtsp_stream_set_pt_map (GstRTSPStream * stream, guint pt, GstCaps * caps)
1869 {
1870   GstRTSPStreamPrivate *priv = stream->priv;
1871
1872   g_mutex_lock (&priv->lock);
1873   g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (pt), gst_caps_ref (caps));
1874   g_mutex_unlock (&priv->lock);
1875 }
1876
1877 static GstCaps *
1878 request_pt_map (GstElement * rtpbin, guint session, guint pt,
1879     GstRTSPStream * stream)
1880 {
1881   GstRTSPStreamPrivate *priv = stream->priv;
1882   GstCaps *caps = NULL;
1883
1884   g_mutex_lock (&priv->lock);
1885
1886   if (priv->idx == session) {
1887     caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (pt));
1888     if (caps) {
1889       GST_DEBUG ("Stream %p, pt %u: caps %" GST_PTR_FORMAT, stream, pt, caps);
1890       gst_caps_ref (caps);
1891     } else {
1892       GST_DEBUG ("Stream %p, pt %u: no caps", stream, pt);
1893     }
1894   }
1895
1896   g_mutex_unlock (&priv->lock);
1897
1898   return caps;
1899 }
1900
1901 static void
1902 pad_added (GstElement * rtpbin, GstPad * pad, GstRTSPStream * stream)
1903 {
1904   GstRTSPStreamPrivate *priv = stream->priv;
1905   gchar *name;
1906   GstPadLinkReturn ret;
1907   guint sessid;
1908
1909   GST_DEBUG ("Stream %p added pad %s:%s for pad %s:%s", stream,
1910       GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1911
1912   name = gst_pad_get_name (pad);
1913   if (sscanf (name, "recv_rtp_src_%u", &sessid) != 1) {
1914     g_free (name);
1915     return;
1916   }
1917   g_free (name);
1918
1919   if (priv->idx != sessid)
1920     return;
1921
1922   if (gst_pad_is_linked (priv->sinkpad)) {
1923     GST_WARNING ("Stream %p: Pad %s:%s is linked already", stream,
1924         GST_DEBUG_PAD_NAME (priv->sinkpad));
1925     return;
1926   }
1927
1928   /* link the RTP pad to the session manager, it should not really fail unless
1929    * this is not really an RTP pad */
1930   ret = gst_pad_link (pad, priv->sinkpad);
1931   if (ret != GST_PAD_LINK_OK)
1932     goto link_failed;
1933   priv->recv_rtp_src = gst_object_ref (pad);
1934
1935   return;
1936
1937 /* ERRORS */
1938 link_failed:
1939   {
1940     GST_ERROR ("Stream %p: Failed to link pads %s:%s and %s:%s", stream,
1941         GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (priv->sinkpad));
1942   }
1943 }
1944
1945 static void
1946 on_npt_stop (GstElement * rtpbin, guint session, guint ssrc,
1947     GstRTSPStream * stream)
1948 {
1949   /* TODO: What to do here other than this? */
1950   GST_DEBUG ("Stream %p: Got EOS", stream);
1951   gst_pad_send_event (stream->priv->sinkpad, gst_event_new_eos ());
1952 }
1953
1954 /**
1955  * gst_rtsp_stream_join_bin:
1956  * @stream: a #GstRTSPStream
1957  * @bin: (transfer none): a #GstBin to join
1958  * @rtpbin: (transfer none): a rtpbin element in @bin
1959  * @state: the target state of the new elements
1960  *
1961  * Join the #GstBin @bin that contains the element @rtpbin.
1962  *
1963  * @stream will link to @rtpbin, which must be inside @bin. The elements
1964  * added to @bin will be set to the state given in @state.
1965  *
1966  * Returns: %TRUE on success.
1967  */
1968 gboolean
1969 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1970     GstElement * rtpbin, GstState state)
1971 {
1972   GstRTSPStreamPrivate *priv;
1973   gint i;
1974   guint idx;
1975   gchar *name;
1976   GstPad *pad, *sinkpad, *selpad;
1977   GstPadLinkReturn ret;
1978
1979   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1980   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1981   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1982
1983   priv = stream->priv;
1984
1985   g_mutex_lock (&priv->lock);
1986   if (priv->is_joined)
1987     goto was_joined;
1988
1989   /* create a session with the same index as the stream */
1990   idx = priv->idx;
1991
1992   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1993
1994   if (!alloc_ports (stream))
1995     goto no_ports;
1996
1997   /* update the dscp qos field in the sinks */
1998   update_dscp_qos (stream);
1999
2000   if (priv->profiles & GST_RTSP_PROFILE_SAVP
2001       || priv->profiles & GST_RTSP_PROFILE_SAVPF) {
2002     /* For SRTP */
2003     g_signal_connect (rtpbin, "request-rtp-encoder",
2004         (GCallback) request_rtp_encoder, stream);
2005     g_signal_connect (rtpbin, "request-rtcp-encoder",
2006         (GCallback) request_rtcp_encoder, stream);
2007     g_signal_connect (rtpbin, "request-rtp-decoder",
2008         (GCallback) request_rtp_rtcp_decoder, stream);
2009     g_signal_connect (rtpbin, "request-rtcp-decoder",
2010         (GCallback) request_rtp_rtcp_decoder, stream);
2011   }
2012
2013   if (priv->rtx_time > 0 && priv->srcpad) {
2014     /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
2015     g_signal_connect (rtpbin, "request-aux-sender",
2016         (GCallback) request_aux_sender, stream);
2017   }
2018   if (priv->sinkpad) {
2019     g_signal_connect (rtpbin, "request-pt-map",
2020         (GCallback) request_pt_map, stream);
2021   }
2022
2023   /* get a pad for sending RTP */
2024   name = g_strdup_printf ("send_rtp_sink_%u", idx);
2025   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
2026   g_free (name);
2027
2028   if (priv->srcpad) {
2029     /* link the RTP pad to the session manager, it should not really fail unless
2030      * this is not really an RTP pad */
2031     ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
2032     if (ret != GST_PAD_LINK_OK)
2033       goto link_failed;
2034   } else {
2035     /* Need to connect our sinkpad from here */
2036     g_signal_connect (rtpbin, "pad-added", (GCallback) pad_added, stream);
2037     /* EOS */
2038     g_signal_connect (rtpbin, "on-npt-stop", (GCallback) on_npt_stop, stream);
2039   }
2040
2041   /* get pads from the RTP session element for sending and receiving
2042    * RTP/RTCP*/
2043   name = g_strdup_printf ("send_rtp_src_%u", idx);
2044   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
2045   g_free (name);
2046   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
2047   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
2048   g_free (name);
2049
2050   name = g_strdup_printf ("send_rtcp_src_%u", idx);
2051   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
2052   g_free (name);
2053   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
2054   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
2055   g_free (name);
2056
2057   /* get the session */
2058   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
2059
2060   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
2061       stream);
2062   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
2063       stream);
2064   g_signal_connect (priv->session, "on-ssrc-active",
2065       (GCallback) on_ssrc_active, stream);
2066   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
2067       stream);
2068   g_signal_connect (priv->session, "on-bye-timeout",
2069       (GCallback) on_bye_timeout, stream);
2070   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
2071       stream);
2072
2073   for (i = 0; i < 2; i++) {
2074     GstPad *teepad, *queuepad;
2075     /* For the sender we create this bit of pipeline for both
2076      * RTP and RTCP. Sync and preroll are enabled on udpsink so
2077      * we need to add a queue before appsink to make the pipeline
2078      * not block. For the TCP case, we want to pump data to the
2079      * client as fast as possible anyway.
2080      *
2081      * .--------.      .-----.    .---------.
2082      * | rtpbin |      | tee |    | udpsink |
2083      * |       send->sink   src->sink       |
2084      * '--------'      |     |    '---------'
2085      *                 |     |    .---------.    .---------.
2086      *                 |     |    |  queue  |    | appsink |
2087      *                 |    src->sink      src->sink       |
2088      *                 '-----'    '---------'    '---------'
2089      *
2090      * When only UDP is allowed, we skip the tee, queue and appsink and link the
2091      * udpsink directly to the session.
2092      */
2093     /* add udpsink */
2094     gst_bin_add (bin, priv->udpsink[i]);
2095     sinkpad = gst_element_get_static_pad (priv->udpsink[i], "sink");
2096
2097     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2098       /* make tee for RTP/RTCP */
2099       priv->tee[i] = gst_element_factory_make ("tee", NULL);
2100       gst_bin_add (bin, priv->tee[i]);
2101
2102       /* and link to rtpbin send pad */
2103       pad = gst_element_get_static_pad (priv->tee[i], "sink");
2104       gst_pad_link (priv->send_src[i], pad);
2105       gst_object_unref (pad);
2106
2107       priv->udpqueue[i] = gst_element_factory_make ("queue", NULL);
2108       gst_bin_add (bin, priv->udpqueue[i]);
2109       /* link tee to udpqueue */
2110       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2111       pad = gst_element_get_static_pad (priv->udpqueue[i], "sink");
2112       gst_pad_link (teepad, pad);
2113       gst_object_unref (pad);
2114       gst_object_unref (teepad);
2115
2116       /* link udpqueue to udpsink */
2117       queuepad = gst_element_get_static_pad (priv->udpqueue[i], "src");
2118       gst_pad_link (queuepad, sinkpad);
2119       gst_object_unref (queuepad);
2120
2121       /* make queue */
2122       priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
2123       gst_bin_add (bin, priv->appqueue[i]);
2124       /* and link to tee */
2125       teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
2126       pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
2127       gst_pad_link (teepad, pad);
2128       gst_object_unref (pad);
2129       gst_object_unref (teepad);
2130
2131       /* make appsink */
2132       priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
2133       g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
2134       g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
2135       gst_bin_add (bin, priv->appsink[i]);
2136       gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
2137           &sink_cb, stream, NULL);
2138       /* and link to queue */
2139       queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
2140       pad = gst_element_get_static_pad (priv->appsink[i], "sink");
2141       gst_pad_link (queuepad, pad);
2142       gst_object_unref (pad);
2143       gst_object_unref (queuepad);
2144     } else {
2145       /* else only udpsink needed, link it to the session */
2146       gst_pad_link (priv->send_src[i], sinkpad);
2147     }
2148     gst_object_unref (sinkpad);
2149
2150     /* For the receiver we create this bit of pipeline for both
2151      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
2152      * and it is all funneled into the rtpbin receive pad.
2153      *
2154      * .--------.     .--------.    .--------.
2155      * | udpsrc |     | funnel |    | rtpbin |
2156      * |       src->sink      src->sink      |
2157      * '--------'     |        |    '--------'
2158      * .--------.     |        |
2159      * | appsrc |     |        |
2160      * |       src->sink       |
2161      * '--------'     '--------'
2162      */
2163     /* make funnel for the RTP/RTCP receivers */
2164     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
2165     gst_bin_add (bin, priv->funnel[i]);
2166
2167     pad = gst_element_get_static_pad (priv->funnel[i], "src");
2168     gst_pad_link (pad, priv->recv_sink[i]);
2169     gst_object_unref (pad);
2170
2171     if (priv->udpsrc_v4[i]) {
2172       if (priv->srcpad) {
2173         /* we set and keep these to playing so that they don't cause NO_PREROLL return
2174          * values. This is only relevant for PLAY pipelines */
2175         gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
2176         gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
2177       }
2178       /* add udpsrc */
2179       gst_bin_add (bin, priv->udpsrc_v4[i]);
2180
2181       /* and link to the funnel v4 */
2182       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2183       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
2184       gst_pad_link (pad, selpad);
2185       gst_object_unref (pad);
2186       gst_object_unref (selpad);
2187     }
2188
2189     if (priv->udpsrc_v6[i]) {
2190       if (priv->srcpad) {
2191         gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
2192         gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
2193       }
2194       gst_bin_add (bin, priv->udpsrc_v6[i]);
2195
2196       /* and link to the funnel v6 */
2197       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2198       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
2199       gst_pad_link (pad, selpad);
2200       gst_object_unref (pad);
2201       gst_object_unref (selpad);
2202     }
2203
2204     if (priv->protocols & GST_RTSP_LOWER_TRANS_TCP) {
2205       /* make and add appsrc */
2206       priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
2207       priv->appsrc_base_time[i] = -1;
2208       g_object_set (priv->appsrc[i], "format", GST_FORMAT_TIME, NULL);
2209       gst_bin_add (bin, priv->appsrc[i]);
2210       /* and link to the funnel */
2211       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2212       pad = gst_element_get_static_pad (priv->appsrc[i], "src");
2213       gst_pad_link (pad, selpad);
2214       gst_object_unref (pad);
2215       gst_object_unref (selpad);
2216     }
2217
2218     /* check if we need to set to a special state */
2219     if (state != GST_STATE_NULL) {
2220       if (priv->udpsink[i])
2221         gst_element_set_state (priv->udpsink[i], state);
2222       if (priv->appsink[i])
2223         gst_element_set_state (priv->appsink[i], state);
2224       if (priv->appqueue[i])
2225         gst_element_set_state (priv->appqueue[i], state);
2226       if (priv->udpqueue[i])
2227         gst_element_set_state (priv->udpqueue[i], state);
2228       if (priv->tee[i])
2229         gst_element_set_state (priv->tee[i], state);
2230       if (priv->funnel[i])
2231         gst_element_set_state (priv->funnel[i], state);
2232       if (priv->appsrc[i])
2233         gst_element_set_state (priv->appsrc[i], state);
2234     }
2235   }
2236
2237   /* be notified of caps changes */
2238   priv->caps_sig = g_signal_connect (priv->send_src[0], "notify::caps",
2239       (GCallback) caps_notify, stream);
2240
2241   priv->is_joined = TRUE;
2242   g_mutex_unlock (&priv->lock);
2243
2244   return TRUE;
2245
2246   /* ERRORS */
2247 was_joined:
2248   {
2249     g_mutex_unlock (&priv->lock);
2250     return TRUE;
2251   }
2252 no_ports:
2253   {
2254     g_mutex_unlock (&priv->lock);
2255     GST_WARNING ("failed to allocate ports %u", idx);
2256     return FALSE;
2257   }
2258 link_failed:
2259   {
2260     GST_WARNING ("failed to link stream %u", idx);
2261     gst_object_unref (priv->send_rtp_sink);
2262     priv->send_rtp_sink = NULL;
2263     g_mutex_unlock (&priv->lock);
2264     return FALSE;
2265   }
2266 }
2267
2268 /**
2269  * gst_rtsp_stream_leave_bin:
2270  * @stream: a #GstRTSPStream
2271  * @bin: (transfer none): a #GstBin
2272  * @rtpbin: (transfer none): a rtpbin #GstElement
2273  *
2274  * Remove the elements of @stream from @bin.
2275  *
2276  * Return: %TRUE on success.
2277  */
2278 gboolean
2279 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
2280     GstElement * rtpbin)
2281 {
2282   GstRTSPStreamPrivate *priv;
2283   gint i;
2284   GList *l;
2285
2286   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2287   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
2288   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
2289
2290   priv = stream->priv;
2291
2292   g_mutex_lock (&priv->lock);
2293   if (!priv->is_joined)
2294     goto was_not_joined;
2295
2296   /* all transports must be removed by now */
2297   if (priv->transports != NULL)
2298     goto transports_not_removed;
2299
2300   clear_tr_cache (priv, TRUE);
2301   clear_tr_cache (priv, FALSE);
2302
2303   GST_INFO ("stream %p leaving bin", stream);
2304
2305   if (priv->srcpad) {
2306     gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
2307   } else if (priv->recv_rtp_src) {
2308     gst_pad_unlink (priv->recv_rtp_src, priv->sinkpad);
2309     gst_object_unref (priv->recv_rtp_src);
2310     priv->recv_rtp_src = NULL;
2311   }
2312   g_signal_handler_disconnect (priv->send_src[0], priv->caps_sig);
2313   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
2314   gst_object_unref (priv->send_rtp_sink);
2315   priv->send_rtp_sink = NULL;
2316
2317   for (i = 0; i < 2; i++) {
2318     if (priv->udpsink[i])
2319       gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
2320     if (priv->appsink[i])
2321       gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
2322     if (priv->appqueue[i])
2323       gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
2324     if (priv->udpqueue[i])
2325       gst_element_set_state (priv->udpqueue[i], GST_STATE_NULL);
2326     if (priv->tee[i])
2327       gst_element_set_state (priv->tee[i], GST_STATE_NULL);
2328     if (priv->funnel[i])
2329       gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
2330     if (priv->appsrc[i])
2331       gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
2332     if (priv->udpsrc_v4[i]) {
2333       /* and set udpsrc to NULL now before removing */
2334       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
2335       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
2336       /* removing them should also nicely release the request
2337        * pads when they finalize */
2338       gst_bin_remove (bin, priv->udpsrc_v4[i]);
2339     }
2340     if (priv->udpsrc_v6[i]) {
2341       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
2342       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
2343       gst_bin_remove (bin, priv->udpsrc_v6[i]);
2344     }
2345
2346     for (l = priv->transport_sources; l; l = l->next) {
2347       GstRTSPMulticastTransportSource *s = l->data;
2348
2349       if (!s->udpsrc[i])
2350         continue;
2351
2352       gst_element_set_locked_state (s->udpsrc[i], FALSE);
2353       gst_element_set_state (s->udpsrc[i], GST_STATE_NULL);
2354       gst_bin_remove (bin, s->udpsrc[i]);
2355     }
2356
2357     if (priv->udpsink[i])
2358       gst_bin_remove (bin, priv->udpsink[i]);
2359     if (priv->appsrc[i])
2360       gst_bin_remove (bin, priv->appsrc[i]);
2361     if (priv->appsink[i])
2362       gst_bin_remove (bin, priv->appsink[i]);
2363     if (priv->appqueue[i])
2364       gst_bin_remove (bin, priv->appqueue[i]);
2365     if (priv->udpqueue[i])
2366       gst_bin_remove (bin, priv->udpqueue[i]);
2367     if (priv->tee[i])
2368       gst_bin_remove (bin, priv->tee[i]);
2369     if (priv->funnel[i])
2370       gst_bin_remove (bin, priv->funnel[i]);
2371
2372     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
2373     gst_object_unref (priv->recv_sink[i]);
2374     priv->recv_sink[i] = NULL;
2375
2376     priv->udpsrc_v4[i] = NULL;
2377     priv->udpsrc_v6[i] = NULL;
2378     priv->udpsink[i] = NULL;
2379     priv->appsrc[i] = NULL;
2380     priv->appsink[i] = NULL;
2381     priv->appqueue[i] = NULL;
2382     priv->udpqueue[i] = NULL;
2383     priv->tee[i] = NULL;
2384     priv->funnel[i] = NULL;
2385   }
2386
2387   for (l = priv->transport_sources; l; l = l->next) {
2388     GstRTSPMulticastTransportSource *s = l->data;
2389     g_slice_free (GstRTSPMulticastTransportSource, s);
2390   }
2391   g_list_free (priv->transport_sources);
2392   priv->transport_sources = NULL;
2393
2394   gst_object_unref (priv->send_src[0]);
2395   priv->send_src[0] = NULL;
2396
2397   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
2398   gst_object_unref (priv->send_src[1]);
2399   priv->send_src[1] = NULL;
2400
2401   g_object_unref (priv->session);
2402   priv->session = NULL;
2403   if (priv->caps)
2404     gst_caps_unref (priv->caps);
2405   priv->caps = NULL;
2406
2407   if (priv->srtpenc)
2408     gst_object_unref (priv->srtpenc);
2409   if (priv->srtpdec)
2410     gst_object_unref (priv->srtpdec);
2411
2412   priv->is_joined = FALSE;
2413   g_mutex_unlock (&priv->lock);
2414
2415   return TRUE;
2416
2417 was_not_joined:
2418   {
2419     g_mutex_unlock (&priv->lock);
2420     return TRUE;
2421   }
2422 transports_not_removed:
2423   {
2424     GST_ERROR_OBJECT (stream, "can't leave bin (transports not removed)");
2425     g_mutex_unlock (&priv->lock);
2426     return FALSE;
2427   }
2428 }
2429
2430 /**
2431  * gst_rtsp_stream_get_rtpinfo:
2432  * @stream: a #GstRTSPStream
2433  * @rtptime: (allow-none): result RTP timestamp
2434  * @seq: (allow-none): result RTP seqnum
2435  * @clock_rate: (allow-none): the clock rate
2436  * @running_time: (allow-none): result running-time
2437  *
2438  * Retrieve the current rtptime, seq and running-time. This is used to
2439  * construct a RTPInfo reply header.
2440  *
2441  * Returns: %TRUE when rtptime, seq and running-time could be determined.
2442  */
2443 gboolean
2444 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
2445     guint * rtptime, guint * seq, guint * clock_rate,
2446     GstClockTime * running_time)
2447 {
2448   GstRTSPStreamPrivate *priv;
2449   GstStructure *stats;
2450   GObjectClass *payobjclass;
2451
2452   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2453
2454   priv = stream->priv;
2455
2456   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
2457
2458   g_mutex_lock (&priv->lock);
2459
2460   if (g_object_class_find_property (payobjclass, "stats")) {
2461     g_object_get (priv->payloader, "stats", &stats, NULL);
2462     if (stats == NULL)
2463       goto no_stats;
2464
2465     if (seq)
2466       gst_structure_get_uint (stats, "seqnum", seq);
2467
2468     if (rtptime)
2469       gst_structure_get_uint (stats, "timestamp", rtptime);
2470
2471     if (running_time)
2472       gst_structure_get_clock_time (stats, "running-time", running_time);
2473
2474     if (clock_rate) {
2475       gst_structure_get_uint (stats, "clock-rate", clock_rate);
2476       if (*clock_rate == 0 && running_time)
2477         *running_time = GST_CLOCK_TIME_NONE;
2478     }
2479     gst_structure_free (stats);
2480   } else {
2481     if (!g_object_class_find_property (payobjclass, "seqnum") ||
2482         !g_object_class_find_property (payobjclass, "timestamp"))
2483       goto no_stats;
2484
2485     if (seq)
2486       g_object_get (priv->payloader, "seqnum", seq, NULL);
2487
2488     if (rtptime)
2489       g_object_get (priv->payloader, "timestamp", rtptime, NULL);
2490
2491     if (running_time)
2492       *running_time = GST_CLOCK_TIME_NONE;
2493   }
2494   g_mutex_unlock (&priv->lock);
2495
2496   return TRUE;
2497
2498   /* ERRORS */
2499 no_stats:
2500   {
2501     GST_WARNING ("Could not get payloader stats");
2502     g_mutex_unlock (&priv->lock);
2503     return FALSE;
2504   }
2505 }
2506
2507 /**
2508  * gst_rtsp_stream_get_caps:
2509  * @stream: a #GstRTSPStream
2510  *
2511  * Retrieve the current caps of @stream.
2512  *
2513  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
2514  * after usage.
2515  */
2516 GstCaps *
2517 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
2518 {
2519   GstRTSPStreamPrivate *priv;
2520   GstCaps *result;
2521
2522   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2523
2524   priv = stream->priv;
2525
2526   g_mutex_lock (&priv->lock);
2527   if ((result = priv->caps))
2528     gst_caps_ref (result);
2529   g_mutex_unlock (&priv->lock);
2530
2531   return result;
2532 }
2533
2534 /**
2535  * gst_rtsp_stream_recv_rtp:
2536  * @stream: a #GstRTSPStream
2537  * @buffer: (transfer full): a #GstBuffer
2538  *
2539  * Handle an RTP buffer for the stream. This method is usually called when a
2540  * message has been received from a client using the TCP transport.
2541  *
2542  * This function takes ownership of @buffer.
2543  *
2544  * Returns: a GstFlowReturn.
2545  */
2546 GstFlowReturn
2547 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
2548 {
2549   GstRTSPStreamPrivate *priv;
2550   GstFlowReturn ret;
2551   GstElement *element;
2552
2553   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2554   priv = stream->priv;
2555   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2556   g_return_val_if_fail (priv->is_joined, FALSE);
2557
2558   g_mutex_lock (&priv->lock);
2559   if (priv->appsrc[0])
2560     element = gst_object_ref (priv->appsrc[0]);
2561   else
2562     element = NULL;
2563   g_mutex_unlock (&priv->lock);
2564
2565   if (element) {
2566     if (priv->appsrc_base_time[0] == -1) {
2567       /* Take current running_time. This timestamp will be put on
2568        * the first buffer of each stream because we are a live source and so we
2569        * timestamp with the running_time. When we are dealing with TCP, we also
2570        * only timestamp the first buffer (using the DISCONT flag) because a server
2571        * typically bursts data, for which we don't want to compensate by speeding
2572        * up the media. The other timestamps will be interpollated from this one
2573        * using the RTP timestamps. */
2574       GST_OBJECT_LOCK (element);
2575       if (GST_ELEMENT_CLOCK (element)) {
2576         GstClockTime now;
2577         GstClockTime base_time;
2578
2579         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2580         base_time = GST_ELEMENT_CAST (element)->base_time;
2581
2582         priv->appsrc_base_time[0] = now - base_time;
2583         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[0];
2584         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2585             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2586             GST_TIME_ARGS (base_time));
2587       }
2588       GST_OBJECT_UNLOCK (element);
2589     }
2590
2591     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2592     gst_object_unref (element);
2593   } else {
2594     ret = GST_FLOW_OK;
2595   }
2596   return ret;
2597 }
2598
2599 /**
2600  * gst_rtsp_stream_recv_rtcp:
2601  * @stream: a #GstRTSPStream
2602  * @buffer: (transfer full): a #GstBuffer
2603  *
2604  * Handle an RTCP buffer for the stream. This method is usually called when a
2605  * message has been received from a client using the TCP transport.
2606  *
2607  * This function takes ownership of @buffer.
2608  *
2609  * Returns: a GstFlowReturn.
2610  */
2611 GstFlowReturn
2612 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
2613 {
2614   GstRTSPStreamPrivate *priv;
2615   GstFlowReturn ret;
2616   GstElement *element;
2617
2618   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
2619   priv = stream->priv;
2620   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
2621
2622   if (!priv->is_joined) {
2623     gst_buffer_unref (buffer);
2624     return GST_FLOW_NOT_LINKED;
2625   }
2626   g_mutex_lock (&priv->lock);
2627   if (priv->appsrc[1])
2628     element = gst_object_ref (priv->appsrc[1]);
2629   else
2630     element = NULL;
2631   g_mutex_unlock (&priv->lock);
2632
2633   if (element) {
2634     if (priv->appsrc_base_time[1] == -1) {
2635       /* Take current running_time. This timestamp will be put on
2636        * the first buffer of each stream because we are a live source and so we
2637        * timestamp with the running_time. When we are dealing with TCP, we also
2638        * only timestamp the first buffer (using the DISCONT flag) because a server
2639        * typically bursts data, for which we don't want to compensate by speeding
2640        * up the media. The other timestamps will be interpollated from this one
2641        * using the RTP timestamps. */
2642       GST_OBJECT_LOCK (element);
2643       if (GST_ELEMENT_CLOCK (element)) {
2644         GstClockTime now;
2645         GstClockTime base_time;
2646
2647         now = gst_clock_get_time (GST_ELEMENT_CLOCK (element));
2648         base_time = GST_ELEMENT_CAST (element)->base_time;
2649
2650         priv->appsrc_base_time[1] = now - base_time;
2651         GST_BUFFER_TIMESTAMP (buffer) = priv->appsrc_base_time[1];
2652         GST_DEBUG ("stream %p: first buffer at time %" GST_TIME_FORMAT
2653             ", base %" GST_TIME_FORMAT, stream, GST_TIME_ARGS (now),
2654             GST_TIME_ARGS (base_time));
2655       }
2656       GST_OBJECT_UNLOCK (element);
2657     }
2658
2659     ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
2660     gst_object_unref (element);
2661   } else {
2662     ret = GST_FLOW_OK;
2663     gst_buffer_unref (buffer);
2664   }
2665   return ret;
2666 }
2667
2668 /* must be called with lock */
2669 static gboolean
2670 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
2671     gboolean add)
2672 {
2673   GstRTSPStreamPrivate *priv = stream->priv;
2674   const GstRTSPTransport *tr;
2675
2676   tr = gst_rtsp_stream_transport_get_transport (trans);
2677
2678   switch (tr->lower_transport) {
2679     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
2680     {
2681       GstRTSPMulticastTransportSource *source;
2682       GstBin *bin;
2683
2684       bin = GST_BIN (gst_object_get_parent (GST_OBJECT (priv->funnel[0])));
2685
2686       if (add) {
2687         gchar *host;
2688         gint i;
2689         GstPad *selpad, *pad;
2690
2691         source = g_slice_new0 (GstRTSPMulticastTransportSource);
2692         source->transport = trans;
2693
2694         for (i = 0; i < 2; i++) {
2695           host =
2696               g_strdup_printf ("udp://%s:%d", tr->destination,
2697               (i == 0) ? tr->port.min : tr->port.max);
2698           source->udpsrc[i] =
2699               gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
2700           g_free (host);
2701
2702           if (priv->srcpad) {
2703             /* we set and keep these to playing so that they don't cause NO_PREROLL return
2704              * values. This is only relevant for PLAY pipelines */
2705             gst_element_set_state (source->udpsrc[i], GST_STATE_PLAYING);
2706             gst_element_set_locked_state (source->udpsrc[i], TRUE);
2707           }
2708           /* add udpsrc */
2709           gst_bin_add (bin, source->udpsrc[i]);
2710
2711           /* and link to the funnel v4 */
2712           source->selpad[i] = selpad =
2713               gst_element_get_request_pad (priv->funnel[i], "sink_%u");
2714           pad = gst_element_get_static_pad (source->udpsrc[i], "src");
2715           gst_pad_link (pad, selpad);
2716           gst_object_unref (pad);
2717           gst_object_unref (selpad);
2718         }
2719         gst_object_unref (bin);
2720
2721         priv->transport_sources =
2722             g_list_prepend (priv->transport_sources, source);
2723       } else {
2724         GList *l;
2725
2726         for (l = priv->transport_sources; l; l = l->next) {
2727           source = l->data;
2728
2729           if (source->transport == trans) {
2730             priv->transport_sources =
2731                 g_list_delete_link (priv->transport_sources, l);
2732             break;
2733           }
2734         }
2735
2736         if (l != NULL) {
2737           gint i;
2738
2739           for (i = 0; i < 2; i++) {
2740             /* Will automatically unlink everything */
2741             gst_bin_remove (bin,
2742                 GST_ELEMENT (gst_object_ref (source->udpsrc[i])));
2743
2744             gst_element_set_state (source->udpsrc[i], GST_STATE_NULL);
2745             gst_object_unref (source->udpsrc[i]);
2746
2747             gst_element_release_request_pad (priv->funnel[i],
2748                 source->selpad[i]);
2749           }
2750
2751           g_slice_free (GstRTSPMulticastTransportSource, source);
2752         }
2753       }
2754
2755       /* fall through for the generic case */
2756     }
2757     case GST_RTSP_LOWER_TRANS_UDP:
2758     {
2759       gchar *dest;
2760       gint min, max;
2761       guint ttl = 0;
2762
2763       dest = tr->destination;
2764       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
2765         min = tr->port.min;
2766         max = tr->port.max;
2767         ttl = tr->ttl;
2768       } else {
2769         min = tr->client_port.min;
2770         max = tr->client_port.max;
2771       }
2772
2773       if (add) {
2774         if (ttl > 0) {
2775           GST_INFO ("setting ttl-mc %d", ttl);
2776           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
2777           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
2778         }
2779         GST_INFO ("adding %s:%d-%d", dest, min, max);
2780         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
2781         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
2782         priv->transports = g_list_prepend (priv->transports, trans);
2783       } else {
2784         GST_INFO ("removing %s:%d-%d", dest, min, max);
2785         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
2786         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
2787         priv->transports = g_list_remove (priv->transports, trans);
2788       }
2789       priv->transports_cookie++;
2790       break;
2791     }
2792     case GST_RTSP_LOWER_TRANS_TCP:
2793       if (add) {
2794         GST_INFO ("adding TCP %s", tr->destination);
2795         priv->transports = g_list_prepend (priv->transports, trans);
2796       } else {
2797         GST_INFO ("removing TCP %s", tr->destination);
2798         priv->transports = g_list_remove (priv->transports, trans);
2799       }
2800       priv->transports_cookie++;
2801       break;
2802     default:
2803       goto unknown_transport;
2804   }
2805   return TRUE;
2806
2807   /* ERRORS */
2808 unknown_transport:
2809   {
2810     GST_INFO ("Unknown transport %d", tr->lower_transport);
2811     return FALSE;
2812   }
2813 }
2814
2815
2816 /**
2817  * gst_rtsp_stream_add_transport:
2818  * @stream: a #GstRTSPStream
2819  * @trans: (transfer none): a #GstRTSPStreamTransport
2820  *
2821  * Add the transport in @trans to @stream. The media of @stream will
2822  * then also be send to the values configured in @trans.
2823  *
2824  * @stream must be joined to a bin.
2825  *
2826  * @trans must contain a valid #GstRTSPTransport.
2827  *
2828  * Returns: %TRUE if @trans was added
2829  */
2830 gboolean
2831 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
2832     GstRTSPStreamTransport * trans)
2833 {
2834   GstRTSPStreamPrivate *priv;
2835   gboolean res;
2836
2837   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2838   priv = stream->priv;
2839   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2840   g_return_val_if_fail (priv->is_joined, FALSE);
2841
2842   g_mutex_lock (&priv->lock);
2843   res = update_transport (stream, trans, TRUE);
2844   g_mutex_unlock (&priv->lock);
2845
2846   return res;
2847 }
2848
2849 /**
2850  * gst_rtsp_stream_remove_transport:
2851  * @stream: a #GstRTSPStream
2852  * @trans: (transfer none): a #GstRTSPStreamTransport
2853  *
2854  * Remove the transport in @trans from @stream. The media of @stream will
2855  * not be sent to the values configured in @trans.
2856  *
2857  * @stream must be joined to a bin.
2858  *
2859  * @trans must contain a valid #GstRTSPTransport.
2860  *
2861  * Returns: %TRUE if @trans was removed
2862  */
2863 gboolean
2864 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
2865     GstRTSPStreamTransport * trans)
2866 {
2867   GstRTSPStreamPrivate *priv;
2868   gboolean res;
2869
2870   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2871   priv = stream->priv;
2872   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
2873   g_return_val_if_fail (priv->is_joined, FALSE);
2874
2875   g_mutex_lock (&priv->lock);
2876   res = update_transport (stream, trans, FALSE);
2877   g_mutex_unlock (&priv->lock);
2878
2879   return res;
2880 }
2881
2882 /**
2883  * gst_rtsp_stream_update_crypto:
2884  * @stream: a #GstRTSPStream
2885  * @ssrc: the SSRC
2886  * @crypto: (transfer none) (allow-none): a #GstCaps with crypto info
2887  *
2888  * Update the new crypto information for @ssrc in @stream. If information
2889  * for @ssrc did not exist, it will be added. If information
2890  * for @ssrc existed, it will be replaced. If @crypto is %NULL, it will
2891  * be removed from @stream.
2892  *
2893  * Returns: %TRUE if @crypto could be updated
2894  */
2895 gboolean
2896 gst_rtsp_stream_update_crypto (GstRTSPStream * stream,
2897     guint ssrc, GstCaps * crypto)
2898 {
2899   GstRTSPStreamPrivate *priv;
2900
2901   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
2902   g_return_val_if_fail (crypto == NULL || GST_IS_CAPS (crypto), FALSE);
2903
2904   priv = stream->priv;
2905
2906   GST_DEBUG_OBJECT (stream, "update key for %08x", ssrc);
2907
2908   g_mutex_lock (&priv->lock);
2909   if (crypto)
2910     g_hash_table_insert (priv->keys, GINT_TO_POINTER (ssrc),
2911         gst_caps_ref (crypto));
2912   else
2913     g_hash_table_remove (priv->keys, GINT_TO_POINTER (ssrc));
2914   g_mutex_unlock (&priv->lock);
2915
2916   return TRUE;
2917 }
2918
2919 /**
2920  * gst_rtsp_stream_get_rtp_socket:
2921  * @stream: a #GstRTSPStream
2922  * @family: the socket family
2923  *
2924  * Get the RTP socket from @stream for a @family.
2925  *
2926  * @stream must be joined to a bin.
2927  *
2928  * Returns: (transfer full) (nullable): the RTP socket or %NULL if no
2929  * socket could be allocated for @family. Unref after usage
2930  */
2931 GSocket *
2932 gst_rtsp_stream_get_rtp_socket (GstRTSPStream * stream, GSocketFamily family)
2933 {
2934   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2935   GSocket *socket;
2936   const gchar *name;
2937
2938   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2939   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2940       family == G_SOCKET_FAMILY_IPV6, NULL);
2941   g_return_val_if_fail (priv->udpsink[0], NULL);
2942
2943   if (family == G_SOCKET_FAMILY_IPV6)
2944     name = "socket-v6";
2945   else
2946     name = "socket";
2947
2948   g_object_get (priv->udpsink[0], name, &socket, NULL);
2949
2950   return socket;
2951 }
2952
2953 /**
2954  * gst_rtsp_stream_get_rtcp_socket:
2955  * @stream: a #GstRTSPStream
2956  * @family: the socket family
2957  *
2958  * Get the RTCP socket from @stream for a @family.
2959  *
2960  * @stream must be joined to a bin.
2961  *
2962  * Returns: (transfer full) (nullable): the RTCP socket or %NULL if no
2963  * socket could be allocated for @family. Unref after usage
2964  */
2965 GSocket *
2966 gst_rtsp_stream_get_rtcp_socket (GstRTSPStream * stream, GSocketFamily family)
2967 {
2968   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
2969   GSocket *socket;
2970   const gchar *name;
2971
2972   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
2973   g_return_val_if_fail (family == G_SOCKET_FAMILY_IPV4 ||
2974       family == G_SOCKET_FAMILY_IPV6, NULL);
2975   g_return_val_if_fail (priv->udpsink[1], NULL);
2976
2977   if (family == G_SOCKET_FAMILY_IPV6)
2978     name = "socket-v6";
2979   else
2980     name = "socket";
2981
2982   g_object_get (priv->udpsink[1], name, &socket, NULL);
2983
2984   return socket;
2985 }
2986
2987 /**
2988  * gst_rtsp_stream_set_seqnum:
2989  * @stream: a #GstRTSPStream
2990  * @seqnum: a new sequence number
2991  *
2992  * Configure the sequence number in the payloader of @stream to @seqnum.
2993  */
2994 void
2995 gst_rtsp_stream_set_seqnum_offset (GstRTSPStream * stream, guint16 seqnum)
2996 {
2997   GstRTSPStreamPrivate *priv;
2998
2999   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
3000
3001   priv = stream->priv;
3002
3003   g_object_set (G_OBJECT (priv->payloader), "seqnum-offset", seqnum, NULL);
3004 }
3005
3006 /**
3007  * gst_rtsp_stream_get_seqnum:
3008  * @stream: a #GstRTSPStream
3009  *
3010  * Get the configured sequence number in the payloader of @stream.
3011  *
3012  * Returns: the sequence number of the payloader.
3013  */
3014 guint16
3015 gst_rtsp_stream_get_current_seqnum (GstRTSPStream * stream)
3016 {
3017   GstRTSPStreamPrivate *priv;
3018   guint seqnum;
3019
3020   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
3021
3022   priv = stream->priv;
3023
3024   g_object_get (G_OBJECT (priv->payloader), "seqnum", &seqnum, NULL);
3025
3026   return seqnum;
3027 }
3028
3029 /**
3030  * gst_rtsp_stream_transport_filter:
3031  * @stream: a #GstRTSPStream
3032  * @func: (scope call) (allow-none): a callback
3033  * @user_data: (closure): user data passed to @func
3034  *
3035  * Call @func for each transport managed by @stream. The result value of @func
3036  * determines what happens to the transport. @func will be called with @stream
3037  * locked so no further actions on @stream can be performed from @func.
3038  *
3039  * If @func returns #GST_RTSP_FILTER_REMOVE, the transport will be removed from
3040  * @stream.
3041  *
3042  * If @func returns #GST_RTSP_FILTER_KEEP, the transport will remain in @stream.
3043  *
3044  * If @func returns #GST_RTSP_FILTER_REF, the transport will remain in @stream but
3045  * will also be added with an additional ref to the result #GList of this
3046  * function..
3047  *
3048  * When @func is %NULL, #GST_RTSP_FILTER_REF will be assumed for each transport.
3049  *
3050  * Returns: (element-type GstRTSPStreamTransport) (transfer full): a #GList with all
3051  * transports for which @func returned #GST_RTSP_FILTER_REF. After usage, each
3052  * element in the #GList should be unreffed before the list is freed.
3053  */
3054 GList *
3055 gst_rtsp_stream_transport_filter (GstRTSPStream * stream,
3056     GstRTSPStreamTransportFilterFunc func, gpointer user_data)
3057 {
3058   GstRTSPStreamPrivate *priv;
3059   GList *result, *walk, *next;
3060   GHashTable *visited = NULL;
3061   guint cookie;
3062
3063   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
3064
3065   priv = stream->priv;
3066
3067   result = NULL;
3068   if (func)
3069     visited = g_hash_table_new_full (NULL, NULL, g_object_unref, NULL);
3070
3071   g_mutex_lock (&priv->lock);
3072 restart:
3073   cookie = priv->transports_cookie;
3074   for (walk = priv->transports; walk; walk = next) {
3075     GstRTSPStreamTransport *trans = walk->data;
3076     GstRTSPFilterResult res;
3077     gboolean changed;
3078
3079     next = g_list_next (walk);
3080
3081     if (func) {
3082       /* only visit each transport once */
3083       if (g_hash_table_contains (visited, trans))
3084         continue;
3085
3086       g_hash_table_add (visited, g_object_ref (trans));
3087       g_mutex_unlock (&priv->lock);
3088
3089       res = func (stream, trans, user_data);
3090
3091       g_mutex_lock (&priv->lock);
3092     } else
3093       res = GST_RTSP_FILTER_REF;
3094
3095     changed = (cookie != priv->transports_cookie);
3096
3097     switch (res) {
3098       case GST_RTSP_FILTER_REMOVE:
3099         update_transport (stream, trans, FALSE);
3100         break;
3101       case GST_RTSP_FILTER_REF:
3102         result = g_list_prepend (result, g_object_ref (trans));
3103         break;
3104       case GST_RTSP_FILTER_KEEP:
3105       default:
3106         break;
3107     }
3108     if (changed)
3109       goto restart;
3110   }
3111   g_mutex_unlock (&priv->lock);
3112
3113   if (func)
3114     g_hash_table_unref (visited);
3115
3116   return result;
3117 }
3118
3119 static GstPadProbeReturn
3120 pad_blocking (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3121 {
3122   GstRTSPStreamPrivate *priv;
3123   GstRTSPStream *stream;
3124
3125   stream = user_data;
3126   priv = stream->priv;
3127
3128   GST_DEBUG_OBJECT (pad, "now blocking");
3129
3130   g_mutex_lock (&priv->lock);
3131   priv->blocking = TRUE;
3132   g_mutex_unlock (&priv->lock);
3133
3134   gst_element_post_message (priv->payloader,
3135       gst_message_new_element (GST_OBJECT_CAST (priv->payloader),
3136           gst_structure_new_empty ("GstRTSPStreamBlocking")));
3137
3138   return GST_PAD_PROBE_OK;
3139 }
3140
3141 /**
3142  * gst_rtsp_stream_set_blocked:
3143  * @stream: a #GstRTSPStream
3144  * @blocked: boolean indicating we should block or unblock
3145  *
3146  * Blocks or unblocks the dataflow on @stream.
3147  *
3148  * Returns: %TRUE on success
3149  */
3150 gboolean
3151 gst_rtsp_stream_set_blocked (GstRTSPStream * stream, gboolean blocked)
3152 {
3153   GstRTSPStreamPrivate *priv;
3154
3155   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3156
3157   priv = stream->priv;
3158
3159   g_mutex_lock (&priv->lock);
3160   if (blocked) {
3161     priv->blocking = FALSE;
3162     if (priv->blocked_id == 0) {
3163       priv->blocked_id = gst_pad_add_probe (priv->srcpad,
3164           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
3165           GST_PAD_PROBE_TYPE_BUFFER_LIST, pad_blocking,
3166           g_object_ref (stream), g_object_unref);
3167     }
3168   } else {
3169     if (priv->blocked_id != 0) {
3170       gst_pad_remove_probe (priv->srcpad, priv->blocked_id);
3171       priv->blocked_id = 0;
3172       priv->blocking = FALSE;
3173     }
3174   }
3175   g_mutex_unlock (&priv->lock);
3176
3177   return TRUE;
3178 }
3179
3180 /**
3181  * gst_rtsp_stream_is_blocking:
3182  * @stream: a #GstRTSPStream
3183  *
3184  * Check if @stream is blocking on a #GstBuffer.
3185  *
3186  * Returns: %TRUE if @stream is blocking
3187  */
3188 gboolean
3189 gst_rtsp_stream_is_blocking (GstRTSPStream * stream)
3190 {
3191   GstRTSPStreamPrivate *priv;
3192   gboolean result;
3193
3194   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3195
3196   priv = stream->priv;
3197
3198   g_mutex_lock (&priv->lock);
3199   result = priv->blocking;
3200   g_mutex_unlock (&priv->lock);
3201
3202   return result;
3203 }
3204
3205 /**
3206  * gst_rtsp_stream_query_position:
3207  * @stream: a #GstRTSPStream
3208  *
3209  * Query the position of the stream in %GST_FORMAT_TIME. This only considers
3210  * the RTP parts of the pipeline and not the RTCP parts.
3211  *
3212  * Returns: %TRUE if the position could be queried
3213  */
3214 gboolean
3215 gst_rtsp_stream_query_position (GstRTSPStream * stream, gint64 * position)
3216 {
3217   GstRTSPStreamPrivate *priv;
3218   GstElement *sink;
3219   gboolean ret;
3220
3221   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3222
3223   priv = stream->priv;
3224
3225   g_mutex_lock (&priv->lock);
3226   if ((sink = priv->udpsink[0]))
3227     gst_object_ref (sink);
3228   g_mutex_unlock (&priv->lock);
3229
3230   if (!sink)
3231     return FALSE;
3232
3233   ret = gst_element_query_position (sink, GST_FORMAT_TIME, position);
3234   gst_object_unref (sink);
3235
3236   return ret;
3237 }
3238
3239 /**
3240  * gst_rtsp_stream_query_stop:
3241  * @stream: a #GstRTSPStream
3242  *
3243  * Query the stop of the stream in %GST_FORMAT_TIME. This only considers
3244  * the RTP parts of the pipeline and not the RTCP parts.
3245  *
3246  * Returns: %TRUE if the stop could be queried
3247  */
3248 gboolean
3249 gst_rtsp_stream_query_stop (GstRTSPStream * stream, gint64 * stop)
3250 {
3251   GstRTSPStreamPrivate *priv;
3252   GstElement *sink;
3253   GstQuery *query;
3254   gboolean ret;
3255
3256   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
3257
3258   priv = stream->priv;
3259
3260   g_mutex_lock (&priv->lock);
3261   if ((sink = priv->udpsink[0]))
3262     gst_object_ref (sink);
3263   g_mutex_unlock (&priv->lock);
3264
3265   if (!sink)
3266     return FALSE;
3267
3268   query = gst_query_new_segment (GST_FORMAT_TIME);
3269   if ((ret = gst_element_query (sink, query))) {
3270     GstFormat format;
3271
3272     gst_query_parse_segment (query, NULL, &format, NULL, stop);
3273     if (format != GST_FORMAT_TIME)
3274       *stop = -1;
3275   }
3276   gst_query_unref (query);
3277   gst_object_unref (sink);
3278
3279   return ret;
3280
3281 }