stream: add methods to deal with address pool
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * stream)
61 {
62   g_mutex_init (&stream->lock);
63 }
64
65 static void
66 gst_rtsp_stream_finalize (GObject * obj)
67 {
68   GstRTSPStream *stream;
69
70   stream = GST_RTSP_STREAM (obj);
71
72   /* we really need to be unjoined now */
73   g_return_if_fail (!stream->is_joined);
74
75   if (stream->addr)
76     gst_rtsp_address_free (stream->addr);
77   if (stream->pool)
78     g_object_unref (stream->pool);
79   gst_object_unref (stream->payloader);
80   gst_object_unref (stream->srcpad);
81   g_mutex_clear (&stream->lock);
82
83   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
84 }
85
86 /**
87  * gst_rtsp_stream_new:
88  * @idx: an index
89  * @srcpad: a #GstPad
90  * @payloader: a #GstElement
91  *
92  * Create a new media stream with index @idx that handles RTP data on
93  * @srcpad and has a payloader element @payloader.
94  *
95  * Returns: a new #GstRTSPStream
96  */
97 GstRTSPStream *
98 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
99 {
100   GstRTSPStream *stream;
101
102   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
103   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
104   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
105
106   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
107   stream->idx = idx;
108   stream->payloader = gst_object_ref (payloader);
109   stream->srcpad = gst_object_ref (srcpad);
110
111   return stream;
112 }
113
114 /**
115  * gst_rtsp_stream_set_mtu:
116  * @stream: a #GstRTSPStream
117  * @mtu: a new MTU
118  *
119  * Configure the mtu in the payloader of @stream to @mtu.
120  */
121 void
122 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
123 {
124   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
125
126   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
127 }
128
129 /**
130  * gst_rtsp_stream_get_mtu:
131  * @stream: a #GstRTSPStream
132  *
133  * Get the configured MTU in the payloader of @stream.
134  *
135  * Returns: the MTU of the payloader.
136  */
137 guint
138 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
139 {
140   guint mtu;
141
142   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
143
144   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
145
146   return mtu;
147 }
148
149 /**
150  * gst_rtsp_stream_set_address_pool:
151  * @stream: a #GstRTSPStream
152  * @pool: a #GstRTSPAddressPool
153  *
154  * configure @pool to be used as the address pool of @stream.
155  */
156 void
157 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
158     GstRTSPAddressPool * pool)
159 {
160   GstRTSPAddressPool *old;
161
162   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
163
164   g_mutex_lock (&stream->lock);
165   if ((old = stream->pool) != pool)
166     stream->pool = pool ? g_object_ref (pool) : NULL;
167   else
168     old = NULL;
169   g_mutex_unlock (&stream->lock);
170
171   if (old)
172     g_object_unref (old);
173 }
174
175 /**
176  * gst_rtsp_stream_get_address_pool:
177  * @stream: a #GstRTSPStream
178  *
179  * Get the #GstRTSPAddressPool used as the address pool of @stream.
180  *
181  * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
182  * usage.
183  */
184 GstRTSPAddressPool *
185 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
186 {
187   GstRTSPAddressPool *result;
188
189   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
190
191   g_mutex_lock (&stream->lock);
192   if ((result = stream->pool))
193     g_object_ref (result);
194   g_mutex_unlock (&stream->lock);
195
196   return result;
197 }
198
199 /**
200  * gst_rtsp_stream_get_address:
201  * @stream: a #GstRTSPStream
202  *
203  * Get the multicast address of @stream.
204  *
205  * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
206  * allocated. gst_rtsp_address_free() after usage.
207  */
208 GstRTSPAddress *
209 gst_rtsp_stream_get_address (GstRTSPStream * stream)
210 {
211   GstRTSPAddress *result;
212
213   g_mutex_lock (&stream->lock);
214   if (stream->addr == NULL) {
215     if (stream->pool == NULL)
216       goto no_pool;
217
218     stream->addr = gst_rtsp_address_pool_acquire_address (stream->pool,
219         GST_RTSP_ADDRESS_FLAG_EVEN_PORT, 2);
220     if (stream->addr == NULL)
221       goto no_address;
222   }
223   result = gst_rtsp_address_copy (stream->addr);
224   g_mutex_unlock (&stream->lock);
225
226   return result;
227
228   /* ERRORS */
229 no_pool:
230   {
231     GST_ERROR_OBJECT (stream, "no address pool specified");
232     g_mutex_unlock (&stream->lock);
233     return NULL;
234   }
235 no_address:
236   {
237     GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
238     g_mutex_unlock (&stream->lock);
239     return NULL;
240   }
241 }
242
243 /* must be called with lock */
244 static gboolean
245 alloc_ports (GstRTSPStream * stream)
246 {
247   GstStateChangeReturn ret;
248   GstElement *udpsrc0, *udpsrc1;
249   GstElement *udpsink0, *udpsink1;
250   gint tmp_rtp, tmp_rtcp;
251   guint count;
252   gint rtpport, rtcpport;
253   GSocket *socket;
254   const gchar *host;
255
256   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
257
258   udpsrc0 = NULL;
259   udpsrc1 = NULL;
260   udpsink0 = NULL;
261   udpsink1 = NULL;
262   count = 0;
263
264   /* Start with random port */
265   tmp_rtp = 0;
266
267   if (stream->is_ipv6)
268     host = "udp://[::0]";
269   else
270     host = "udp://0.0.0.0";
271
272   /* try to allocate 2 UDP ports, the RTP port should be an even
273    * number and the RTCP port should be the next (uneven) port */
274 again:
275   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
276   if (udpsrc0 == NULL)
277     goto no_udp_protocol;
278   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
279
280   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
281   if (ret == GST_STATE_CHANGE_FAILURE) {
282     if (tmp_rtp != 0) {
283       tmp_rtp += 2;
284       if (++count > 20)
285         goto no_ports;
286
287       gst_element_set_state (udpsrc0, GST_STATE_NULL);
288       gst_object_unref (udpsrc0);
289
290       goto again;
291     }
292     goto no_udp_protocol;
293   }
294
295   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
296
297   /* check if port is even */
298   if ((tmp_rtp & 1) != 0) {
299     /* port not even, close and allocate another */
300     if (++count > 20)
301       goto no_ports;
302
303     gst_element_set_state (udpsrc0, GST_STATE_NULL);
304     gst_object_unref (udpsrc0);
305
306     tmp_rtp++;
307     goto again;
308   }
309
310   /* allocate port+1 for RTCP now */
311   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
312   if (udpsrc1 == NULL)
313     goto no_udp_rtcp_protocol;
314
315   /* set port */
316   tmp_rtcp = tmp_rtp + 1;
317   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
318
319   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
320   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
321   if (ret == GST_STATE_CHANGE_FAILURE) {
322
323     if (++count > 20)
324       goto no_ports;
325
326     gst_element_set_state (udpsrc0, GST_STATE_NULL);
327     gst_object_unref (udpsrc0);
328
329     gst_element_set_state (udpsrc1, GST_STATE_NULL);
330     gst_object_unref (udpsrc1);
331
332     tmp_rtp += 2;
333     goto again;
334   }
335   /* all fine, do port check */
336   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
337   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
338
339   /* this should not happen... */
340   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
341     goto port_error;
342
343   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
344   if (!udpsink0)
345     goto no_udp_protocol;
346
347   g_object_get (G_OBJECT (udpsrc0), "used-socket", &socket, NULL);
348   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
349   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
350
351   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
352   if (!udpsink1)
353     goto no_udp_protocol;
354
355   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
356           "send-duplicates")) {
357     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
358     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
359   } else {
360     g_warning
361         ("old multiudpsink version found without send-duplicates property");
362   }
363
364   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
365           "buffer-size")) {
366     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
367         NULL);
368   } else {
369     GST_WARNING ("multiudpsink version found without buffer-size property");
370   }
371
372   g_object_get (G_OBJECT (udpsrc1), "used-socket", &socket, NULL);
373   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
374   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
375   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
376   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
377   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
378   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
379   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
380   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
381
382   /* we keep these elements, we will further configure them when the
383    * client told us to really use the UDP ports. */
384   stream->udpsrc[0] = udpsrc0;
385   stream->udpsrc[1] = udpsrc1;
386   stream->udpsink[0] = udpsink0;
387   stream->udpsink[1] = udpsink1;
388   stream->server_port.min = rtpport;
389   stream->server_port.max = rtcpport;
390
391   return TRUE;
392
393   /* ERRORS */
394 no_udp_protocol:
395   {
396     goto cleanup;
397   }
398 no_ports:
399   {
400     goto cleanup;
401   }
402 no_udp_rtcp_protocol:
403   {
404     goto cleanup;
405   }
406 port_error:
407   {
408     goto cleanup;
409   }
410 cleanup:
411   {
412     if (udpsrc0) {
413       gst_element_set_state (udpsrc0, GST_STATE_NULL);
414       gst_object_unref (udpsrc0);
415     }
416     if (udpsrc1) {
417       gst_element_set_state (udpsrc1, GST_STATE_NULL);
418       gst_object_unref (udpsrc1);
419     }
420     if (udpsink0) {
421       gst_element_set_state (udpsink0, GST_STATE_NULL);
422       gst_object_unref (udpsink0);
423     }
424     if (udpsink1) {
425       gst_element_set_state (udpsink1, GST_STATE_NULL);
426       gst_object_unref (udpsink1);
427     }
428     return FALSE;
429   }
430 }
431
432 /* executed from streaming thread */
433 static void
434 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
435 {
436   GstCaps *newcaps, *oldcaps;
437
438   newcaps = gst_pad_get_current_caps (pad);
439
440   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
441       newcaps);
442
443   g_mutex_lock (&stream->lock);
444   oldcaps = stream->caps;
445   stream->caps = newcaps;
446   g_mutex_unlock (&stream->lock);
447
448   if (oldcaps)
449     gst_caps_unref (oldcaps);
450 }
451
452 static void
453 dump_structure (const GstStructure * s)
454 {
455   gchar *sstr;
456
457   sstr = gst_structure_to_string (s);
458   GST_INFO ("structure: %s", sstr);
459   g_free (sstr);
460 }
461
462 static GstRTSPStreamTransport *
463 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
464 {
465   GList *walk;
466   GstRTSPStreamTransport *result = NULL;
467   const gchar *tmp;
468   gchar *dest;
469   guint port;
470
471   if (rtcp_from == NULL)
472     return NULL;
473
474   tmp = g_strrstr (rtcp_from, ":");
475   if (tmp == NULL)
476     return NULL;
477
478   port = atoi (tmp + 1);
479   dest = g_strndup (rtcp_from, tmp - rtcp_from);
480
481   g_mutex_lock (&stream->lock);
482   GST_INFO ("finding %s:%d in %d transports", dest, port,
483       g_list_length (stream->transports));
484
485   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
486     GstRTSPStreamTransport *trans = walk->data;
487     gint min, max;
488
489     min = trans->transport->client_port.min;
490     max = trans->transport->client_port.max;
491
492     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
493             || max == port)) {
494       result = trans;
495       break;
496     }
497   }
498   g_mutex_unlock (&stream->lock);
499
500   g_free (dest);
501
502   return result;
503 }
504
505 static GstRTSPStreamTransport *
506 check_transport (GObject * source, GstRTSPStream * stream)
507 {
508   GstStructure *stats;
509   GstRTSPStreamTransport *trans;
510
511   /* see if we have a stream to match with the origin of the RTCP packet */
512   trans = g_object_get_qdata (source, ssrc_stream_map_key);
513   if (trans == NULL) {
514     g_object_get (source, "stats", &stats, NULL);
515     if (stats) {
516       const gchar *rtcp_from;
517
518       dump_structure (stats);
519
520       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
521       if ((trans = find_transport (stream, rtcp_from))) {
522         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
523             source);
524
525         /* keep ref to the source */
526         trans->rtpsource = source;
527
528         g_object_set_qdata (source, ssrc_stream_map_key, trans);
529       }
530       gst_structure_free (stats);
531     }
532   }
533
534   return trans;
535 }
536
537
538 static void
539 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
540 {
541   GstRTSPStreamTransport *trans;
542
543   GST_INFO ("%p: new source %p", stream, source);
544
545   trans = check_transport (source, stream);
546
547   if (trans)
548     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
549 }
550
551 static void
552 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
553 {
554   GST_INFO ("%p: new SDES %p", stream, source);
555 }
556
557 static void
558 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
559 {
560   GstRTSPStreamTransport *trans;
561
562   trans = check_transport (source, stream);
563
564   if (trans) {
565     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
566     gst_rtsp_stream_transport_keep_alive (trans);
567   }
568 #ifdef DUMP_STATS
569   {
570     GstStructure *stats;
571     g_object_get (source, "stats", &stats, NULL);
572     if (stats) {
573       dump_structure (stats);
574       gst_structure_free (stats);
575     }
576   }
577 #endif
578 }
579
580 static void
581 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
582 {
583   GST_INFO ("%p: source %p bye", stream, source);
584 }
585
586 static void
587 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
588 {
589   GstRTSPStreamTransport *trans;
590
591   GST_INFO ("%p: source %p bye timeout", stream, source);
592
593   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
594     trans->rtpsource = NULL;
595     trans->timeout = TRUE;
596   }
597 }
598
599 static void
600 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
601 {
602   GstRTSPStreamTransport *trans;
603
604   GST_INFO ("%p: source %p timeout", stream, source);
605
606   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
607     trans->rtpsource = NULL;
608     trans->timeout = TRUE;
609   }
610 }
611
612 static GstFlowReturn
613 handle_new_sample (GstAppSink * sink, gpointer user_data)
614 {
615   GList *walk;
616   GstSample *sample;
617   GstBuffer *buffer;
618   GstRTSPStream *stream;
619
620   sample = gst_app_sink_pull_sample (sink);
621   if (!sample)
622     return GST_FLOW_OK;
623
624   stream = (GstRTSPStream *) user_data;
625   buffer = gst_sample_get_buffer (sample);
626
627   g_mutex_lock (&stream->lock);
628   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
629     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
630
631     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
632       gst_rtsp_stream_transport_send_rtp (tr, buffer);
633     } else {
634       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
635     }
636   }
637   g_mutex_unlock (&stream->lock);
638
639   gst_sample_unref (sample);
640
641   return GST_FLOW_OK;
642 }
643
644 static GstAppSinkCallbacks sink_cb = {
645   NULL,                         /* not interested in EOS */
646   NULL,                         /* not interested in preroll samples */
647   handle_new_sample,
648 };
649
650 /**
651  * gst_rtsp_stream_join_bin:
652  * @stream: a #GstRTSPStream
653  * @bin: a #GstBin to join
654  * @rtpbin: a rtpbin element in @bin
655  * @state: the target state of the new elements
656  *
657  * Join the #Gstbin @bin that contains the element @rtpbin.
658  *
659  * @stream will link to @rtpbin, which must be inside @bin. The elements
660  * added to @bin will be set to the state given in @state.
661  *
662  * Returns: %TRUE on success.
663  */
664 gboolean
665 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
666     GstElement * rtpbin, GstState state)
667 {
668   gint i, idx;
669   gchar *name;
670   GstPad *pad, *teepad, *queuepad, *selpad;
671   GstPadLinkReturn ret;
672
673   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
674   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
675   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
676
677   g_mutex_lock (&stream->lock);
678   if (stream->is_joined)
679     goto was_joined;
680
681   /* create a session with the same index as the stream */
682   idx = stream->idx;
683
684   GST_INFO ("stream %p joining bin as session %d", stream, idx);
685
686   if (!alloc_ports (stream))
687     goto no_ports;
688
689   /* get a pad for sending RTP */
690   name = g_strdup_printf ("send_rtp_sink_%u", idx);
691   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
692   g_free (name);
693   /* link the RTP pad to the session manager, it should not really fail unless
694    * this is not really an RTP pad */
695   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
696   if (ret != GST_PAD_LINK_OK)
697     goto link_failed;
698
699   /* get pads from the RTP session element for sending and receiving
700    * RTP/RTCP*/
701   name = g_strdup_printf ("send_rtp_src_%u", idx);
702   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
703   g_free (name);
704   name = g_strdup_printf ("send_rtcp_src_%u", idx);
705   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
706   g_free (name);
707   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
708   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
709   g_free (name);
710   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
711   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
712   g_free (name);
713
714   /* get the session */
715   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
716
717   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
718       stream);
719   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
720       stream);
721   g_signal_connect (stream->session, "on-ssrc-active",
722       (GCallback) on_ssrc_active, stream);
723   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
724       stream);
725   g_signal_connect (stream->session, "on-bye-timeout",
726       (GCallback) on_bye_timeout, stream);
727   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
728       stream);
729
730   for (i = 0; i < 2; i++) {
731     /* For the sender we create this bit of pipeline for both
732      * RTP and RTCP. Sync and preroll are enabled on udpsink so
733      * we need to add a queue before appsink to make the pipeline
734      * not block. For the TCP case, we want to pump data to the
735      * client as fast as possible anyway.
736      *
737      * .--------.      .-----.    .---------.
738      * | rtpbin |      | tee |    | udpsink |
739      * |       send->sink   src->sink       |
740      * '--------'      |     |    '---------'
741      *                 |     |    .---------.    .---------.
742      *                 |     |    |  queue  |    | appsink |
743      *                 |    src->sink      src->sink       |
744      *                 '-----'    '---------'    '---------'
745      */
746     /* make tee for RTP/RTCP */
747     stream->tee[i] = gst_element_factory_make ("tee", NULL);
748     gst_bin_add (bin, stream->tee[i]);
749
750     /* and link to rtpbin send pad */
751     pad = gst_element_get_static_pad (stream->tee[i], "sink");
752     gst_pad_link (stream->send_src[i], pad);
753     gst_object_unref (pad);
754
755     /* add udpsink */
756     gst_bin_add (bin, stream->udpsink[i]);
757
758     /* link tee to udpsink */
759     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
760     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
761     gst_pad_link (teepad, pad);
762     gst_object_unref (pad);
763     gst_object_unref (teepad);
764
765     /* make queue */
766     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
767     gst_bin_add (bin, stream->appqueue[i]);
768     /* and link to tee */
769     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
770     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
771     gst_pad_link (teepad, pad);
772     gst_object_unref (pad);
773     gst_object_unref (teepad);
774
775     /* make appsink */
776     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
777     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
778     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
779     gst_bin_add (bin, stream->appsink[i]);
780     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
781         &sink_cb, stream, NULL);
782     /* and link to queue */
783     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
784     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
785     gst_pad_link (queuepad, pad);
786     gst_object_unref (pad);
787     gst_object_unref (queuepad);
788
789     /* For the receiver we create this bit of pipeline for both
790      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
791      * and it is all funneled into the rtpbin receive pad.
792      *
793      * .--------.     .--------.    .--------.
794      * | udpsrc |     | funnel |    | rtpbin |
795      * |       src->sink      src->sink      |
796      * '--------'     |        |    '--------'
797      * .--------.     |        |
798      * | appsrc |     |        |
799      * |       src->sink       |
800      * '--------'     '--------'
801      */
802     /* make funnel for the RTP/RTCP receivers */
803     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
804     gst_bin_add (bin, stream->funnel[i]);
805
806     pad = gst_element_get_static_pad (stream->funnel[i], "src");
807     gst_pad_link (pad, stream->recv_sink[i]);
808     gst_object_unref (pad);
809
810     /* add udpsrc */
811     gst_bin_add (bin, stream->udpsrc[i]);
812     /* and link to the funnel */
813     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
814     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
815     gst_pad_link (pad, selpad);
816     gst_object_unref (pad);
817     gst_object_unref (selpad);
818
819     /* make and add appsrc */
820     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
821     gst_bin_add (bin, stream->appsrc[i]);
822     /* and link to the funnel */
823     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
824     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
825     gst_pad_link (pad, selpad);
826     gst_object_unref (pad);
827     gst_object_unref (selpad);
828
829     /* check if we need to set to a special state */
830     if (state != GST_STATE_NULL) {
831       gst_element_set_state (stream->udpsink[i], state);
832       gst_element_set_state (stream->appsink[i], state);
833       gst_element_set_state (stream->appqueue[i], state);
834       gst_element_set_state (stream->tee[i], state);
835       gst_element_set_state (stream->funnel[i], state);
836       gst_element_set_state (stream->appsrc[i], state);
837     }
838     /* we set and keep these to playing so that they don't cause NO_PREROLL return
839      * values */
840     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
841     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
842   }
843
844   /* be notified of caps changes */
845   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
846       (GCallback) caps_notify, stream);
847
848   stream->is_joined = TRUE;
849   g_mutex_unlock (&stream->lock);
850
851   return TRUE;
852
853   /* ERRORS */
854 was_joined:
855   {
856     g_mutex_unlock (&stream->lock);
857     return TRUE;
858   }
859 no_ports:
860   {
861     g_mutex_unlock (&stream->lock);
862     GST_WARNING ("failed to allocate ports %d", idx);
863     return FALSE;
864   }
865 link_failed:
866   {
867     GST_WARNING ("failed to link stream %d", idx);
868     gst_object_unref (stream->send_rtp_sink);
869     stream->send_rtp_sink = NULL;
870     g_mutex_unlock (&stream->lock);
871     return FALSE;
872   }
873 }
874
875 /**
876  * gst_rtsp_stream_leave_bin:
877  * @stream: a #GstRTSPStream
878  * @bin: a #GstBin
879  * @rtpbin: a rtpbin #GstElement
880  *
881  * Remove the elements of @stream from @bin. @bin must be set
882  * to the NULL state before calling this.
883  *
884  * Return: %TRUE on success.
885  */
886 gboolean
887 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
888     GstElement * rtpbin)
889 {
890   gint i;
891
892   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
893   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
894   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
895
896   g_mutex_lock (&stream->lock);
897   if (!stream->is_joined)
898     goto was_not_joined;
899
900   /* all transports must be removed by now */
901   g_return_val_if_fail (stream->transports == NULL, FALSE);
902
903   GST_INFO ("stream %p leaving bin", stream);
904
905   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
906   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
907   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
908   gst_object_unref (stream->send_rtp_sink);
909   stream->send_rtp_sink = NULL;
910
911   for (i = 0; i < 2; i++) {
912     /* and set udpsrc to NULL now before removing */
913     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
914     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
915
916     /* removing them should also nicely release the request
917      * pads when they finalize */
918     gst_bin_remove (bin, stream->udpsrc[i]);
919     gst_bin_remove (bin, stream->udpsink[i]);
920     gst_bin_remove (bin, stream->appsrc[i]);
921     gst_bin_remove (bin, stream->appsink[i]);
922     gst_bin_remove (bin, stream->appqueue[i]);
923     gst_bin_remove (bin, stream->tee[i]);
924     gst_bin_remove (bin, stream->funnel[i]);
925
926     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
927     gst_object_unref (stream->recv_sink[i]);
928     stream->recv_sink[i] = NULL;
929
930     stream->udpsrc[i] = NULL;
931     stream->udpsink[i] = NULL;
932     stream->appsrc[i] = NULL;
933     stream->appsink[i] = NULL;
934     stream->appqueue[i] = NULL;
935     stream->tee[i] = NULL;
936     stream->funnel[i] = NULL;
937   }
938   gst_object_unref (stream->send_src[0]);
939   stream->send_src[0] = NULL;
940
941   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
942   gst_object_unref (stream->send_src[1]);
943   stream->send_src[1] = NULL;
944
945   g_object_unref (stream->session);
946   if (stream->caps)
947     gst_caps_unref (stream->caps);
948
949   stream->is_joined = FALSE;
950   g_mutex_unlock (&stream->lock);
951
952   return TRUE;
953
954 was_not_joined:
955   {
956     return TRUE;
957   }
958 }
959
960 /**
961  * gst_rtsp_stream_get_rtpinfo:
962  * @stream: a #GstRTSPStream
963  * @rtptime: result RTP timestamp
964  * @seq: result RTP seqnum
965  *
966  * Retrieve the current rtptime and seq. This is used to
967  * construct a RTPInfo reply header.
968  *
969  * Returns: %TRUE when rtptime and seq could be determined.
970  */
971 gboolean
972 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
973     guint * rtptime, guint * seq)
974 {
975   GObjectClass *payobjclass;
976
977   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
978
979   if (!g_object_class_find_property (payobjclass, "seqnum") ||
980       !g_object_class_find_property (payobjclass, "timestamp"))
981     return FALSE;
982
983   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
984
985   return TRUE;
986 }
987
988 /**
989  * gst_rtsp_stream_recv_rtp:
990  * @stream: a #GstRTSPStream
991  * @buffer: (transfer full): a #GstBuffer
992  *
993  * Handle an RTP buffer for the stream. This method is usually called when a
994  * message has been received from a client using the TCP transport.
995  *
996  * This function takes ownership of @buffer.
997  *
998  * Returns: a GstFlowReturn.
999  */
1000 GstFlowReturn
1001 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1002 {
1003   GstFlowReturn ret;
1004   GstElement *element;
1005
1006   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1007   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1008   g_return_val_if_fail (stream->is_joined, FALSE);
1009
1010   g_mutex_lock (&stream->lock);
1011   element = gst_object_ref (stream->appsrc[0]);
1012   g_mutex_unlock (&stream->lock);
1013
1014   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1015
1016   gst_object_unref (element);
1017
1018   return ret;
1019 }
1020
1021 /**
1022  * gst_rtsp_stream_recv_rtcp:
1023  * @stream: a #GstRTSPStream
1024  * @buffer: (transfer full): a #GstBuffer
1025  *
1026  * Handle an RTCP buffer for the stream. This method is usually called when a
1027  * message has been received from a client using the TCP transport.
1028  *
1029  * This function takes ownership of @buffer.
1030  *
1031  * Returns: a GstFlowReturn.
1032  */
1033 GstFlowReturn
1034 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1035 {
1036   GstFlowReturn ret;
1037   GstElement *element;
1038
1039   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1040   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1041   g_return_val_if_fail (stream->is_joined, FALSE);
1042
1043   g_mutex_lock (&stream->lock);
1044   element = gst_object_ref (stream->appsrc[1]);
1045   g_mutex_unlock (&stream->lock);
1046
1047   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1048
1049   gst_object_unref (element);
1050
1051   return ret;
1052 }
1053
1054 /* must be called with lock */
1055 static gboolean
1056 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1057     gboolean add)
1058 {
1059   GstRTSPTransport *tr;
1060   gboolean updated;
1061
1062   updated = FALSE;
1063
1064   tr = trans->transport;
1065
1066   switch (tr->lower_transport) {
1067     case GST_RTSP_LOWER_TRANS_UDP:
1068     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1069     {
1070       gchar *dest;
1071       gint min, max;
1072       guint ttl = 0;
1073
1074       dest = tr->destination;
1075       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1076         min = tr->port.min;
1077         max = tr->port.max;
1078         ttl = tr->ttl;
1079       } else {
1080         min = tr->client_port.min;
1081         max = tr->client_port.max;
1082       }
1083
1084       if (add && !trans->active) {
1085         GST_INFO ("adding %s:%d-%d", dest, min, max);
1086         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1087         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1088         if (ttl > 0) {
1089           GST_INFO ("setting ttl-mc %d", ttl);
1090           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
1091           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
1092         }
1093         stream->transports = g_list_prepend (stream->transports, trans);
1094         trans->active = TRUE;
1095         updated = TRUE;
1096       } else if (trans->active) {
1097         GST_INFO ("removing %s:%d-%d", dest, min, max);
1098         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1099         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1100         stream->transports = g_list_remove (stream->transports, trans);
1101         trans->active = FALSE;
1102         updated = TRUE;
1103       }
1104       break;
1105     }
1106     case GST_RTSP_LOWER_TRANS_TCP:
1107       if (add && !trans->active) {
1108         GST_INFO ("adding TCP %s", tr->destination);
1109         stream->transports = g_list_prepend (stream->transports, trans);
1110         trans->active = TRUE;
1111         updated = TRUE;
1112       } else if (trans->active) {
1113         GST_INFO ("removing TCP %s", tr->destination);
1114         stream->transports = g_list_remove (stream->transports, trans);
1115         trans->active = FALSE;
1116         updated = TRUE;
1117       }
1118       break;
1119     default:
1120       GST_INFO ("Unknown transport %d", tr->lower_transport);
1121       break;
1122   }
1123   return updated;
1124 }
1125
1126
1127 /**
1128  * gst_rtsp_stream_add_transport:
1129  * @stream: a #GstRTSPStream
1130  * @trans: a #GstRTSPStreamTransport
1131  *
1132  * Add the transport in @trans to @stream. The media of @stream will
1133  * then also be send to the values configured in @trans.
1134  *
1135  * @stream must be joined to a bin.
1136  *
1137  * @trans must contain a valid #GstRTSPTransport.
1138  *
1139  * Returns: %TRUE if @trans was added
1140  */
1141 gboolean
1142 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1143     GstRTSPStreamTransport * trans)
1144 {
1145   gboolean res;
1146
1147   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1148   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1149   g_return_val_if_fail (stream->is_joined, FALSE);
1150   g_return_val_if_fail (trans->transport != NULL, FALSE);
1151
1152   g_mutex_lock (&stream->lock);
1153   res = update_transport (stream, trans, TRUE);
1154   g_mutex_unlock (&stream->lock);
1155
1156   return res;
1157 }
1158
1159 /**
1160  * gst_rtsp_stream_remove_transport:
1161  * @stream: a #GstRTSPStream
1162  * @trans: a #GstRTSPStreamTransport
1163  *
1164  * Remove the transport in @trans from @stream. The media of @stream will
1165  * not be sent to the values configured in @trans.
1166  *
1167  * @stream must be joined to a bin.
1168  *
1169  * @trans must contain a valid #GstRTSPTransport.
1170  *
1171  * Returns: %TRUE if @trans was removed
1172  */
1173 gboolean
1174 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1175     GstRTSPStreamTransport * trans)
1176 {
1177   gboolean res;
1178
1179   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1180   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1181   g_return_val_if_fail (stream->is_joined, FALSE);
1182   g_return_val_if_fail (trans->transport != NULL, FALSE);
1183
1184   g_mutex_lock (&stream->lock);
1185   res = update_transport (stream, trans, FALSE);
1186   g_mutex_unlock (&stream->lock);
1187
1188   return res;
1189 }