stream: add protocols property
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:rtsp-stream
21  * @short_description: A media stream
22  * @see_also: #GstRTSPMedia
23  *
24  * The #GstRTSPStream object manages the data transport for one stream. It
25  * is created from a payloader element and a source pad that produce the RTP
26  * packets for the stream.
27  *
28  * With gst_rtsp_stream_join_bin() the streaming elements are added to the bin
29  * and rtpbin. gst_rtsp_stream_leave_bin() removes the elements again.
30  *
31  * The #GstRTSPStream will use the configured addresspool, as set with
32  * gst_rtsp_stream_set_address_pool(), to allocate multicast addresses for the
33  * stream. With gst_rtsp_stream_get_multicast_address() you can get the
34  * configured address.
35  *
36  * With gst_rtsp_stream_get_server_port () you can get the port that the server
37  * will use to receive RTCP. This is the part that the clients will use to send
38  * RTCP to.
39  *
40  * With gst_rtsp_stream_add_transport() destinations can be added where the
41  * stream should be sent to. Use gst_rtsp_stream_remove_transport() to remove
42  * the destination again.
43  *
44  * Last reviewed on 2013-07-16 (1.0.0)
45  */
46
47 #include <stdlib.h>
48 #include <stdio.h>
49 #include <string.h>
50
51 #include <gio/gio.h>
52
53 #include <gst/app/gstappsrc.h>
54 #include <gst/app/gstappsink.h>
55
56 #include "rtsp-stream.h"
57
58 #define GST_RTSP_STREAM_GET_PRIVATE(obj)  \
59      (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTSP_STREAM, GstRTSPStreamPrivate))
60
61 struct _GstRTSPStreamPrivate
62 {
63   GMutex lock;
64   guint idx;
65   GstPad *srcpad;
66   GstElement *payloader;
67   guint buffer_size;
68   gboolean is_joined;
69   gchar *control;
70
71   GstRTSPLowerTrans protocols;
72
73   /* pads on the rtpbin */
74   GstPad *send_rtp_sink;
75   GstPad *recv_sink[2];
76   GstPad *send_src[2];
77
78   /* the RTPSession object */
79   GObject *session;
80
81   /* sinks used for sending and receiving RTP and RTCP over ipv4, they share
82    * sockets */
83   GstElement *udpsrc_v4[2];
84
85   /* sinks used for sending and receiving RTP and RTCP over ipv6, they share
86    * sockets */
87   GstElement *udpsrc_v6[2];
88
89   GstElement *udpsink[2];
90
91   /* for TCP transport */
92   GstElement *appsrc[2];
93   GstElement *appqueue[2];
94   GstElement *appsink[2];
95
96   GstElement *tee[2];
97   GstElement *funnel[2];
98
99   /* server ports for sending/receiving over ipv4 */
100   GstRTSPRange server_port_v4;
101   GstRTSPAddress *server_addr_v4;
102   gboolean have_ipv4;
103
104   /* server ports for sending/receiving over ipv6 */
105   GstRTSPRange server_port_v6;
106   GstRTSPAddress *server_addr_v6;
107   gboolean have_ipv6;
108
109   /* multicast addresses */
110   GstRTSPAddressPool *pool;
111   GstRTSPAddress *addr_v4;
112   GstRTSPAddress *addr_v6;
113
114   /* the caps of the stream */
115   gulong caps_sig;
116   GstCaps *caps;
117
118   /* transports we stream to */
119   guint n_active;
120   GList *transports;
121
122   gint dscp_qos;
123 };
124
125 #define DEFAULT_CONTROL         NULL
126 #define DEFAULT_PROTOCOLS       GST_RTSP_LOWER_TRANS_UDP | GST_RTSP_LOWER_TRANS_TCP
127
128 enum
129 {
130   PROP_0,
131   PROP_CONTROL,
132   PROP_PROTOCOLS,
133   PROP_LAST
134 };
135
136 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
137 #define GST_CAT_DEFAULT rtsp_stream_debug
138
139 static GQuark ssrc_stream_map_key;
140
141 static void gst_rtsp_stream_get_property (GObject * object, guint propid,
142     GValue * value, GParamSpec * pspec);
143 static void gst_rtsp_stream_set_property (GObject * object, guint propid,
144     const GValue * value, GParamSpec * pspec);
145
146 static void gst_rtsp_stream_finalize (GObject * obj);
147
148 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
149
150 static void
151 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
152 {
153   GObjectClass *gobject_class;
154
155   g_type_class_add_private (klass, sizeof (GstRTSPStreamPrivate));
156
157   gobject_class = G_OBJECT_CLASS (klass);
158
159   gobject_class->get_property = gst_rtsp_stream_get_property;
160   gobject_class->set_property = gst_rtsp_stream_set_property;
161   gobject_class->finalize = gst_rtsp_stream_finalize;
162
163   g_object_class_install_property (gobject_class, PROP_CONTROL,
164       g_param_spec_string ("control", "Control",
165           "The control string for this stream", DEFAULT_CONTROL,
166           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167
168   g_object_class_install_property (gobject_class, PROP_PROTOCOLS,
169       g_param_spec_flags ("protocols", "Protocols",
170           "Allowed lower transport protocols", GST_TYPE_RTSP_LOWER_TRANS,
171           DEFAULT_PROTOCOLS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
172
173   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
174
175   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
176 }
177
178 static void
179 gst_rtsp_stream_init (GstRTSPStream * stream)
180 {
181   GstRTSPStreamPrivate *priv = GST_RTSP_STREAM_GET_PRIVATE (stream);
182
183   GST_DEBUG ("new stream %p", stream);
184
185   stream->priv = priv;
186
187   priv->dscp_qos = -1;
188   priv->control = g_strdup (DEFAULT_CONTROL);
189   priv->protocols = DEFAULT_PROTOCOLS;
190
191   g_mutex_init (&priv->lock);
192 }
193
194 static void
195 gst_rtsp_stream_finalize (GObject * obj)
196 {
197   GstRTSPStream *stream;
198   GstRTSPStreamPrivate *priv;
199
200   stream = GST_RTSP_STREAM (obj);
201   priv = stream->priv;
202
203   GST_DEBUG ("finalize stream %p", stream);
204
205   /* we really need to be unjoined now */
206   g_return_if_fail (!priv->is_joined);
207
208   if (priv->addr_v4)
209     gst_rtsp_address_free (priv->addr_v4);
210   if (priv->addr_v6)
211     gst_rtsp_address_free (priv->addr_v6);
212   if (priv->server_addr_v4)
213     gst_rtsp_address_free (priv->server_addr_v4);
214   if (priv->server_addr_v6)
215     gst_rtsp_address_free (priv->server_addr_v6);
216   if (priv->pool)
217     g_object_unref (priv->pool);
218   gst_object_unref (priv->payloader);
219   gst_object_unref (priv->srcpad);
220   g_free (priv->control);
221   g_mutex_clear (&priv->lock);
222
223   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
224 }
225
226 static void
227 gst_rtsp_stream_get_property (GObject * object, guint propid,
228     GValue * value, GParamSpec * pspec)
229 {
230   GstRTSPStream *stream = GST_RTSP_STREAM (object);
231
232   switch (propid) {
233     case PROP_CONTROL:
234       g_value_take_string (value, gst_rtsp_stream_get_control (stream));
235       break;
236     case PROP_PROTOCOLS:
237       g_value_set_flags (value, gst_rtsp_stream_get_protocols (stream));
238       break;
239     default:
240       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
241   }
242 }
243
244 static void
245 gst_rtsp_stream_set_property (GObject * object, guint propid,
246     const GValue * value, GParamSpec * pspec)
247 {
248   GstRTSPStream *stream = GST_RTSP_STREAM (object);
249
250   switch (propid) {
251     case PROP_CONTROL:
252       gst_rtsp_stream_set_control (stream, g_value_get_string (value));
253       break;
254     case PROP_PROTOCOLS:
255       gst_rtsp_stream_set_protocols (stream, g_value_get_flags (value));
256       break;
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, propid, pspec);
259   }
260 }
261
262 /**
263  * gst_rtsp_stream_new:
264  * @idx: an index
265  * @srcpad: a #GstPad
266  * @payloader: a #GstElement
267  *
268  * Create a new media stream with index @idx that handles RTP data on
269  * @srcpad and has a payloader element @payloader.
270  *
271  * Returns: a new #GstRTSPStream
272  */
273 GstRTSPStream *
274 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
275 {
276   GstRTSPStreamPrivate *priv;
277   GstRTSPStream *stream;
278
279   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
280   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
281   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
282
283   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
284   priv = stream->priv;
285   priv->idx = idx;
286   priv->payloader = gst_object_ref (payloader);
287   priv->srcpad = gst_object_ref (srcpad);
288
289   return stream;
290 }
291
292 /**
293  * gst_rtsp_stream_get_index:
294  * @stream: a #GstRTSPStream
295  *
296  * Get the stream index.
297  *
298  * Return: the stream index.
299  */
300 guint
301 gst_rtsp_stream_get_index (GstRTSPStream * stream)
302 {
303   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
304
305   return stream->priv->idx;
306 }
307
308 /**
309  * gst_rtsp_stream_get_srcpad:
310  * @stream: a #GstRTSPStream
311  *
312  * Get the srcpad associated with @stream.
313  *
314  * Return: the srcpad. Unref after usage.
315  */
316 GstPad *
317 gst_rtsp_stream_get_srcpad (GstRTSPStream * stream)
318 {
319   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
320
321   return gst_object_ref (stream->priv->srcpad);
322 }
323
324 /**
325  * gst_rtsp_stream_get_control:
326  * @stream: a #GstRTSPStream
327  *
328  * Get the control string to identify this stream.
329  *
330  * Return: the control string. free after usage.
331  */
332 gchar *
333 gst_rtsp_stream_get_control (GstRTSPStream * stream)
334 {
335   GstRTSPStreamPrivate *priv;
336   gchar *result;
337
338   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
339
340   priv = stream->priv;
341
342   g_mutex_lock (&priv->lock);
343   if ((result = g_strdup (priv->control)) == NULL)
344     result = g_strdup_printf ("stream=%u", priv->idx);
345   g_mutex_unlock (&priv->lock);
346
347   return result;
348 }
349
350 /**
351  * gst_rtsp_stream_set_control:
352  * @stream: a #GstRTSPStream
353  * @control: a control string
354  *
355  * Set the control string in @stream.
356  */
357 void
358 gst_rtsp_stream_set_control (GstRTSPStream * stream, const gchar * control)
359 {
360   GstRTSPStreamPrivate *priv;
361
362   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
363
364   priv = stream->priv;
365
366   g_mutex_lock (&priv->lock);
367   g_free (priv->control);
368   priv->control = g_strdup (control);
369   g_mutex_unlock (&priv->lock);
370 }
371
372 /**
373  * gst_rtsp_stream_has_control:
374  * @stream: a #GstRTSPStream
375  * @control: a control string
376  *
377  * Check if @stream has the control string @control.
378  *
379  * Returns: %TRUE is @stream has @control as the control string
380  */
381 gboolean
382 gst_rtsp_stream_has_control (GstRTSPStream * stream, const gchar * control)
383 {
384   GstRTSPStreamPrivate *priv;
385   gboolean res;
386
387   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
388
389   priv = stream->priv;
390
391   g_mutex_lock (&priv->lock);
392   if (priv->control)
393     res = g_strcmp0 (priv->control, control);
394   else {
395     guint streamid;
396     sscanf (control, "stream=%u", &streamid);
397     res = (streamid == priv->idx);
398   }
399   g_mutex_unlock (&priv->lock);
400
401   return res;
402 }
403
404 /**
405  * gst_rtsp_stream_set_mtu:
406  * @stream: a #GstRTSPStream
407  * @mtu: a new MTU
408  *
409  * Configure the mtu in the payloader of @stream to @mtu.
410  */
411 void
412 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
413 {
414   GstRTSPStreamPrivate *priv;
415
416   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
417
418   priv = stream->priv;
419
420   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
421
422   g_object_set (G_OBJECT (priv->payloader), "mtu", mtu, NULL);
423 }
424
425 /**
426  * gst_rtsp_stream_get_mtu:
427  * @stream: a #GstRTSPStream
428  *
429  * Get the configured MTU in the payloader of @stream.
430  *
431  * Returns: the MTU of the payloader.
432  */
433 guint
434 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
435 {
436   GstRTSPStreamPrivate *priv;
437   guint mtu;
438
439   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
440
441   priv = stream->priv;
442
443   g_object_get (G_OBJECT (priv->payloader), "mtu", &mtu, NULL);
444
445   return mtu;
446 }
447
448 /* Update the dscp qos property on the udp sinks */
449 static void
450 update_dscp_qos (GstRTSPStream * stream)
451 {
452   GstRTSPStreamPrivate *priv;
453
454   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
455
456   priv = stream->priv;
457
458   if (priv->udpsink[0]) {
459     g_object_set (G_OBJECT (priv->udpsink[0]), "qos-dscp", priv->dscp_qos,
460         NULL);
461   }
462
463   if (priv->udpsink[1]) {
464     g_object_set (G_OBJECT (priv->udpsink[1]), "qos-dscp", priv->dscp_qos,
465         NULL);
466   }
467 }
468
469 /**
470  * gst_rtsp_stream_set_dscp_qos:
471  * @stream: a #GstRTSPStream
472  * @dscp_qos: a new dscp qos value (0-63, or -1 to disable)
473  *
474  * Configure the dscp qos of the outgoing sockets to @dscp_qos.
475  */
476 void
477 gst_rtsp_stream_set_dscp_qos (GstRTSPStream * stream, gint dscp_qos)
478 {
479   GstRTSPStreamPrivate *priv;
480
481   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
482
483   priv = stream->priv;
484
485   GST_LOG_OBJECT (stream, "set DSCP QoS %d", dscp_qos);
486
487   if (dscp_qos < -1 || dscp_qos > 63) {
488     GST_WARNING_OBJECT (stream, "trying to set illegal dscp qos %d", dscp_qos);
489     return;
490   }
491
492   priv->dscp_qos = dscp_qos;
493
494   update_dscp_qos (stream);
495 }
496
497 /**
498  * gst_rtsp_stream_get_dscp_qos:
499  * @stream: a #GstRTSPStream
500  *
501  * Get the configured DSCP QoS in of the outgoing sockets.
502  *
503  * Returns: the DSCP QoS value of the outgoing sockets, or -1 if disbled.
504  */
505 gint
506 gst_rtsp_stream_get_dscp_qos (GstRTSPStream * stream)
507 {
508   GstRTSPStreamPrivate *priv;
509
510   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), -1);
511
512   priv = stream->priv;
513
514   return priv->dscp_qos;
515 }
516
517 /**
518  * gst_rtsp_stream_set_protocols:
519  * @stream: a #GstRTSPStream
520  * @protocols: the new flags
521  *
522  * Configure the allowed lower transport for @stream.
523  */
524 void
525 gst_rtsp_stream_set_protocols (GstRTSPStream * stream,
526     GstRTSPLowerTrans protocols)
527 {
528   GstRTSPStreamPrivate *priv;
529
530   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
531
532   priv = stream->priv;
533
534   g_mutex_lock (&priv->lock);
535   priv->protocols = protocols;
536   g_mutex_unlock (&priv->lock);
537 }
538
539 /**
540  * gst_rtsp_stream_get_protocols:
541  * @stream: a #GstRTSPStream
542  *
543  * Get the allowed protocols of @stream.
544  *
545  * Returns: a #GstRTSPLowerTrans
546  */
547 GstRTSPLowerTrans
548 gst_rtsp_stream_get_protocols (GstRTSPStream * stream)
549 {
550   GstRTSPStreamPrivate *priv;
551   GstRTSPLowerTrans res;
552
553   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream),
554       GST_RTSP_LOWER_TRANS_UNKNOWN);
555
556   priv = stream->priv;
557
558   g_mutex_lock (&priv->lock);
559   res = priv->protocols;
560   g_mutex_unlock (&priv->lock);
561
562   return res;
563 }
564
565 /**
566  * gst_rtsp_stream_set_address_pool:
567  * @stream: a #GstRTSPStream
568  * @pool: a #GstRTSPAddressPool
569  *
570  * configure @pool to be used as the address pool of @stream.
571  */
572 void
573 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
574     GstRTSPAddressPool * pool)
575 {
576   GstRTSPStreamPrivate *priv;
577   GstRTSPAddressPool *old;
578
579   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
580
581   priv = stream->priv;
582
583   GST_LOG_OBJECT (stream, "set address pool %p", pool);
584
585   g_mutex_lock (&priv->lock);
586   if ((old = priv->pool) != pool)
587     priv->pool = pool ? g_object_ref (pool) : NULL;
588   else
589     old = NULL;
590   g_mutex_unlock (&priv->lock);
591
592   if (old)
593     g_object_unref (old);
594 }
595
596 /**
597  * gst_rtsp_stream_get_address_pool:
598  * @stream: a #GstRTSPStream
599  *
600  * Get the #GstRTSPAddressPool used as the address pool of @stream.
601  *
602  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
603  * usage.
604  */
605 GstRTSPAddressPool *
606 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
607 {
608   GstRTSPStreamPrivate *priv;
609   GstRTSPAddressPool *result;
610
611   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
612
613   priv = stream->priv;
614
615   g_mutex_lock (&priv->lock);
616   if ((result = priv->pool))
617     g_object_ref (result);
618   g_mutex_unlock (&priv->lock);
619
620   return result;
621 }
622
623 /**
624  * gst_rtsp_stream_get_multicast_address:
625  * @stream: a #GstRTSPStream
626  * @family: the #GSocketFamily
627  *
628  * Get the multicast address of @stream for @family.
629  *
630  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
631  * allocated. gst_rtsp_address_free() after usage.
632  */
633 GstRTSPAddress *
634 gst_rtsp_stream_get_multicast_address (GstRTSPStream * stream,
635     GSocketFamily family)
636 {
637   GstRTSPStreamPrivate *priv;
638   GstRTSPAddress *result;
639   GstRTSPAddress **addrp;
640   GstRTSPAddressFlags flags;
641
642   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
643
644   priv = stream->priv;
645
646   if (family == G_SOCKET_FAMILY_IPV6) {
647     flags = GST_RTSP_ADDRESS_FLAG_IPV6;
648     addrp = &priv->addr_v4;
649   } else {
650     flags = GST_RTSP_ADDRESS_FLAG_IPV4;
651     addrp = &priv->addr_v6;
652   }
653
654   g_mutex_lock (&priv->lock);
655   if (*addrp == NULL) {
656     if (priv->pool == NULL)
657       goto no_pool;
658
659     flags |= GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_MULTICAST;
660
661     *addrp = gst_rtsp_address_pool_acquire_address (priv->pool, flags, 2);
662     if (*addrp == NULL)
663       goto no_address;
664   }
665   result = gst_rtsp_address_copy (*addrp);
666   g_mutex_unlock (&priv->lock);
667
668   return result;
669
670   /* ERRORS */
671 no_pool:
672   {
673     GST_ERROR_OBJECT (stream, "no address pool specified");
674     g_mutex_unlock (&priv->lock);
675     return NULL;
676   }
677 no_address:
678   {
679     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
680     g_mutex_unlock (&priv->lock);
681     return NULL;
682   }
683 }
684
685 /**
686  * gst_rtsp_stream_reserve_address:
687  * @stream: a #GstRTSPStream
688  * @address: an address
689  * @port: a port
690  * @n_ports: n_ports
691  * @ttl: a TTL
692  *
693  * Reserve @address and @port as the address and port of @stream.
694  *
695  * Returns: the #GstRTSPAddress of @stream or %NULL when the address could be
696  * reserved. gst_rtsp_address_free() after usage.
697  */
698 GstRTSPAddress *
699 gst_rtsp_stream_reserve_address (GstRTSPStream * stream,
700     const gchar * address, guint port, guint n_ports, guint ttl)
701 {
702   GstRTSPStreamPrivate *priv;
703   GstRTSPAddress *result;
704   GInetAddress *addr;
705   GSocketFamily family;
706   GstRTSPAddress **addrp;
707
708   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
709   g_return_val_if_fail (address != NULL, NULL);
710   g_return_val_if_fail (port > 0, NULL);
711   g_return_val_if_fail (n_ports > 0, NULL);
712   g_return_val_if_fail (ttl > 0, NULL);
713
714   priv = stream->priv;
715
716   addr = g_inet_address_new_from_string (address);
717   if (!addr) {
718     GST_ERROR ("failed to get inet addr from %s", address);
719     family = G_SOCKET_FAMILY_IPV4;
720   } else {
721     family = g_inet_address_get_family (addr);
722     g_object_unref (addr);
723   }
724
725   if (family == G_SOCKET_FAMILY_IPV6)
726     addrp = &priv->addr_v4;
727   else
728     addrp = &priv->addr_v6;
729
730   g_mutex_lock (&priv->lock);
731   if (*addrp == NULL) {
732     if (priv->pool == NULL)
733       goto no_pool;
734
735     *addrp = gst_rtsp_address_pool_reserve_address (priv->pool, address,
736         port, n_ports, ttl);
737     if (*addrp == NULL)
738       goto no_address;
739   } else {
740     if (strcmp ((*addrp)->address, address) ||
741         (*addrp)->port != port || (*addrp)->n_ports != n_ports ||
742         (*addrp)->ttl != ttl)
743       goto different_address;
744   }
745   result = gst_rtsp_address_copy (*addrp);
746   g_mutex_unlock (&priv->lock);
747
748   return result;
749
750   /* ERRORS */
751 no_pool:
752   {
753     GST_ERROR_OBJECT (stream, "no address pool specified");
754     g_mutex_unlock (&priv->lock);
755     return NULL;
756   }
757 no_address:
758   {
759     GST_ERROR_OBJECT (stream, "failed to acquire address %s from pool",
760         address);
761     g_mutex_unlock (&priv->lock);
762     return NULL;
763   }
764 different_address:
765   {
766     GST_ERROR_OBJECT (stream, "address %s is not the same that was already"
767         " reserved", address);
768     g_mutex_unlock (&priv->lock);
769     return NULL;
770   }
771 }
772
773 static gboolean
774 alloc_ports_one_family (GstRTSPAddressPool * pool, gint buffer_size,
775     GSocketFamily family, GstElement * udpsrc_out[2],
776     GstElement * udpsink_out[2], GstRTSPRange * server_port_out,
777     GstRTSPAddress ** server_addr_out)
778 {
779   GstStateChangeReturn ret;
780   GstElement *udpsrc0, *udpsrc1;
781   GstElement *udpsink0, *udpsink1;
782   GSocket *rtp_socket = NULL;
783   GSocket *rtcp_socket;
784   gint tmp_rtp, tmp_rtcp;
785   guint count;
786   gint rtpport, rtcpport;
787   GList *rejected_addresses = NULL;
788   GstRTSPAddress *addr = NULL;
789   GInetAddress *inetaddr = NULL;
790   GSocketAddress *rtp_sockaddr = NULL;
791   GSocketAddress *rtcp_sockaddr = NULL;
792   const gchar *multisink_socket;
793
794   if (family == G_SOCKET_FAMILY_IPV6)
795     multisink_socket = "socket-v6";
796   else
797     multisink_socket = "socket";
798
799   udpsrc0 = NULL;
800   udpsrc1 = NULL;
801   udpsink0 = NULL;
802   udpsink1 = NULL;
803   count = 0;
804
805   /* Start with random port */
806   tmp_rtp = 0;
807
808   rtcp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
809       G_SOCKET_PROTOCOL_UDP, NULL);
810   if (!rtcp_socket)
811     goto no_udp_protocol;
812
813   if (*server_addr_out)
814     gst_rtsp_address_free (*server_addr_out);
815
816   /* try to allocate 2 UDP ports, the RTP port should be an even
817    * number and the RTCP port should be the next (uneven) port */
818 again:
819
820   if (rtp_socket == NULL) {
821     rtp_socket = g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
822         G_SOCKET_PROTOCOL_UDP, NULL);
823     if (!rtp_socket)
824       goto no_udp_protocol;
825   }
826
827   if (pool && gst_rtsp_address_pool_has_unicast_addresses (pool)) {
828     GstRTSPAddressFlags flags;
829
830     if (addr)
831       rejected_addresses = g_list_prepend (rejected_addresses, addr);
832
833     flags = GST_RTSP_ADDRESS_FLAG_EVEN_PORT | GST_RTSP_ADDRESS_FLAG_UNICAST;
834     if (family == G_SOCKET_FAMILY_IPV6)
835       flags |= GST_RTSP_ADDRESS_FLAG_IPV6;
836     else
837       flags |= GST_RTSP_ADDRESS_FLAG_IPV4;
838
839     addr = gst_rtsp_address_pool_acquire_address (pool, flags, 2);
840
841     if (addr == NULL)
842       goto no_ports;
843
844     tmp_rtp = addr->port;
845
846     g_clear_object (&inetaddr);
847     inetaddr = g_inet_address_new_from_string (addr->address);
848   } else {
849     if (tmp_rtp != 0) {
850       tmp_rtp += 2;
851       if (++count > 20)
852         goto no_ports;
853     }
854
855     if (inetaddr == NULL)
856       inetaddr = g_inet_address_new_any (family);
857   }
858
859   rtp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtp);
860   if (!g_socket_bind (rtp_socket, rtp_sockaddr, FALSE, NULL)) {
861     g_object_unref (rtp_sockaddr);
862     goto again;
863   }
864   g_object_unref (rtp_sockaddr);
865
866   rtp_sockaddr = g_socket_get_local_address (rtp_socket, NULL);
867   if (rtp_sockaddr == NULL || !G_IS_INET_SOCKET_ADDRESS (rtp_sockaddr)) {
868     g_clear_object (&rtp_sockaddr);
869     goto socket_error;
870   }
871
872   tmp_rtp =
873       g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (rtp_sockaddr));
874   g_object_unref (rtp_sockaddr);
875
876   /* check if port is even */
877   if ((tmp_rtp & 1) != 0) {
878     /* port not even, close and allocate another */
879     tmp_rtp++;
880     g_clear_object (&rtp_socket);
881     goto again;
882   }
883
884   /* set port */
885   tmp_rtcp = tmp_rtp + 1;
886
887   rtcp_sockaddr = g_inet_socket_address_new (inetaddr, tmp_rtcp);
888   if (!g_socket_bind (rtcp_socket, rtcp_sockaddr, FALSE, NULL)) {
889     g_object_unref (rtcp_sockaddr);
890     g_clear_object (&rtp_socket);
891     goto again;
892   }
893   g_object_unref (rtcp_sockaddr);
894
895   g_clear_object (&inetaddr);
896
897   udpsrc0 = gst_element_factory_make ("udpsrc", NULL);
898   udpsrc1 = gst_element_factory_make ("udpsrc", NULL);
899
900   if (udpsrc0 == NULL || udpsrc1 == NULL)
901     goto no_udp_protocol;
902
903   g_object_set (G_OBJECT (udpsrc0), "socket", rtp_socket, NULL);
904   g_object_set (G_OBJECT (udpsrc1), "socket", rtcp_socket, NULL);
905
906   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
907   if (ret == GST_STATE_CHANGE_FAILURE)
908     goto element_error;
909   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
910   if (ret == GST_STATE_CHANGE_FAILURE)
911     goto element_error;
912
913   /* all fine, do port check */
914   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
915   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
916
917   /* this should not happen... */
918   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
919     goto port_error;
920
921   if (udpsink_out[0])
922     udpsink0 = udpsink_out[0];
923   else
924     udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
925
926   if (!udpsink0)
927     goto no_udp_protocol;
928
929   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
930   g_object_set (G_OBJECT (udpsink0), multisink_socket, rtp_socket, NULL);
931
932   if (udpsink_out[1])
933     udpsink1 = udpsink_out[1];
934   else
935     udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
936
937   if (!udpsink1)
938     goto no_udp_protocol;
939
940   g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
941   g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
942   g_object_set (G_OBJECT (udpsink0), "buffer-size", buffer_size, NULL);
943
944   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
945   g_object_set (G_OBJECT (udpsink1), multisink_socket, rtcp_socket, NULL);
946   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
947   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
948   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
949   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
950   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
951   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
952
953   /* we keep these elements, we will further configure them when the
954    * client told us to really use the UDP ports. */
955   udpsrc_out[0] = udpsrc0;
956   udpsrc_out[1] = udpsrc1;
957   udpsink_out[0] = udpsink0;
958   udpsink_out[1] = udpsink1;
959   server_port_out->min = rtpport;
960   server_port_out->max = rtcpport;
961
962   *server_addr_out = addr;
963   g_list_free_full (rejected_addresses, (GDestroyNotify) gst_rtsp_address_free);
964
965   g_object_unref (rtp_socket);
966   g_object_unref (rtcp_socket);
967
968   return TRUE;
969
970   /* ERRORS */
971 no_udp_protocol:
972   {
973     goto cleanup;
974   }
975 no_ports:
976   {
977     goto cleanup;
978   }
979 port_error:
980   {
981     goto cleanup;
982   }
983 socket_error:
984   {
985     goto cleanup;
986   }
987 element_error:
988   {
989     goto cleanup;
990   }
991 cleanup:
992   {
993     if (udpsrc0) {
994       gst_element_set_state (udpsrc0, GST_STATE_NULL);
995       gst_object_unref (udpsrc0);
996     }
997     if (udpsrc1) {
998       gst_element_set_state (udpsrc1, GST_STATE_NULL);
999       gst_object_unref (udpsrc1);
1000     }
1001     if (udpsink0) {
1002       gst_element_set_state (udpsink0, GST_STATE_NULL);
1003       gst_object_unref (udpsink0);
1004     }
1005     if (udpsink1) {
1006       gst_element_set_state (udpsink1, GST_STATE_NULL);
1007       gst_object_unref (udpsink1);
1008     }
1009     if (inetaddr)
1010       g_object_unref (inetaddr);
1011     g_list_free_full (rejected_addresses,
1012         (GDestroyNotify) gst_rtsp_address_free);
1013     if (addr)
1014       gst_rtsp_address_free (addr);
1015     if (rtp_socket)
1016       g_object_unref (rtp_socket);
1017     if (rtcp_socket)
1018       g_object_unref (rtcp_socket);
1019     return FALSE;
1020   }
1021 }
1022
1023 /* must be called with lock */
1024 static gboolean
1025 alloc_ports (GstRTSPStream * stream)
1026 {
1027   GstRTSPStreamPrivate *priv = stream->priv;
1028
1029   priv->have_ipv4 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1030       G_SOCKET_FAMILY_IPV4, priv->udpsrc_v4, priv->udpsink,
1031       &priv->server_port_v4, &priv->server_addr_v4);
1032
1033   priv->have_ipv6 = alloc_ports_one_family (priv->pool, priv->buffer_size,
1034       G_SOCKET_FAMILY_IPV6, priv->udpsrc_v6, priv->udpsink,
1035       &priv->server_port_v6, &priv->server_addr_v6);
1036
1037   return priv->have_ipv4 || priv->have_ipv6;
1038 }
1039
1040 /**
1041  * gst_rtsp_stream_get_server_port:
1042  * @stream: a #GstRTSPStream
1043  * @server_port: (out): result server port
1044  * @family: the port family to get
1045  *
1046  * Fill @server_port with the port pair used by the server. This function can
1047  * only be called when @stream has been joined.
1048  */
1049 void
1050 gst_rtsp_stream_get_server_port (GstRTSPStream * stream,
1051     GstRTSPRange * server_port, GSocketFamily family)
1052 {
1053   GstRTSPStreamPrivate *priv;
1054
1055   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1056   priv = stream->priv;
1057   g_return_if_fail (priv->is_joined);
1058
1059   g_mutex_lock (&priv->lock);
1060   if (family == G_SOCKET_FAMILY_IPV4) {
1061     if (server_port)
1062       *server_port = priv->server_port_v4;
1063   } else {
1064     if (server_port)
1065       *server_port = priv->server_port_v6;
1066   }
1067   g_mutex_unlock (&priv->lock);
1068 }
1069
1070 /**
1071  * gst_rtsp_stream_get_rtpsession:
1072  * @stream: a #GstRTSPStream
1073  *
1074  * Get the RTP session of this stream.
1075  *
1076  * Returns: The RTP session of this stream. Unref after usage.
1077  */
1078 GObject *
1079 gst_rtsp_stream_get_rtpsession (GstRTSPStream * stream)
1080 {
1081   GstRTSPStreamPrivate *priv;
1082   GObject *session;
1083
1084   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1085
1086   priv = stream->priv;
1087
1088   g_mutex_lock (&priv->lock);
1089   if ((session = priv->session))
1090     g_object_ref (session);
1091   g_mutex_unlock (&priv->lock);
1092
1093   return session;
1094 }
1095
1096 /**
1097  * gst_rtsp_stream_get_ssrc:
1098  * @stream: a #GstRTSPStream
1099  * @ssrc: (out): result ssrc
1100  *
1101  * Get the SSRC used by the RTP session of this stream. This function can only
1102  * be called when @stream has been joined.
1103  */
1104 void
1105 gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
1106 {
1107   GstRTSPStreamPrivate *priv;
1108
1109   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
1110   priv = stream->priv;
1111   g_return_if_fail (priv->is_joined);
1112
1113   g_mutex_lock (&priv->lock);
1114   if (ssrc && priv->session)
1115     g_object_get (priv->session, "internal-ssrc", ssrc, NULL);
1116   g_mutex_unlock (&priv->lock);
1117 }
1118
1119 /* executed from streaming thread */
1120 static void
1121 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
1122 {
1123   GstRTSPStreamPrivate *priv = stream->priv;
1124   GstCaps *newcaps, *oldcaps;
1125
1126   newcaps = gst_pad_get_current_caps (pad);
1127
1128   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
1129       newcaps);
1130
1131   g_mutex_lock (&priv->lock);
1132   oldcaps = priv->caps;
1133   priv->caps = newcaps;
1134   g_mutex_unlock (&priv->lock);
1135
1136   if (oldcaps)
1137     gst_caps_unref (oldcaps);
1138 }
1139
1140 static void
1141 dump_structure (const GstStructure * s)
1142 {
1143   gchar *sstr;
1144
1145   sstr = gst_structure_to_string (s);
1146   GST_INFO ("structure: %s", sstr);
1147   g_free (sstr);
1148 }
1149
1150 static GstRTSPStreamTransport *
1151 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
1152 {
1153   GstRTSPStreamPrivate *priv = stream->priv;
1154   GList *walk;
1155   GstRTSPStreamTransport *result = NULL;
1156   const gchar *tmp;
1157   gchar *dest;
1158   guint port;
1159
1160   if (rtcp_from == NULL)
1161     return NULL;
1162
1163   tmp = g_strrstr (rtcp_from, ":");
1164   if (tmp == NULL)
1165     return NULL;
1166
1167   port = atoi (tmp + 1);
1168   dest = g_strndup (rtcp_from, tmp - rtcp_from);
1169
1170   g_mutex_lock (&priv->lock);
1171   GST_INFO ("finding %s:%d in %d transports", dest, port,
1172       g_list_length (priv->transports));
1173
1174   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1175     GstRTSPStreamTransport *trans = walk->data;
1176     const GstRTSPTransport *tr;
1177     gint min, max;
1178
1179     tr = gst_rtsp_stream_transport_get_transport (trans);
1180
1181     min = tr->client_port.min;
1182     max = tr->client_port.max;
1183
1184     if ((strcmp (tr->destination, dest) == 0) && (min == port || max == port)) {
1185       result = trans;
1186       break;
1187     }
1188   }
1189   if (result)
1190     g_object_ref (result);
1191   g_mutex_unlock (&priv->lock);
1192
1193   g_free (dest);
1194
1195   return result;
1196 }
1197
1198 static GstRTSPStreamTransport *
1199 check_transport (GObject * source, GstRTSPStream * stream)
1200 {
1201   GstStructure *stats;
1202   GstRTSPStreamTransport *trans;
1203
1204   /* see if we have a stream to match with the origin of the RTCP packet */
1205   trans = g_object_get_qdata (source, ssrc_stream_map_key);
1206   if (trans == NULL) {
1207     g_object_get (source, "stats", &stats, NULL);
1208     if (stats) {
1209       const gchar *rtcp_from;
1210
1211       dump_structure (stats);
1212
1213       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
1214       if ((trans = find_transport (stream, rtcp_from))) {
1215         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
1216             source);
1217         g_object_set_qdata_full (source, ssrc_stream_map_key, trans,
1218             g_object_unref);
1219       }
1220       gst_structure_free (stats);
1221     }
1222   }
1223   return trans;
1224 }
1225
1226
1227 static void
1228 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1229 {
1230   GstRTSPStreamTransport *trans;
1231
1232   GST_INFO ("%p: new source %p", stream, source);
1233
1234   trans = check_transport (source, stream);
1235
1236   if (trans)
1237     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
1238 }
1239
1240 static void
1241 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
1242 {
1243   GST_INFO ("%p: new SDES %p", stream, source);
1244 }
1245
1246 static void
1247 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
1248 {
1249   GstRTSPStreamTransport *trans;
1250
1251   trans = check_transport (source, stream);
1252
1253   if (trans) {
1254     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
1255     gst_rtsp_stream_transport_keep_alive (trans);
1256   }
1257 #ifdef DUMP_STATS
1258   {
1259     GstStructure *stats;
1260     g_object_get (source, "stats", &stats, NULL);
1261     if (stats) {
1262       dump_structure (stats);
1263       gst_structure_free (stats);
1264     }
1265   }
1266 #endif
1267 }
1268
1269 static void
1270 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
1271 {
1272   GST_INFO ("%p: source %p bye", stream, source);
1273 }
1274
1275 static void
1276 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1277 {
1278   GstRTSPStreamTransport *trans;
1279
1280   GST_INFO ("%p: source %p bye timeout", stream, source);
1281
1282   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1283     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1284     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1285   }
1286 }
1287
1288 static void
1289 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
1290 {
1291   GstRTSPStreamTransport *trans;
1292
1293   GST_INFO ("%p: source %p timeout", stream, source);
1294
1295   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
1296     gst_rtsp_stream_transport_set_timed_out (trans, TRUE);
1297     g_object_set_qdata (source, ssrc_stream_map_key, NULL);
1298   }
1299 }
1300
1301 static GstFlowReturn
1302 handle_new_sample (GstAppSink * sink, gpointer user_data)
1303 {
1304   GstRTSPStreamPrivate *priv;
1305   GList *walk;
1306   GstSample *sample;
1307   GstBuffer *buffer;
1308   GstRTSPStream *stream;
1309
1310   sample = gst_app_sink_pull_sample (sink);
1311   if (!sample)
1312     return GST_FLOW_OK;
1313
1314   stream = (GstRTSPStream *) user_data;
1315   priv = stream->priv;
1316   buffer = gst_sample_get_buffer (sample);
1317
1318   g_mutex_lock (&priv->lock);
1319   for (walk = priv->transports; walk; walk = g_list_next (walk)) {
1320     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
1321
1322     if (GST_ELEMENT_CAST (sink) == priv->appsink[0]) {
1323       gst_rtsp_stream_transport_send_rtp (tr, buffer);
1324     } else {
1325       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
1326     }
1327   }
1328   g_mutex_unlock (&priv->lock);
1329
1330   gst_sample_unref (sample);
1331
1332   return GST_FLOW_OK;
1333 }
1334
1335 static GstAppSinkCallbacks sink_cb = {
1336   NULL,                         /* not interested in EOS */
1337   NULL,                         /* not interested in preroll samples */
1338   handle_new_sample,
1339 };
1340
1341 /**
1342  * gst_rtsp_stream_join_bin:
1343  * @stream: a #GstRTSPStream
1344  * @bin: a #GstBin to join
1345  * @rtpbin: a rtpbin element in @bin
1346  * @state: the target state of the new elements
1347  *
1348  * Join the #Gstbin @bin that contains the element @rtpbin.
1349  *
1350  * @stream will link to @rtpbin, which must be inside @bin. The elements
1351  * added to @bin will be set to the state given in @state.
1352  *
1353  * Returns: %TRUE on success.
1354  */
1355 gboolean
1356 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
1357     GstElement * rtpbin, GstState state)
1358 {
1359   GstRTSPStreamPrivate *priv;
1360   gint i;
1361   guint idx;
1362   gchar *name;
1363   GstPad *pad, *teepad, *queuepad, *selpad;
1364   GstPadLinkReturn ret;
1365
1366   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1367   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1368   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1369
1370   priv = stream->priv;
1371
1372   g_mutex_lock (&priv->lock);
1373   if (priv->is_joined)
1374     goto was_joined;
1375
1376   /* create a session with the same index as the stream */
1377   idx = priv->idx;
1378
1379   GST_INFO ("stream %p joining bin as session %u", stream, idx);
1380
1381   if (!alloc_ports (stream))
1382     goto no_ports;
1383
1384   /* update the dscp qos field in the sinks */
1385   update_dscp_qos (stream);
1386
1387   /* get a pad for sending RTP */
1388   name = g_strdup_printf ("send_rtp_sink_%u", idx);
1389   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
1390   g_free (name);
1391   /* link the RTP pad to the session manager, it should not really fail unless
1392    * this is not really an RTP pad */
1393   ret = gst_pad_link (priv->srcpad, priv->send_rtp_sink);
1394   if (ret != GST_PAD_LINK_OK)
1395     goto link_failed;
1396
1397   /* get pads from the RTP session element for sending and receiving
1398    * RTP/RTCP*/
1399   name = g_strdup_printf ("send_rtp_src_%u", idx);
1400   priv->send_src[0] = gst_element_get_static_pad (rtpbin, name);
1401   g_free (name);
1402   name = g_strdup_printf ("send_rtcp_src_%u", idx);
1403   priv->send_src[1] = gst_element_get_request_pad (rtpbin, name);
1404   g_free (name);
1405   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
1406   priv->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
1407   g_free (name);
1408   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
1409   priv->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
1410   g_free (name);
1411
1412   /* get the session */
1413   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &priv->session);
1414
1415   g_signal_connect (priv->session, "on-new-ssrc", (GCallback) on_new_ssrc,
1416       stream);
1417   g_signal_connect (priv->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
1418       stream);
1419   g_signal_connect (priv->session, "on-ssrc-active",
1420       (GCallback) on_ssrc_active, stream);
1421   g_signal_connect (priv->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
1422       stream);
1423   g_signal_connect (priv->session, "on-bye-timeout",
1424       (GCallback) on_bye_timeout, stream);
1425   g_signal_connect (priv->session, "on-timeout", (GCallback) on_timeout,
1426       stream);
1427
1428   for (i = 0; i < 2; i++) {
1429     /* For the sender we create this bit of pipeline for both
1430      * RTP and RTCP. Sync and preroll are enabled on udpsink so
1431      * we need to add a queue before appsink to make the pipeline
1432      * not block. For the TCP case, we want to pump data to the
1433      * client as fast as possible anyway.
1434      *
1435      * .--------.      .-----.    .---------.
1436      * | rtpbin |      | tee |    | udpsink |
1437      * |       send->sink   src->sink       |
1438      * '--------'      |     |    '---------'
1439      *                 |     |    .---------.    .---------.
1440      *                 |     |    |  queue  |    | appsink |
1441      *                 |    src->sink      src->sink       |
1442      *                 '-----'    '---------'    '---------'
1443      */
1444     /* make tee for RTP/RTCP */
1445     priv->tee[i] = gst_element_factory_make ("tee", NULL);
1446     gst_bin_add (bin, priv->tee[i]);
1447
1448     /* and link to rtpbin send pad */
1449     pad = gst_element_get_static_pad (priv->tee[i], "sink");
1450     gst_pad_link (priv->send_src[i], pad);
1451     gst_object_unref (pad);
1452
1453     /* add udpsink */
1454     gst_bin_add (bin, priv->udpsink[i]);
1455
1456     /* link tee to udpsink */
1457     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1458     pad = gst_element_get_static_pad (priv->udpsink[i], "sink");
1459     gst_pad_link (teepad, pad);
1460     gst_object_unref (pad);
1461     gst_object_unref (teepad);
1462
1463     /* make queue */
1464     priv->appqueue[i] = gst_element_factory_make ("queue", NULL);
1465     gst_bin_add (bin, priv->appqueue[i]);
1466     /* and link to tee */
1467     teepad = gst_element_get_request_pad (priv->tee[i], "src_%u");
1468     pad = gst_element_get_static_pad (priv->appqueue[i], "sink");
1469     gst_pad_link (teepad, pad);
1470     gst_object_unref (pad);
1471     gst_object_unref (teepad);
1472
1473     /* make appsink */
1474     priv->appsink[i] = gst_element_factory_make ("appsink", NULL);
1475     g_object_set (priv->appsink[i], "async", FALSE, "sync", FALSE, NULL);
1476     g_object_set (priv->appsink[i], "emit-signals", FALSE, NULL);
1477     gst_bin_add (bin, priv->appsink[i]);
1478     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (priv->appsink[i]),
1479         &sink_cb, stream, NULL);
1480     /* and link to queue */
1481     queuepad = gst_element_get_static_pad (priv->appqueue[i], "src");
1482     pad = gst_element_get_static_pad (priv->appsink[i], "sink");
1483     gst_pad_link (queuepad, pad);
1484     gst_object_unref (pad);
1485     gst_object_unref (queuepad);
1486
1487     /* For the receiver we create this bit of pipeline for both
1488      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
1489      * and it is all funneled into the rtpbin receive pad.
1490      *
1491      * .--------.     .--------.    .--------.
1492      * | udpsrc |     | funnel |    | rtpbin |
1493      * |       src->sink      src->sink      |
1494      * '--------'     |        |    '--------'
1495      * .--------.     |        |
1496      * | appsrc |     |        |
1497      * |       src->sink       |
1498      * '--------'     '--------'
1499      */
1500     /* make funnel for the RTP/RTCP receivers */
1501     priv->funnel[i] = gst_element_factory_make ("funnel", NULL);
1502     gst_bin_add (bin, priv->funnel[i]);
1503
1504     pad = gst_element_get_static_pad (priv->funnel[i], "src");
1505     gst_pad_link (pad, priv->recv_sink[i]);
1506     gst_object_unref (pad);
1507
1508     if (priv->udpsrc_v4[i]) {
1509       /* we set and keep these to playing so that they don't cause NO_PREROLL return
1510        * values */
1511       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_PLAYING);
1512       gst_element_set_locked_state (priv->udpsrc_v4[i], TRUE);
1513       /* add udpsrc */
1514       gst_bin_add (bin, priv->udpsrc_v4[i]);
1515
1516       /* and link to the funnel v4 */
1517       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1518       pad = gst_element_get_static_pad (priv->udpsrc_v4[i], "src");
1519       gst_pad_link (pad, selpad);
1520       gst_object_unref (pad);
1521       gst_object_unref (selpad);
1522     }
1523
1524     if (priv->udpsrc_v6[i]) {
1525       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_PLAYING);
1526       gst_element_set_locked_state (priv->udpsrc_v6[i], TRUE);
1527       gst_bin_add (bin, priv->udpsrc_v6[i]);
1528
1529       /* and link to the funnel v6 */
1530       selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1531       pad = gst_element_get_static_pad (priv->udpsrc_v6[i], "src");
1532       gst_pad_link (pad, selpad);
1533       gst_object_unref (pad);
1534       gst_object_unref (selpad);
1535     }
1536
1537     /* make and add appsrc */
1538     priv->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
1539     gst_bin_add (bin, priv->appsrc[i]);
1540     /* and link to the funnel */
1541     selpad = gst_element_get_request_pad (priv->funnel[i], "sink_%u");
1542     pad = gst_element_get_static_pad (priv->appsrc[i], "src");
1543     gst_pad_link (pad, selpad);
1544     gst_object_unref (pad);
1545     gst_object_unref (selpad);
1546
1547     /* check if we need to set to a special state */
1548     if (state != GST_STATE_NULL) {
1549       gst_element_set_state (priv->udpsink[i], state);
1550       gst_element_set_state (priv->appsink[i], state);
1551       gst_element_set_state (priv->appqueue[i], state);
1552       gst_element_set_state (priv->tee[i], state);
1553       gst_element_set_state (priv->funnel[i], state);
1554       gst_element_set_state (priv->appsrc[i], state);
1555     }
1556   }
1557
1558   /* be notified of caps changes */
1559   priv->caps_sig = g_signal_connect (priv->send_rtp_sink, "notify::caps",
1560       (GCallback) caps_notify, stream);
1561
1562   priv->is_joined = TRUE;
1563   g_mutex_unlock (&priv->lock);
1564
1565   return TRUE;
1566
1567   /* ERRORS */
1568 was_joined:
1569   {
1570     g_mutex_unlock (&priv->lock);
1571     return TRUE;
1572   }
1573 no_ports:
1574   {
1575     g_mutex_unlock (&priv->lock);
1576     GST_WARNING ("failed to allocate ports %u", idx);
1577     return FALSE;
1578   }
1579 link_failed:
1580   {
1581     GST_WARNING ("failed to link stream %u", idx);
1582     gst_object_unref (priv->send_rtp_sink);
1583     priv->send_rtp_sink = NULL;
1584     g_mutex_unlock (&priv->lock);
1585     return FALSE;
1586   }
1587 }
1588
1589 /**
1590  * gst_rtsp_stream_leave_bin:
1591  * @stream: a #GstRTSPStream
1592  * @bin: a #GstBin
1593  * @rtpbin: a rtpbin #GstElement
1594  *
1595  * Remove the elements of @stream from @bin.
1596  *
1597  * Return: %TRUE on success.
1598  */
1599 gboolean
1600 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
1601     GstElement * rtpbin)
1602 {
1603   GstRTSPStreamPrivate *priv;
1604   gint i;
1605
1606   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1607   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
1608   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
1609
1610   priv = stream->priv;
1611
1612   g_mutex_lock (&priv->lock);
1613   if (!priv->is_joined)
1614     goto was_not_joined;
1615
1616   /* all transports must be removed by now */
1617   g_return_val_if_fail (priv->transports == NULL, FALSE);
1618
1619   GST_INFO ("stream %p leaving bin", stream);
1620
1621   gst_pad_unlink (priv->srcpad, priv->send_rtp_sink);
1622   g_signal_handler_disconnect (priv->send_rtp_sink, priv->caps_sig);
1623   gst_element_release_request_pad (rtpbin, priv->send_rtp_sink);
1624   gst_object_unref (priv->send_rtp_sink);
1625   priv->send_rtp_sink = NULL;
1626
1627   for (i = 0; i < 2; i++) {
1628     gst_element_set_state (priv->udpsink[i], GST_STATE_NULL);
1629     gst_element_set_state (priv->appsink[i], GST_STATE_NULL);
1630     gst_element_set_state (priv->appqueue[i], GST_STATE_NULL);
1631     gst_element_set_state (priv->tee[i], GST_STATE_NULL);
1632     gst_element_set_state (priv->funnel[i], GST_STATE_NULL);
1633     gst_element_set_state (priv->appsrc[i], GST_STATE_NULL);
1634     if (priv->udpsrc_v4[i]) {
1635       /* and set udpsrc to NULL now before removing */
1636       gst_element_set_locked_state (priv->udpsrc_v4[i], FALSE);
1637       gst_element_set_state (priv->udpsrc_v4[i], GST_STATE_NULL);
1638       /* removing them should also nicely release the request
1639        * pads when they finalize */
1640       gst_bin_remove (bin, priv->udpsrc_v4[i]);
1641     }
1642     if (priv->udpsrc_v6[i]) {
1643       gst_element_set_locked_state (priv->udpsrc_v6[i], FALSE);
1644       gst_element_set_state (priv->udpsrc_v6[i], GST_STATE_NULL);
1645       gst_bin_remove (bin, priv->udpsrc_v6[i]);
1646     }
1647     gst_bin_remove (bin, priv->udpsink[i]);
1648     gst_bin_remove (bin, priv->appsrc[i]);
1649     gst_bin_remove (bin, priv->appsink[i]);
1650     gst_bin_remove (bin, priv->appqueue[i]);
1651     gst_bin_remove (bin, priv->tee[i]);
1652     gst_bin_remove (bin, priv->funnel[i]);
1653
1654     gst_element_release_request_pad (rtpbin, priv->recv_sink[i]);
1655     gst_object_unref (priv->recv_sink[i]);
1656     priv->recv_sink[i] = NULL;
1657
1658     priv->udpsrc_v4[i] = NULL;
1659     priv->udpsrc_v6[i] = NULL;
1660     priv->udpsink[i] = NULL;
1661     priv->appsrc[i] = NULL;
1662     priv->appsink[i] = NULL;
1663     priv->appqueue[i] = NULL;
1664     priv->tee[i] = NULL;
1665     priv->funnel[i] = NULL;
1666   }
1667   gst_object_unref (priv->send_src[0]);
1668   priv->send_src[0] = NULL;
1669
1670   gst_element_release_request_pad (rtpbin, priv->send_src[1]);
1671   gst_object_unref (priv->send_src[1]);
1672   priv->send_src[1] = NULL;
1673
1674   g_object_unref (priv->session);
1675   priv->session = NULL;
1676   if (priv->caps)
1677     gst_caps_unref (priv->caps);
1678   priv->caps = NULL;
1679
1680   priv->is_joined = FALSE;
1681   g_mutex_unlock (&priv->lock);
1682
1683   return TRUE;
1684
1685 was_not_joined:
1686   {
1687     return TRUE;
1688   }
1689 }
1690
1691 /**
1692  * gst_rtsp_stream_get_rtpinfo:
1693  * @stream: a #GstRTSPStream
1694  * @rtptime: result RTP timestamp
1695  * @seq: result RTP seqnum
1696  *
1697  * Retrieve the current rtptime and seq. This is used to
1698  * construct a RTPInfo reply header.
1699  *
1700  * Returns: %TRUE when rtptime and seq could be determined.
1701  */
1702 gboolean
1703 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
1704     guint * rtptime, guint * seq)
1705 {
1706   GstRTSPStreamPrivate *priv;
1707   GObjectClass *payobjclass;
1708
1709   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1710   g_return_val_if_fail (rtptime != NULL, FALSE);
1711   g_return_val_if_fail (seq != NULL, FALSE);
1712
1713   priv = stream->priv;
1714
1715   payobjclass = G_OBJECT_GET_CLASS (priv->payloader);
1716
1717   if (!g_object_class_find_property (payobjclass, "seqnum") ||
1718       !g_object_class_find_property (payobjclass, "timestamp"))
1719     return FALSE;
1720
1721   g_object_get (priv->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
1722
1723   return TRUE;
1724 }
1725
1726 /**
1727  * gst_rtsp_stream_get_caps:
1728  * @stream: a #GstRTSPStream
1729  *
1730  * Retrieve the current caps of @stream.
1731  *
1732  * Returns: (transfer full): the #GstCaps of @stream. use gst_caps_unref()
1733  *    after usage.
1734  */
1735 GstCaps *
1736 gst_rtsp_stream_get_caps (GstRTSPStream * stream)
1737 {
1738   GstRTSPStreamPrivate *priv;
1739   GstCaps *result;
1740
1741   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
1742
1743   priv = stream->priv;
1744
1745   g_mutex_lock (&priv->lock);
1746   if ((result = priv->caps))
1747     gst_caps_ref (result);
1748   g_mutex_unlock (&priv->lock);
1749
1750   return result;
1751 }
1752
1753 /**
1754  * gst_rtsp_stream_recv_rtp:
1755  * @stream: a #GstRTSPStream
1756  * @buffer: (transfer full): a #GstBuffer
1757  *
1758  * Handle an RTP buffer for the stream. This method is usually called when a
1759  * message has been received from a client using the TCP transport.
1760  *
1761  * This function takes ownership of @buffer.
1762  *
1763  * Returns: a GstFlowReturn.
1764  */
1765 GstFlowReturn
1766 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1767 {
1768   GstRTSPStreamPrivate *priv;
1769   GstFlowReturn ret;
1770   GstElement *element;
1771
1772   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1773   priv = stream->priv;
1774   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1775   g_return_val_if_fail (priv->is_joined, FALSE);
1776
1777   g_mutex_lock (&priv->lock);
1778   element = gst_object_ref (priv->appsrc[0]);
1779   g_mutex_unlock (&priv->lock);
1780
1781   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1782
1783   gst_object_unref (element);
1784
1785   return ret;
1786 }
1787
1788 /**
1789  * gst_rtsp_stream_recv_rtcp:
1790  * @stream: a #GstRTSPStream
1791  * @buffer: (transfer full): a #GstBuffer
1792  *
1793  * Handle an RTCP buffer for the stream. This method is usually called when a
1794  * message has been received from a client using the TCP transport.
1795  *
1796  * This function takes ownership of @buffer.
1797  *
1798  * Returns: a GstFlowReturn.
1799  */
1800 GstFlowReturn
1801 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1802 {
1803   GstRTSPStreamPrivate *priv;
1804   GstFlowReturn ret;
1805   GstElement *element;
1806
1807   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1808   priv = stream->priv;
1809   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1810   g_return_val_if_fail (priv->is_joined, FALSE);
1811
1812   g_mutex_lock (&priv->lock);
1813   element = gst_object_ref (priv->appsrc[1]);
1814   g_mutex_unlock (&priv->lock);
1815
1816   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1817
1818   gst_object_unref (element);
1819
1820   return ret;
1821 }
1822
1823 /* must be called with lock */
1824 static gboolean
1825 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1826     gboolean add)
1827 {
1828   GstRTSPStreamPrivate *priv = stream->priv;
1829   const GstRTSPTransport *tr;
1830
1831   tr = gst_rtsp_stream_transport_get_transport (trans);
1832
1833   switch (tr->lower_transport) {
1834     case GST_RTSP_LOWER_TRANS_UDP:
1835     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1836     {
1837       gchar *dest;
1838       gint min, max;
1839       guint ttl = 0;
1840
1841       dest = tr->destination;
1842       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1843         min = tr->port.min;
1844         max = tr->port.max;
1845         ttl = tr->ttl;
1846       } else {
1847         min = tr->client_port.min;
1848         max = tr->client_port.max;
1849       }
1850
1851       if (add) {
1852         GST_INFO ("adding %s:%d-%d", dest, min, max);
1853         g_signal_emit_by_name (priv->udpsink[0], "add", dest, min, NULL);
1854         g_signal_emit_by_name (priv->udpsink[1], "add", dest, max, NULL);
1855         if (ttl > 0) {
1856           GST_INFO ("setting ttl-mc %d", ttl);
1857           g_object_set (G_OBJECT (priv->udpsink[0]), "ttl-mc", ttl, NULL);
1858           g_object_set (G_OBJECT (priv->udpsink[1]), "ttl-mc", ttl, NULL);
1859         }
1860         priv->transports = g_list_prepend (priv->transports, trans);
1861       } else {
1862         GST_INFO ("removing %s:%d-%d", dest, min, max);
1863         g_signal_emit_by_name (priv->udpsink[0], "remove", dest, min, NULL);
1864         g_signal_emit_by_name (priv->udpsink[1], "remove", dest, max, NULL);
1865         priv->transports = g_list_remove (priv->transports, trans);
1866       }
1867       break;
1868     }
1869     case GST_RTSP_LOWER_TRANS_TCP:
1870       if (add) {
1871         GST_INFO ("adding TCP %s", tr->destination);
1872         priv->transports = g_list_prepend (priv->transports, trans);
1873       } else {
1874         GST_INFO ("removing TCP %s", tr->destination);
1875         priv->transports = g_list_remove (priv->transports, trans);
1876       }
1877       break;
1878     default:
1879       goto unknown_transport;
1880   }
1881   return TRUE;
1882
1883   /* ERRORS */
1884 unknown_transport:
1885   {
1886     GST_INFO ("Unknown transport %d", tr->lower_transport);
1887     return FALSE;
1888   }
1889 }
1890
1891
1892 /**
1893  * gst_rtsp_stream_add_transport:
1894  * @stream: a #GstRTSPStream
1895  * @trans: a #GstRTSPStreamTransport
1896  *
1897  * Add the transport in @trans to @stream. The media of @stream will
1898  * then also be send to the values configured in @trans.
1899  *
1900  * @stream must be joined to a bin.
1901  *
1902  * @trans must contain a valid #GstRTSPTransport.
1903  *
1904  * Returns: %TRUE if @trans was added
1905  */
1906 gboolean
1907 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1908     GstRTSPStreamTransport * trans)
1909 {
1910   GstRTSPStreamPrivate *priv;
1911   gboolean res;
1912
1913   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1914   priv = stream->priv;
1915   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1916   g_return_val_if_fail (priv->is_joined, FALSE);
1917
1918   g_mutex_lock (&priv->lock);
1919   res = update_transport (stream, trans, TRUE);
1920   g_mutex_unlock (&priv->lock);
1921
1922   return res;
1923 }
1924
1925 /**
1926  * gst_rtsp_stream_remove_transport:
1927  * @stream: a #GstRTSPStream
1928  * @trans: a #GstRTSPStreamTransport
1929  *
1930  * Remove the transport in @trans from @stream. The media of @stream will
1931  * not be sent to the values configured in @trans.
1932  *
1933  * @stream must be joined to a bin.
1934  *
1935  * @trans must contain a valid #GstRTSPTransport.
1936  *
1937  * Returns: %TRUE if @trans was removed
1938  */
1939 gboolean
1940 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1941     GstRTSPStreamTransport * trans)
1942 {
1943   GstRTSPStreamPrivate *priv;
1944   gboolean res;
1945
1946   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1947   priv = stream->priv;
1948   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1949   g_return_val_if_fail (priv->is_joined, FALSE);
1950
1951   g_mutex_lock (&priv->lock);
1952   res = update_transport (stream, trans, FALSE);
1953   g_mutex_unlock (&priv->lock);
1954
1955   return res;
1956 }