rtsp: improve debug
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * stream)
61 {
62   g_mutex_init (&stream->lock);
63 }
64
65 static void
66 gst_rtsp_stream_finalize (GObject * obj)
67 {
68   GstRTSPStream *stream;
69
70   stream = GST_RTSP_STREAM (obj);
71
72   /* we really need to be unjoined now */
73   g_return_if_fail (!stream->is_joined);
74
75   if (stream->addr)
76     gst_rtsp_address_free (stream->addr);
77   if (stream->pool)
78     g_object_unref (stream->pool);
79   gst_object_unref (stream->payloader);
80   gst_object_unref (stream->srcpad);
81   g_mutex_clear (&stream->lock);
82
83   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
84 }
85
86 /**
87  * gst_rtsp_stream_new:
88  * @idx: an index
89  * @srcpad: a #GstPad
90  * @payloader: a #GstElement
91  *
92  * Create a new media stream with index @idx that handles RTP data on
93  * @srcpad and has a payloader element @payloader.
94  *
95  * Returns: a new #GstRTSPStream
96  */
97 GstRTSPStream *
98 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
99 {
100   GstRTSPStream *stream;
101
102   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
103   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
104   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
105
106   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
107   stream->idx = idx;
108   stream->payloader = gst_object_ref (payloader);
109   stream->srcpad = gst_object_ref (srcpad);
110
111   return stream;
112 }
113
114 /**
115  * gst_rtsp_stream_set_mtu:
116  * @stream: a #GstRTSPStream
117  * @mtu: a new MTU
118  *
119  * Configure the mtu in the payloader of @stream to @mtu.
120  */
121 void
122 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
123 {
124   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
125
126   GST_LOG_OBJECT (stream, "set MTU %u", mtu);
127
128   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
129 }
130
131 /**
132  * gst_rtsp_stream_get_mtu:
133  * @stream: a #GstRTSPStream
134  *
135  * Get the configured MTU in the payloader of @stream.
136  *
137  * Returns: the MTU of the payloader.
138  */
139 guint
140 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
141 {
142   guint mtu;
143
144   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
145
146   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
147
148   return mtu;
149 }
150
151 /**
152  * gst_rtsp_stream_set_address_pool:
153  * @stream: a #GstRTSPStream
154  * @pool: a #GstRTSPAddressPool
155  *
156  * configure @pool to be used as the address pool of @stream.
157  */
158 void
159 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
160     GstRTSPAddressPool * pool)
161 {
162   GstRTSPAddressPool *old;
163
164   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
165
166   GST_LOG_OBJECT (stream, "set address pool %p", pool);
167
168   g_mutex_lock (&stream->lock);
169   if ((old = stream->pool) != pool)
170     stream->pool = pool ? g_object_ref (pool) : NULL;
171   else
172     old = NULL;
173   g_mutex_unlock (&stream->lock);
174
175   if (old)
176     g_object_unref (old);
177 }
178
179 /**
180  * gst_rtsp_stream_get_address_pool:
181  * @stream: a #GstRTSPStream
182  *
183  * Get the #GstRTSPAddressPool used as the address pool of @stream.
184  *
185  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
186  * usage.
187  */
188 GstRTSPAddressPool *
189 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
190 {
191   GstRTSPAddressPool *result;
192
193   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
194
195   g_mutex_lock (&stream->lock);
196   if ((result = stream->pool))
197     g_object_ref (result);
198   g_mutex_unlock (&stream->lock);
199
200   return result;
201 }
202
203 /**
204  * gst_rtsp_stream_get_address:
205  * @stream: a #GstRTSPStream
206  *
207  * Get the multicast address of @stream.
208  *
209  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
210  * allocated. gst_rtsp_address_free() after usage.
211  */
212 GstRTSPAddress *
213 gst_rtsp_stream_get_address (GstRTSPStream * stream)
214 {
215   GstRTSPAddress *result;
216
217   g_mutex_lock (&stream->lock);
218   if (stream->addr == NULL) {
219     if (stream->pool == NULL)
220       goto no_pool;
221
222     stream->addr = gst_rtsp_address_pool_acquire_address (stream->pool,
223         GST_RTSP_ADDRESS_FLAG_EVEN_PORT, 2);
224     if (stream->addr == NULL)
225       goto no_address;
226   }
227   result = gst_rtsp_address_copy (stream->addr);
228   g_mutex_unlock (&stream->lock);
229
230   return result;
231
232   /* ERRORS */
233 no_pool:
234   {
235     GST_ERROR_OBJECT (stream, "no address pool specified");
236     g_mutex_unlock (&stream->lock);
237     return NULL;
238   }
239 no_address:
240   {
241     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
242     g_mutex_unlock (&stream->lock);
243     return NULL;
244   }
245 }
246
247 /* must be called with lock */
248 static gboolean
249 alloc_ports (GstRTSPStream * stream)
250 {
251   GstStateChangeReturn ret;
252   GstElement *udpsrc0, *udpsrc1;
253   GstElement *udpsink0, *udpsink1;
254   gint tmp_rtp, tmp_rtcp;
255   guint count;
256   gint rtpport, rtcpport;
257   GSocket *socket;
258   const gchar *host;
259
260   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
261
262   udpsrc0 = NULL;
263   udpsrc1 = NULL;
264   udpsink0 = NULL;
265   udpsink1 = NULL;
266   count = 0;
267
268   /* Start with random port */
269   tmp_rtp = 0;
270
271   if (stream->is_ipv6)
272     host = "udp://[::0]";
273   else
274     host = "udp://0.0.0.0";
275
276   /* try to allocate 2 UDP ports, the RTP port should be an even
277    * number and the RTCP port should be the next (uneven) port */
278 again:
279   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
280   if (udpsrc0 == NULL)
281     goto no_udp_protocol;
282   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
283
284   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
285   if (ret == GST_STATE_CHANGE_FAILURE) {
286     if (tmp_rtp != 0) {
287       tmp_rtp += 2;
288       if (++count > 20)
289         goto no_ports;
290
291       gst_element_set_state (udpsrc0, GST_STATE_NULL);
292       gst_object_unref (udpsrc0);
293
294       goto again;
295     }
296     goto no_udp_protocol;
297   }
298
299   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
300
301   /* check if port is even */
302   if ((tmp_rtp & 1) != 0) {
303     /* port not even, close and allocate another */
304     if (++count > 20)
305       goto no_ports;
306
307     gst_element_set_state (udpsrc0, GST_STATE_NULL);
308     gst_object_unref (udpsrc0);
309
310     tmp_rtp++;
311     goto again;
312   }
313
314   /* allocate port+1 for RTCP now */
315   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
316   if (udpsrc1 == NULL)
317     goto no_udp_rtcp_protocol;
318
319   /* set port */
320   tmp_rtcp = tmp_rtp + 1;
321   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
322
323   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
324   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
325   if (ret == GST_STATE_CHANGE_FAILURE) {
326
327     if (++count > 20)
328       goto no_ports;
329
330     gst_element_set_state (udpsrc0, GST_STATE_NULL);
331     gst_object_unref (udpsrc0);
332
333     gst_element_set_state (udpsrc1, GST_STATE_NULL);
334     gst_object_unref (udpsrc1);
335
336     tmp_rtp += 2;
337     goto again;
338   }
339   /* all fine, do port check */
340   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
341   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
342
343   /* this should not happen... */
344   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
345     goto port_error;
346
347   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
348   if (!udpsink0)
349     goto no_udp_protocol;
350
351   g_object_get (G_OBJECT (udpsrc0), "used-socket", &socket, NULL);
352   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
353   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
354
355   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
356   if (!udpsink1)
357     goto no_udp_protocol;
358
359   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
360           "send-duplicates")) {
361     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
362     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
363   } else {
364     g_warning
365         ("old multiudpsink version found without send-duplicates property");
366   }
367
368   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
369           "buffer-size")) {
370     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
371         NULL);
372   } else {
373     GST_WARNING ("multiudpsink version found without buffer-size property");
374   }
375
376   g_object_get (G_OBJECT (udpsrc1), "used-socket", &socket, NULL);
377   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
378   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
379   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
380   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
381   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
382   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
383   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
384   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
385
386   /* we keep these elements, we will further configure them when the
387    * client told us to really use the UDP ports. */
388   stream->udpsrc[0] = udpsrc0;
389   stream->udpsrc[1] = udpsrc1;
390   stream->udpsink[0] = udpsink0;
391   stream->udpsink[1] = udpsink1;
392   stream->server_port.min = rtpport;
393   stream->server_port.max = rtcpport;
394
395   return TRUE;
396
397   /* ERRORS */
398 no_udp_protocol:
399   {
400     goto cleanup;
401   }
402 no_ports:
403   {
404     goto cleanup;
405   }
406 no_udp_rtcp_protocol:
407   {
408     goto cleanup;
409   }
410 port_error:
411   {
412     goto cleanup;
413   }
414 cleanup:
415   {
416     if (udpsrc0) {
417       gst_element_set_state (udpsrc0, GST_STATE_NULL);
418       gst_object_unref (udpsrc0);
419     }
420     if (udpsrc1) {
421       gst_element_set_state (udpsrc1, GST_STATE_NULL);
422       gst_object_unref (udpsrc1);
423     }
424     if (udpsink0) {
425       gst_element_set_state (udpsink0, GST_STATE_NULL);
426       gst_object_unref (udpsink0);
427     }
428     if (udpsink1) {
429       gst_element_set_state (udpsink1, GST_STATE_NULL);
430       gst_object_unref (udpsink1);
431     }
432     return FALSE;
433   }
434 }
435
436 /* executed from streaming thread */
437 static void
438 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
439 {
440   GstCaps *newcaps, *oldcaps;
441
442   newcaps = gst_pad_get_current_caps (pad);
443
444   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
445       newcaps);
446
447   g_mutex_lock (&stream->lock);
448   oldcaps = stream->caps;
449   stream->caps = newcaps;
450   g_mutex_unlock (&stream->lock);
451
452   if (oldcaps)
453     gst_caps_unref (oldcaps);
454 }
455
456 static void
457 dump_structure (const GstStructure * s)
458 {
459   gchar *sstr;
460
461   sstr = gst_structure_to_string (s);
462   GST_INFO ("structure: %s", sstr);
463   g_free (sstr);
464 }
465
466 static GstRTSPStreamTransport *
467 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
468 {
469   GList *walk;
470   GstRTSPStreamTransport *result = NULL;
471   const gchar *tmp;
472   gchar *dest;
473   guint port;
474
475   if (rtcp_from == NULL)
476     return NULL;
477
478   tmp = g_strrstr (rtcp_from, ":");
479   if (tmp == NULL)
480     return NULL;
481
482   port = atoi (tmp + 1);
483   dest = g_strndup (rtcp_from, tmp - rtcp_from);
484
485   g_mutex_lock (&stream->lock);
486   GST_INFO ("finding %s:%d in %d transports", dest, port,
487       g_list_length (stream->transports));
488
489   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
490     GstRTSPStreamTransport *trans = walk->data;
491     gint min, max;
492
493     min = trans->transport->client_port.min;
494     max = trans->transport->client_port.max;
495
496     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
497             || max == port)) {
498       result = trans;
499       break;
500     }
501   }
502   g_mutex_unlock (&stream->lock);
503
504   g_free (dest);
505
506   return result;
507 }
508
509 static GstRTSPStreamTransport *
510 check_transport (GObject * source, GstRTSPStream * stream)
511 {
512   GstStructure *stats;
513   GstRTSPStreamTransport *trans;
514
515   /* see if we have a stream to match with the origin of the RTCP packet */
516   trans = g_object_get_qdata (source, ssrc_stream_map_key);
517   if (trans == NULL) {
518     g_object_get (source, "stats", &stats, NULL);
519     if (stats) {
520       const gchar *rtcp_from;
521
522       dump_structure (stats);
523
524       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
525       if ((trans = find_transport (stream, rtcp_from))) {
526         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
527             source);
528
529         /* keep ref to the source */
530         trans->rtpsource = source;
531
532         g_object_set_qdata (source, ssrc_stream_map_key, trans);
533       }
534       gst_structure_free (stats);
535     }
536   }
537
538   return trans;
539 }
540
541
542 static void
543 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
544 {
545   GstRTSPStreamTransport *trans;
546
547   GST_INFO ("%p: new source %p", stream, source);
548
549   trans = check_transport (source, stream);
550
551   if (trans)
552     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
553 }
554
555 static void
556 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
557 {
558   GST_INFO ("%p: new SDES %p", stream, source);
559 }
560
561 static void
562 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
563 {
564   GstRTSPStreamTransport *trans;
565
566   trans = check_transport (source, stream);
567
568   if (trans) {
569     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
570     gst_rtsp_stream_transport_keep_alive (trans);
571   }
572 #ifdef DUMP_STATS
573   {
574     GstStructure *stats;
575     g_object_get (source, "stats", &stats, NULL);
576     if (stats) {
577       dump_structure (stats);
578       gst_structure_free (stats);
579     }
580   }
581 #endif
582 }
583
584 static void
585 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
586 {
587   GST_INFO ("%p: source %p bye", stream, source);
588 }
589
590 static void
591 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
592 {
593   GstRTSPStreamTransport *trans;
594
595   GST_INFO ("%p: source %p bye timeout", stream, source);
596
597   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
598     trans->rtpsource = NULL;
599     trans->timeout = TRUE;
600   }
601 }
602
603 static void
604 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
605 {
606   GstRTSPStreamTransport *trans;
607
608   GST_INFO ("%p: source %p timeout", stream, source);
609
610   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
611     trans->rtpsource = NULL;
612     trans->timeout = TRUE;
613   }
614 }
615
616 static GstFlowReturn
617 handle_new_sample (GstAppSink * sink, gpointer user_data)
618 {
619   GList *walk;
620   GstSample *sample;
621   GstBuffer *buffer;
622   GstRTSPStream *stream;
623
624   sample = gst_app_sink_pull_sample (sink);
625   if (!sample)
626     return GST_FLOW_OK;
627
628   stream = (GstRTSPStream *) user_data;
629   buffer = gst_sample_get_buffer (sample);
630
631   g_mutex_lock (&stream->lock);
632   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
633     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
634
635     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
636       gst_rtsp_stream_transport_send_rtp (tr, buffer);
637     } else {
638       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
639     }
640   }
641   g_mutex_unlock (&stream->lock);
642
643   gst_sample_unref (sample);
644
645   return GST_FLOW_OK;
646 }
647
648 static GstAppSinkCallbacks sink_cb = {
649   NULL,                         /* not interested in EOS */
650   NULL,                         /* not interested in preroll samples */
651   handle_new_sample,
652 };
653
654 /**
655  * gst_rtsp_stream_join_bin:
656  * @stream: a #GstRTSPStream
657  * @bin: a #GstBin to join
658  * @rtpbin: a rtpbin element in @bin
659  * @state: the target state of the new elements
660  *
661  * Join the #Gstbin @bin that contains the element @rtpbin.
662  *
663  * @stream will link to @rtpbin, which must be inside @bin. The elements
664  * added to @bin will be set to the state given in @state.
665  *
666  * Returns: %TRUE on success.
667  */
668 gboolean
669 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
670     GstElement * rtpbin, GstState state)
671 {
672   gint i, idx;
673   gchar *name;
674   GstPad *pad, *teepad, *queuepad, *selpad;
675   GstPadLinkReturn ret;
676
677   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
678   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
679   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
680
681   g_mutex_lock (&stream->lock);
682   if (stream->is_joined)
683     goto was_joined;
684
685   /* create a session with the same index as the stream */
686   idx = stream->idx;
687
688   GST_INFO ("stream %p joining bin as session %d", stream, idx);
689
690   if (!alloc_ports (stream))
691     goto no_ports;
692
693   /* get a pad for sending RTP */
694   name = g_strdup_printf ("send_rtp_sink_%u", idx);
695   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
696   g_free (name);
697   /* link the RTP pad to the session manager, it should not really fail unless
698    * this is not really an RTP pad */
699   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
700   if (ret != GST_PAD_LINK_OK)
701     goto link_failed;
702
703   /* get pads from the RTP session element for sending and receiving
704    * RTP/RTCP*/
705   name = g_strdup_printf ("send_rtp_src_%u", idx);
706   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
707   g_free (name);
708   name = g_strdup_printf ("send_rtcp_src_%u", idx);
709   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
710   g_free (name);
711   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
712   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
713   g_free (name);
714   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
715   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
716   g_free (name);
717
718   /* get the session */
719   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
720
721   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
722       stream);
723   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
724       stream);
725   g_signal_connect (stream->session, "on-ssrc-active",
726       (GCallback) on_ssrc_active, stream);
727   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
728       stream);
729   g_signal_connect (stream->session, "on-bye-timeout",
730       (GCallback) on_bye_timeout, stream);
731   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
732       stream);
733
734   for (i = 0; i < 2; i++) {
735     /* For the sender we create this bit of pipeline for both
736      * RTP and RTCP. Sync and preroll are enabled on udpsink so
737      * we need to add a queue before appsink to make the pipeline
738      * not block. For the TCP case, we want to pump data to the
739      * client as fast as possible anyway.
740      *
741      * .--------.      .-----.    .---------.
742      * | rtpbin |      | tee |    | udpsink |
743      * |       send->sink   src->sink       |
744      * '--------'      |     |    '---------'
745      *                 |     |    .---------.    .---------.
746      *                 |     |    |  queue  |    | appsink |
747      *                 |    src->sink      src->sink       |
748      *                 '-----'    '---------'    '---------'
749      */
750     /* make tee for RTP/RTCP */
751     stream->tee[i] = gst_element_factory_make ("tee", NULL);
752     gst_bin_add (bin, stream->tee[i]);
753
754     /* and link to rtpbin send pad */
755     pad = gst_element_get_static_pad (stream->tee[i], "sink");
756     gst_pad_link (stream->send_src[i], pad);
757     gst_object_unref (pad);
758
759     /* add udpsink */
760     gst_bin_add (bin, stream->udpsink[i]);
761
762     /* link tee to udpsink */
763     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
764     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
765     gst_pad_link (teepad, pad);
766     gst_object_unref (pad);
767     gst_object_unref (teepad);
768
769     /* make queue */
770     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
771     gst_bin_add (bin, stream->appqueue[i]);
772     /* and link to tee */
773     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
774     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
775     gst_pad_link (teepad, pad);
776     gst_object_unref (pad);
777     gst_object_unref (teepad);
778
779     /* make appsink */
780     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
781     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
782     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
783     gst_bin_add (bin, stream->appsink[i]);
784     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
785         &sink_cb, stream, NULL);
786     /* and link to queue */
787     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
788     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
789     gst_pad_link (queuepad, pad);
790     gst_object_unref (pad);
791     gst_object_unref (queuepad);
792
793     /* For the receiver we create this bit of pipeline for both
794      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
795      * and it is all funneled into the rtpbin receive pad.
796      *
797      * .--------.     .--------.    .--------.
798      * | udpsrc |     | funnel |    | rtpbin |
799      * |       src->sink      src->sink      |
800      * '--------'     |        |    '--------'
801      * .--------.     |        |
802      * | appsrc |     |        |
803      * |       src->sink       |
804      * '--------'     '--------'
805      */
806     /* make funnel for the RTP/RTCP receivers */
807     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
808     gst_bin_add (bin, stream->funnel[i]);
809
810     pad = gst_element_get_static_pad (stream->funnel[i], "src");
811     gst_pad_link (pad, stream->recv_sink[i]);
812     gst_object_unref (pad);
813
814     /* add udpsrc */
815     gst_bin_add (bin, stream->udpsrc[i]);
816     /* and link to the funnel */
817     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
818     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
819     gst_pad_link (pad, selpad);
820     gst_object_unref (pad);
821     gst_object_unref (selpad);
822
823     /* make and add appsrc */
824     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
825     gst_bin_add (bin, stream->appsrc[i]);
826     /* and link to the funnel */
827     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
828     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
829     gst_pad_link (pad, selpad);
830     gst_object_unref (pad);
831     gst_object_unref (selpad);
832
833     /* check if we need to set to a special state */
834     if (state != GST_STATE_NULL) {
835       gst_element_set_state (stream->udpsink[i], state);
836       gst_element_set_state (stream->appsink[i], state);
837       gst_element_set_state (stream->appqueue[i], state);
838       gst_element_set_state (stream->tee[i], state);
839       gst_element_set_state (stream->funnel[i], state);
840       gst_element_set_state (stream->appsrc[i], state);
841     }
842     /* we set and keep these to playing so that they don't cause NO_PREROLL return
843      * values */
844     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
845     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
846   }
847
848   /* be notified of caps changes */
849   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
850       (GCallback) caps_notify, stream);
851
852   stream->is_joined = TRUE;
853   g_mutex_unlock (&stream->lock);
854
855   return TRUE;
856
857   /* ERRORS */
858 was_joined:
859   {
860     g_mutex_unlock (&stream->lock);
861     return TRUE;
862   }
863 no_ports:
864   {
865     g_mutex_unlock (&stream->lock);
866     GST_WARNING ("failed to allocate ports %d", idx);
867     return FALSE;
868   }
869 link_failed:
870   {
871     GST_WARNING ("failed to link stream %d", idx);
872     gst_object_unref (stream->send_rtp_sink);
873     stream->send_rtp_sink = NULL;
874     g_mutex_unlock (&stream->lock);
875     return FALSE;
876   }
877 }
878
879 /**
880  * gst_rtsp_stream_leave_bin:
881  * @stream: a #GstRTSPStream
882  * @bin: a #GstBin
883  * @rtpbin: a rtpbin #GstElement
884  *
885  * Remove the elements of @stream from @bin. @bin must be set
886  * to the NULL state before calling this.
887  *
888  * Return: %TRUE on success.
889  */
890 gboolean
891 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
892     GstElement * rtpbin)
893 {
894   gint i;
895
896   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
897   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
898   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
899
900   g_mutex_lock (&stream->lock);
901   if (!stream->is_joined)
902     goto was_not_joined;
903
904   /* all transports must be removed by now */
905   g_return_val_if_fail (stream->transports == NULL, FALSE);
906
907   GST_INFO ("stream %p leaving bin", stream);
908
909   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
910   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
911   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
912   gst_object_unref (stream->send_rtp_sink);
913   stream->send_rtp_sink = NULL;
914
915   for (i = 0; i < 2; i++) {
916     /* and set udpsrc to NULL now before removing */
917     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
918     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
919
920     /* removing them should also nicely release the request
921      * pads when they finalize */
922     gst_bin_remove (bin, stream->udpsrc[i]);
923     gst_bin_remove (bin, stream->udpsink[i]);
924     gst_bin_remove (bin, stream->appsrc[i]);
925     gst_bin_remove (bin, stream->appsink[i]);
926     gst_bin_remove (bin, stream->appqueue[i]);
927     gst_bin_remove (bin, stream->tee[i]);
928     gst_bin_remove (bin, stream->funnel[i]);
929
930     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
931     gst_object_unref (stream->recv_sink[i]);
932     stream->recv_sink[i] = NULL;
933
934     stream->udpsrc[i] = NULL;
935     stream->udpsink[i] = NULL;
936     stream->appsrc[i] = NULL;
937     stream->appsink[i] = NULL;
938     stream->appqueue[i] = NULL;
939     stream->tee[i] = NULL;
940     stream->funnel[i] = NULL;
941   }
942   gst_object_unref (stream->send_src[0]);
943   stream->send_src[0] = NULL;
944
945   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
946   gst_object_unref (stream->send_src[1]);
947   stream->send_src[1] = NULL;
948
949   g_object_unref (stream->session);
950   if (stream->caps)
951     gst_caps_unref (stream->caps);
952
953   stream->is_joined = FALSE;
954   g_mutex_unlock (&stream->lock);
955
956   return TRUE;
957
958 was_not_joined:
959   {
960     return TRUE;
961   }
962 }
963
964 /**
965  * gst_rtsp_stream_get_rtpinfo:
966  * @stream: a #GstRTSPStream
967  * @rtptime: result RTP timestamp
968  * @seq: result RTP seqnum
969  *
970  * Retrieve the current rtptime and seq. This is used to
971  * construct a RTPInfo reply header.
972  *
973  * Returns: %TRUE when rtptime and seq could be determined.
974  */
975 gboolean
976 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
977     guint * rtptime, guint * seq)
978 {
979   GObjectClass *payobjclass;
980
981   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
982
983   if (!g_object_class_find_property (payobjclass, "seqnum") ||
984       !g_object_class_find_property (payobjclass, "timestamp"))
985     return FALSE;
986
987   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
988
989   return TRUE;
990 }
991
992 /**
993  * gst_rtsp_stream_recv_rtp:
994  * @stream: a #GstRTSPStream
995  * @buffer: (transfer full): a #GstBuffer
996  *
997  * Handle an RTP buffer for the stream. This method is usually called when a
998  * message has been received from a client using the TCP transport.
999  *
1000  * This function takes ownership of @buffer.
1001  *
1002  * Returns: a GstFlowReturn.
1003  */
1004 GstFlowReturn
1005 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1006 {
1007   GstFlowReturn ret;
1008   GstElement *element;
1009
1010   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1011   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1012   g_return_val_if_fail (stream->is_joined, FALSE);
1013
1014   g_mutex_lock (&stream->lock);
1015   element = gst_object_ref (stream->appsrc[0]);
1016   g_mutex_unlock (&stream->lock);
1017
1018   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1019
1020   gst_object_unref (element);
1021
1022   return ret;
1023 }
1024
1025 /**
1026  * gst_rtsp_stream_recv_rtcp:
1027  * @stream: a #GstRTSPStream
1028  * @buffer: (transfer full): a #GstBuffer
1029  *
1030  * Handle an RTCP buffer for the stream. This method is usually called when a
1031  * message has been received from a client using the TCP transport.
1032  *
1033  * This function takes ownership of @buffer.
1034  *
1035  * Returns: a GstFlowReturn.
1036  */
1037 GstFlowReturn
1038 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1039 {
1040   GstFlowReturn ret;
1041   GstElement *element;
1042
1043   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1044   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1045   g_return_val_if_fail (stream->is_joined, FALSE);
1046
1047   g_mutex_lock (&stream->lock);
1048   element = gst_object_ref (stream->appsrc[1]);
1049   g_mutex_unlock (&stream->lock);
1050
1051   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1052
1053   gst_object_unref (element);
1054
1055   return ret;
1056 }
1057
1058 /* must be called with lock */
1059 static gboolean
1060 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1061     gboolean add)
1062 {
1063   GstRTSPTransport *tr;
1064   gboolean updated;
1065
1066   updated = FALSE;
1067
1068   tr = trans->transport;
1069
1070   switch (tr->lower_transport) {
1071     case GST_RTSP_LOWER_TRANS_UDP:
1072     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1073     {
1074       gchar *dest;
1075       gint min, max;
1076       guint ttl = 0;
1077
1078       dest = tr->destination;
1079       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1080         min = tr->port.min;
1081         max = tr->port.max;
1082         ttl = tr->ttl;
1083       } else {
1084         min = tr->client_port.min;
1085         max = tr->client_port.max;
1086       }
1087
1088       if (add && !trans->active) {
1089         GST_INFO ("adding %s:%d-%d", dest, min, max);
1090         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1091         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1092         if (ttl > 0) {
1093           GST_INFO ("setting ttl-mc %d", ttl);
1094           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
1095           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
1096         }
1097         stream->transports = g_list_prepend (stream->transports, trans);
1098         trans->active = TRUE;
1099         updated = TRUE;
1100       } else if (trans->active) {
1101         GST_INFO ("removing %s:%d-%d", dest, min, max);
1102         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1103         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1104         stream->transports = g_list_remove (stream->transports, trans);
1105         trans->active = FALSE;
1106         updated = TRUE;
1107       }
1108       break;
1109     }
1110     case GST_RTSP_LOWER_TRANS_TCP:
1111       if (add && !trans->active) {
1112         GST_INFO ("adding TCP %s", tr->destination);
1113         stream->transports = g_list_prepend (stream->transports, trans);
1114         trans->active = TRUE;
1115         updated = TRUE;
1116       } else if (trans->active) {
1117         GST_INFO ("removing TCP %s", tr->destination);
1118         stream->transports = g_list_remove (stream->transports, trans);
1119         trans->active = FALSE;
1120         updated = TRUE;
1121       }
1122       break;
1123     default:
1124       GST_INFO ("Unknown transport %d", tr->lower_transport);
1125       break;
1126   }
1127   return updated;
1128 }
1129
1130
1131 /**
1132  * gst_rtsp_stream_add_transport:
1133  * @stream: a #GstRTSPStream
1134  * @trans: a #GstRTSPStreamTransport
1135  *
1136  * Add the transport in @trans to @stream. The media of @stream will
1137  * then also be send to the values configured in @trans.
1138  *
1139  * @stream must be joined to a bin.
1140  *
1141  * @trans must contain a valid #GstRTSPTransport.
1142  *
1143  * Returns: %TRUE if @trans was added
1144  */
1145 gboolean
1146 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1147     GstRTSPStreamTransport * trans)
1148 {
1149   gboolean res;
1150
1151   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1152   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1153   g_return_val_if_fail (stream->is_joined, FALSE);
1154   g_return_val_if_fail (trans->transport != NULL, FALSE);
1155
1156   g_mutex_lock (&stream->lock);
1157   res = update_transport (stream, trans, TRUE);
1158   g_mutex_unlock (&stream->lock);
1159
1160   return res;
1161 }
1162
1163 /**
1164  * gst_rtsp_stream_remove_transport:
1165  * @stream: a #GstRTSPStream
1166  * @trans: a #GstRTSPStreamTransport
1167  *
1168  * Remove the transport in @trans from @stream. The media of @stream will
1169  * not be sent to the values configured in @trans.
1170  *
1171  * @stream must be joined to a bin.
1172  *
1173  * @trans must contain a valid #GstRTSPTransport.
1174  *
1175  * Returns: %TRUE if @trans was removed
1176  */
1177 gboolean
1178 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1179     GstRTSPStreamTransport * trans)
1180 {
1181   gboolean res;
1182
1183   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1184   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1185   g_return_val_if_fail (stream->is_joined, FALSE);
1186   g_return_val_if_fail (trans->transport != NULL, FALSE);
1187
1188   g_mutex_lock (&stream->lock);
1189   res = update_transport (stream, trans, FALSE);
1190   g_mutex_unlock (&stream->lock);
1191
1192   return res;
1193 }