stream-transport: add method to handle RTP/RTCP
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * media)
61 {
62 }
63
64 static void
65 gst_rtsp_stream_finalize (GObject * obj)
66 {
67   GstRTSPStream *stream;
68
69   stream = GST_RTSP_STREAM (obj);
70
71   /* we really need to be unjoined now */
72   g_return_if_fail (!stream->is_joined);
73
74   gst_object_unref (stream->payloader);
75   gst_object_unref (stream->srcpad);
76
77   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
78 }
79
80 /**
81  * gst_rtsp_stream_new:
82  * @idx: an index
83  * @srcpad: a #GstPad
84  * @payloader: a #GstElement
85  *
86  * Create a new media stream with index @idx that handles RTP data on
87  * @srcpad and has a payloader element @payloader.
88  *
89  * Returns: a new #GstRTSPStream
90  */
91 GstRTSPStream *
92 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
93 {
94   GstRTSPStream *stream;
95
96   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
97   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
98   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
99
100   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
101   stream->idx = idx;
102   stream->payloader = gst_object_ref (payloader);
103   stream->srcpad = gst_object_ref (srcpad);
104
105   return stream;
106 }
107
108 /**
109  * gst_rtsp_stream_set_mtu:
110  * @stream: a #GstRTSPStream
111  * @mtu: a new MTU
112  *
113  * Configure the mtu in the payloader of @stream to @mtu.
114  */
115 void
116 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
117 {
118   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
119
120   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
121 }
122
123 /**
124  * gst_rtsp_stream_get_mtu:
125  * @stream: a #GstRTSPStream
126  *
127  * Get the configured MTU in the payloader of @stream.
128  *
129  * Returns: the MTU of the payloader.
130  */
131 guint
132 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
133 {
134   guint mtu;
135
136   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
137
138   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
139
140   return mtu;
141 }
142
143 static gboolean
144 alloc_ports (GstRTSPStream * stream)
145 {
146   GstStateChangeReturn ret;
147   GstElement *udpsrc0, *udpsrc1;
148   GstElement *udpsink0, *udpsink1;
149   gint tmp_rtp, tmp_rtcp;
150   guint count;
151   gint rtpport, rtcpport;
152   GSocket *socket;
153   const gchar *host;
154
155   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
156
157   udpsrc0 = NULL;
158   udpsrc1 = NULL;
159   udpsink0 = NULL;
160   udpsink1 = NULL;
161   count = 0;
162
163   /* Start with random port */
164   tmp_rtp = 0;
165
166   if (stream->is_ipv6)
167     host = "udp://[::0]";
168   else
169     host = "udp://0.0.0.0";
170
171   /* try to allocate 2 UDP ports, the RTP port should be an even
172    * number and the RTCP port should be the next (uneven) port */
173 again:
174   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
175   if (udpsrc0 == NULL)
176     goto no_udp_protocol;
177   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
178
179   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
180   if (ret == GST_STATE_CHANGE_FAILURE) {
181     if (tmp_rtp != 0) {
182       tmp_rtp += 2;
183       if (++count > 20)
184         goto no_ports;
185
186       gst_element_set_state (udpsrc0, GST_STATE_NULL);
187       gst_object_unref (udpsrc0);
188
189       goto again;
190     }
191     goto no_udp_protocol;
192   }
193
194   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
195
196   /* check if port is even */
197   if ((tmp_rtp & 1) != 0) {
198     /* port not even, close and allocate another */
199     if (++count > 20)
200       goto no_ports;
201
202     gst_element_set_state (udpsrc0, GST_STATE_NULL);
203     gst_object_unref (udpsrc0);
204
205     tmp_rtp++;
206     goto again;
207   }
208
209   /* allocate port+1 for RTCP now */
210   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
211   if (udpsrc1 == NULL)
212     goto no_udp_rtcp_protocol;
213
214   /* set port */
215   tmp_rtcp = tmp_rtp + 1;
216   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
217
218   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
219   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
220   if (ret == GST_STATE_CHANGE_FAILURE) {
221
222     if (++count > 20)
223       goto no_ports;
224
225     gst_element_set_state (udpsrc0, GST_STATE_NULL);
226     gst_object_unref (udpsrc0);
227
228     gst_element_set_state (udpsrc1, GST_STATE_NULL);
229     gst_object_unref (udpsrc1);
230
231     tmp_rtp += 2;
232     goto again;
233   }
234   /* all fine, do port check */
235   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
236   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
237
238   /* this should not happen... */
239   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
240     goto port_error;
241
242   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
243   if (!udpsink0)
244     goto no_udp_protocol;
245
246   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
247   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
248   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
249
250   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
251   if (!udpsink1)
252     goto no_udp_protocol;
253
254   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
255           "send-duplicates")) {
256     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
257     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
258   } else {
259     g_warning
260         ("old multiudpsink version found without send-duplicates property");
261   }
262
263   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
264           "buffer-size")) {
265     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
266         NULL);
267   } else {
268     GST_WARNING ("multiudpsink version found without buffer-size property");
269   }
270
271   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
272   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
273   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
274   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
275   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
276   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
277   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
278   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
279   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
280
281   /* we keep these elements, we will further configure them when the
282    * client told us to really use the UDP ports. */
283   stream->udpsrc[0] = udpsrc0;
284   stream->udpsrc[1] = udpsrc1;
285   stream->udpsink[0] = udpsink0;
286   stream->udpsink[1] = udpsink1;
287   stream->server_port.min = rtpport;
288   stream->server_port.max = rtcpport;
289
290   return TRUE;
291
292   /* ERRORS */
293 no_udp_protocol:
294   {
295     goto cleanup;
296   }
297 no_ports:
298   {
299     goto cleanup;
300   }
301 no_udp_rtcp_protocol:
302   {
303     goto cleanup;
304   }
305 port_error:
306   {
307     goto cleanup;
308   }
309 cleanup:
310   {
311     if (udpsrc0) {
312       gst_element_set_state (udpsrc0, GST_STATE_NULL);
313       gst_object_unref (udpsrc0);
314     }
315     if (udpsrc1) {
316       gst_element_set_state (udpsrc1, GST_STATE_NULL);
317       gst_object_unref (udpsrc1);
318     }
319     if (udpsink0) {
320       gst_element_set_state (udpsink0, GST_STATE_NULL);
321       gst_object_unref (udpsink0);
322     }
323     if (udpsink1) {
324       gst_element_set_state (udpsink1, GST_STATE_NULL);
325       gst_object_unref (udpsink1);
326     }
327     return FALSE;
328   }
329 }
330
331 /* executed from streaming thread */
332 static void
333 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
334 {
335   GstCaps *newcaps, *oldcaps;
336
337   newcaps = gst_pad_get_current_caps (pad);
338
339   oldcaps = stream->caps;
340   stream->caps = newcaps;
341
342   if (oldcaps)
343     gst_caps_unref (oldcaps);
344
345   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
346       newcaps);
347 }
348
349 static void
350 dump_structure (const GstStructure * s)
351 {
352   gchar *sstr;
353
354   sstr = gst_structure_to_string (s);
355   GST_INFO ("structure: %s", sstr);
356   g_free (sstr);
357 }
358
359 static GstRTSPStreamTransport *
360 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
361 {
362   GList *walk;
363   GstRTSPStreamTransport *result = NULL;
364   const gchar *tmp;
365   gchar *dest;
366   guint port;
367
368   if (rtcp_from == NULL)
369     return NULL;
370
371   tmp = g_strrstr (rtcp_from, ":");
372   if (tmp == NULL)
373     return NULL;
374
375   port = atoi (tmp + 1);
376   dest = g_strndup (rtcp_from, tmp - rtcp_from);
377
378   GST_INFO ("finding %s:%d in %d transports", dest, port,
379       g_list_length (stream->transports));
380
381   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
382     GstRTSPStreamTransport *trans = walk->data;
383     gint min, max;
384
385     min = trans->transport->client_port.min;
386     max = trans->transport->client_port.max;
387
388     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
389             || max == port)) {
390       result = trans;
391       break;
392     }
393   }
394   g_free (dest);
395
396   return result;
397 }
398
399 static GstRTSPStreamTransport *
400 check_transport (GObject * source, GstRTSPStream * stream)
401 {
402   GstStructure *stats;
403   GstRTSPStreamTransport *trans;
404
405   /* see if we have a stream to match with the origin of the RTCP packet */
406   trans = g_object_get_qdata (source, ssrc_stream_map_key);
407   if (trans == NULL) {
408     g_object_get (source, "stats", &stats, NULL);
409     if (stats) {
410       const gchar *rtcp_from;
411
412       dump_structure (stats);
413
414       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
415       if ((trans = find_transport (stream, rtcp_from))) {
416         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
417             source);
418
419         /* keep ref to the source */
420         trans->rtpsource = source;
421
422         g_object_set_qdata (source, ssrc_stream_map_key, trans);
423       }
424       gst_structure_free (stats);
425     }
426   }
427
428   return trans;
429 }
430
431
432 static void
433 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
434 {
435   GstRTSPStreamTransport *trans;
436
437   GST_INFO ("%p: new source %p", stream, source);
438
439   trans = check_transport (source, stream);
440
441   if (trans)
442     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
443 }
444
445 static void
446 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
447 {
448   GST_INFO ("%p: new SDES %p", stream, source);
449 }
450
451 static void
452 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
453 {
454   GstRTSPStreamTransport *trans;
455
456   trans = check_transport (source, stream);
457
458   if (trans)
459     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
460
461   if (trans && trans->keep_alive)
462     trans->keep_alive (trans->ka_user_data);
463
464 #ifdef DUMP_STATS
465   {
466     GstStructure *stats;
467     g_object_get (source, "stats", &stats, NULL);
468     if (stats) {
469       dump_structure (stats);
470       gst_structure_free (stats);
471     }
472   }
473 #endif
474 }
475
476 static void
477 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
478 {
479   GST_INFO ("%p: source %p bye", stream, source);
480 }
481
482 static void
483 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
484 {
485   GstRTSPStreamTransport *trans;
486
487   GST_INFO ("%p: source %p bye timeout", stream, source);
488
489   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
490     trans->rtpsource = NULL;
491     trans->timeout = TRUE;
492   }
493 }
494
495 static void
496 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
497 {
498   GstRTSPStreamTransport *trans;
499
500   GST_INFO ("%p: source %p timeout", stream, source);
501
502   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
503     trans->rtpsource = NULL;
504     trans->timeout = TRUE;
505   }
506 }
507
508 static GstFlowReturn
509 handle_new_sample (GstAppSink * sink, gpointer user_data)
510 {
511   GList *walk;
512   GstSample *sample;
513   GstBuffer *buffer;
514   GstRTSPStream *stream;
515
516   sample = gst_app_sink_pull_sample (sink);
517   if (!sample)
518     return GST_FLOW_OK;
519
520   stream = (GstRTSPStream *) user_data;
521   buffer = gst_sample_get_buffer (sample);
522
523   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
524     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
525
526     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
527       gst_rtsp_stream_transport_send_rtp (tr, buffer);
528     } else {
529       gst_rtsp_stream_transport_send_rtcp (tr, buffer);
530     }
531   }
532   gst_sample_unref (sample);
533
534   return GST_FLOW_OK;
535 }
536
537 static GstAppSinkCallbacks sink_cb = {
538   NULL,                         /* not interested in EOS */
539   NULL,                         /* not interested in preroll samples */
540   handle_new_sample,
541 };
542
543 /**
544  * gst_rtsp_stream_join_bin:
545  * @stream: a #GstRTSPStream
546  * @bin: a #GstBin to join
547  * @rtpbin: a rtpbin element in @bin
548  * @state: the target state of the new elements
549  *
550  * Join the #Gstbin @bin that contains the element @rtpbin.
551  *
552  * @stream will link to @rtpbin, which must be inside @bin. The elements
553  * added to @bin will be set to the state given in @state.
554  *
555  * Returns: %TRUE on success.
556  */
557 gboolean
558 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
559     GstElement * rtpbin, GstState state)
560 {
561   gint i, idx;
562   gchar *name;
563   GstPad *pad, *teepad, *queuepad, *selpad;
564   GstPadLinkReturn ret;
565
566   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
567   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
568   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
569
570   if (stream->is_joined)
571     return TRUE;
572
573   /* create a session with the same index as the stream */
574   idx = stream->idx;
575
576   GST_INFO ("stream %p joining bin as session %d", stream, idx);
577
578   if (!alloc_ports (stream))
579     goto no_ports;
580
581   /* get a pad for sending RTP */
582   name = g_strdup_printf ("send_rtp_sink_%u", idx);
583   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
584   g_free (name);
585   /* link the RTP pad to the session manager, it should not really fail unless
586    * this is not really an RTP pad */
587   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
588   if (ret != GST_PAD_LINK_OK)
589     goto link_failed;
590
591   /* get pads from the RTP session element for sending and receiving
592    * RTP/RTCP*/
593   name = g_strdup_printf ("send_rtp_src_%u", idx);
594   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
595   g_free (name);
596   name = g_strdup_printf ("send_rtcp_src_%u", idx);
597   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
598   g_free (name);
599   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
600   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
601   g_free (name);
602   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
603   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
604   g_free (name);
605
606   /* get the session */
607   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
608
609   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
610       stream);
611   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
612       stream);
613   g_signal_connect (stream->session, "on-ssrc-active",
614       (GCallback) on_ssrc_active, stream);
615   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
616       stream);
617   g_signal_connect (stream->session, "on-bye-timeout",
618       (GCallback) on_bye_timeout, stream);
619   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
620       stream);
621
622   for (i = 0; i < 2; i++) {
623     /* For the sender we create this bit of pipeline for both
624      * RTP and RTCP. Sync and preroll are enabled on udpsink so
625      * we need to add a queue before appsink to make the pipeline
626      * not block. For the TCP case, we want to pump data to the
627      * client as fast as possible anyway.
628      *
629      * .--------.      .-----.    .---------.
630      * | rtpbin |      | tee |    | udpsink |
631      * |       send->sink   src->sink       |
632      * '--------'      |     |    '---------'
633      *                 |     |    .---------.    .---------.
634      *                 |     |    |  queue  |    | appsink |
635      *                 |    src->sink      src->sink       |
636      *                 '-----'    '---------'    '---------'
637      */
638     /* make tee for RTP/RTCP */
639     stream->tee[i] = gst_element_factory_make ("tee", NULL);
640     gst_bin_add (bin, stream->tee[i]);
641
642     /* and link to rtpbin send pad */
643     pad = gst_element_get_static_pad (stream->tee[i], "sink");
644     gst_pad_link (stream->send_src[i], pad);
645     gst_object_unref (pad);
646
647     /* add udpsink */
648     gst_bin_add (bin, stream->udpsink[i]);
649
650     /* link tee to udpsink */
651     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
652     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
653     gst_pad_link (teepad, pad);
654     gst_object_unref (pad);
655     gst_object_unref (teepad);
656
657     /* make queue */
658     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
659     gst_bin_add (bin, stream->appqueue[i]);
660     /* and link to tee */
661     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
662     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
663     gst_pad_link (teepad, pad);
664     gst_object_unref (pad);
665     gst_object_unref (teepad);
666
667     /* make appsink */
668     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
669     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
670     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
671     gst_bin_add (bin, stream->appsink[i]);
672     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
673         &sink_cb, stream, NULL);
674     /* and link to queue */
675     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
676     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
677     gst_pad_link (queuepad, pad);
678     gst_object_unref (pad);
679     gst_object_unref (queuepad);
680
681     /* For the receiver we create this bit of pipeline for both
682      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
683      * and it is all funneled into the rtpbin receive pad.
684      *
685      * .--------.     .--------.    .--------.
686      * | udpsrc |     | funnel |    | rtpbin |
687      * |       src->sink      src->sink      |
688      * '--------'     |        |    '--------'
689      * .--------.     |        |
690      * | appsrc |     |        |
691      * |       src->sink       |
692      * '--------'     '--------'
693      */
694     /* make funnel for the RTP/RTCP receivers */
695     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
696     gst_bin_add (bin, stream->funnel[i]);
697
698     pad = gst_element_get_static_pad (stream->funnel[i], "src");
699     gst_pad_link (pad, stream->recv_sink[i]);
700     gst_object_unref (pad);
701
702     /* add udpsrc */
703     gst_bin_add (bin, stream->udpsrc[i]);
704     /* and link to the funnel */
705     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
706     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
707     gst_pad_link (pad, selpad);
708     gst_object_unref (pad);
709     gst_object_unref (selpad);
710
711     /* make and add appsrc */
712     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
713     gst_bin_add (bin, stream->appsrc[i]);
714     /* and link to the funnel */
715     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
716     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
717     gst_pad_link (pad, selpad);
718     gst_object_unref (pad);
719     gst_object_unref (selpad);
720
721     /* check if we need to set to a special state */
722     if (state != GST_STATE_NULL) {
723       gst_element_set_state (stream->udpsink[i], state);
724       gst_element_set_state (stream->appsink[i], state);
725       gst_element_set_state (stream->appqueue[i], state);
726       gst_element_set_state (stream->tee[i], state);
727       gst_element_set_state (stream->funnel[i], state);
728       gst_element_set_state (stream->appsrc[i], state);
729     }
730     /* we set and keep these to playing so that they don't cause NO_PREROLL return
731      * values */
732     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
733     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
734   }
735
736   /* be notified of caps changes */
737   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
738       (GCallback) caps_notify, stream);
739
740   stream->is_joined = TRUE;
741
742   return TRUE;
743
744   /* ERRORS */
745 no_ports:
746   {
747     GST_WARNING ("failed to allocate ports %d", idx);
748     return FALSE;
749   }
750 link_failed:
751   {
752     GST_WARNING ("failed to link stream %d", idx);
753     gst_object_unref (stream->send_rtp_sink);
754     stream->send_rtp_sink = NULL;
755     return FALSE;
756   }
757 }
758
759 /**
760  * gst_rtsp_stream_leave_bin:
761  * @stream: a #GstRTSPStream
762  * @bin: a #GstBin
763  * @rtpbin: a rtpbin #GstElement
764  *
765  * Remove the elements of @stream from @bin. @bin must be set
766  * to the NULL state before calling this.
767  *
768  * Return: %TRUE on success.
769  */
770 gboolean
771 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
772     GstElement * rtpbin)
773 {
774   gint i;
775
776   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
777   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
778   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
779
780   if (!stream->is_joined)
781     return TRUE;
782
783   /* all transports must be removed by now */
784   g_return_val_if_fail (stream->transports == NULL, FALSE);
785
786   GST_INFO ("stream %p leaving bin", stream);
787
788   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
789   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
790   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
791   gst_object_unref (stream->send_rtp_sink);
792   stream->send_rtp_sink = NULL;
793
794   for (i = 0; i < 2; i++) {
795     /* and set udpsrc to NULL now before removing */
796     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
797     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
798
799     /* removing them should also nicely release the request
800      * pads when they finalize */
801     gst_bin_remove (bin, stream->udpsrc[i]);
802     gst_bin_remove (bin, stream->udpsink[i]);
803     gst_bin_remove (bin, stream->appsrc[i]);
804     gst_bin_remove (bin, stream->appsink[i]);
805     gst_bin_remove (bin, stream->appqueue[i]);
806     gst_bin_remove (bin, stream->tee[i]);
807     gst_bin_remove (bin, stream->funnel[i]);
808
809     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
810     gst_object_unref (stream->recv_sink[i]);
811     stream->recv_sink[i] = NULL;
812
813     stream->udpsrc[i] = NULL;
814     stream->udpsink[i] = NULL;
815     stream->appsrc[i] = NULL;
816     stream->appsink[i] = NULL;
817     stream->appqueue[i] = NULL;
818     stream->tee[i] = NULL;
819     stream->funnel[i] = NULL;
820   }
821   gst_object_unref (stream->send_src[0]);
822   stream->send_src[0] = NULL;
823
824   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
825   gst_object_unref (stream->send_src[1]);
826   stream->send_src[1] = NULL;
827
828   g_object_unref (stream->session);
829   if (stream->caps)
830     gst_caps_unref (stream->caps);
831
832   stream->is_joined = FALSE;
833
834   return TRUE;
835 }
836
837 /**
838  * gst_rtsp_stream_get_rtpinfo:
839  * @stream: a #GstRTSPStream
840  * @rtptime: result RTP timestamp
841  * @seq: result RTP seqnum
842  *
843  * Retrieve the current rtptime and seq. This is used to
844  * construct a RTPInfo reply header.
845  *
846  * Returns: %TRUE when rtptime and seq could be determined.
847  */
848 gboolean
849 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
850     guint * rtptime, guint * seq)
851 {
852   GObjectClass *payobjclass;
853
854   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
855
856   if (!g_object_class_find_property (payobjclass, "seqnum") ||
857       !g_object_class_find_property (payobjclass, "timestamp"))
858     return FALSE;
859
860   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
861
862   return TRUE;
863 }
864
865 /**
866  * gst_rtsp_stream_recv_rtp:
867  * @stream: a #GstRTSPStream
868  * @buffer: (transfer full): a #GstBuffer
869  *
870  * Handle an RTP buffer for the stream. This method is usually called when a
871  * message has been received from a client using the TCP transport.
872  *
873  * This function takes ownership of @buffer.
874  *
875  * Returns: a GstFlowReturn.
876  */
877 GstFlowReturn
878 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
879 {
880   GstFlowReturn ret;
881
882   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
883   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
884   g_return_val_if_fail (stream->is_joined, FALSE);
885
886   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
887
888   return ret;
889 }
890
891 /**
892  * gst_rtsp_stream_recv_rtcp:
893  * @stream: a #GstRTSPStream
894  * @buffer: (transfer full): a #GstBuffer
895  *
896  * Handle an RTCP buffer for the stream. This method is usually called when a
897  * message has been received from a client using the TCP transport.
898  *
899  * This function takes ownership of @buffer.
900  *
901  * Returns: a GstFlowReturn.
902  */
903 GstFlowReturn
904 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
905 {
906   GstFlowReturn ret;
907
908   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
909   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
910   g_return_val_if_fail (stream->is_joined, FALSE);
911
912   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
913
914   return ret;
915 }
916
917 static gboolean
918 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
919     gboolean add)
920 {
921   GstRTSPTransport *tr;
922   gboolean updated;
923
924   updated = FALSE;
925
926   tr = trans->transport;
927
928   switch (tr->lower_transport) {
929     case GST_RTSP_LOWER_TRANS_UDP:
930     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
931     {
932       gchar *dest;
933       gint min, max;
934       guint ttl = 0;
935
936       dest = tr->destination;
937       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
938         min = tr->port.min;
939         max = tr->port.max;
940         ttl = tr->ttl;
941       } else {
942         min = tr->client_port.min;
943         max = tr->client_port.max;
944       }
945
946       if (add && !trans->active) {
947         GST_INFO ("adding %s:%d-%d", dest, min, max);
948         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
949         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
950         if (ttl > 0) {
951           GST_INFO ("setting ttl-mc %d", ttl);
952           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
953           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
954         }
955         stream->transports = g_list_prepend (stream->transports, trans);
956         trans->active = TRUE;
957         updated = TRUE;
958       } else if (trans->active) {
959         GST_INFO ("removing %s:%d-%d", dest, min, max);
960         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
961         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
962         stream->transports = g_list_remove (stream->transports, trans);
963         trans->active = FALSE;
964         updated = TRUE;
965       }
966       break;
967     }
968     case GST_RTSP_LOWER_TRANS_TCP:
969       if (add && !trans->active) {
970         GST_INFO ("adding TCP %s", tr->destination);
971         stream->transports = g_list_prepend (stream->transports, trans);
972         trans->active = TRUE;
973         updated = TRUE;
974       } else if (trans->active) {
975         GST_INFO ("removing TCP %s", tr->destination);
976         stream->transports = g_list_remove (stream->transports, trans);
977         trans->active = FALSE;
978         updated = TRUE;
979       }
980       break;
981     default:
982       GST_INFO ("Unknown transport %d", tr->lower_transport);
983       break;
984   }
985   return updated;
986 }
987
988
989 /**
990  * gst_rtsp_stream_add_transport:
991  * @stream: a #GstRTSPStream
992  * @trans: a #GstRTSPStreamTransport
993  *
994  * Add the transport in @trans to @stream. The media of @stream will
995  * then also be send to the values configured in @trans.
996  *
997  * @stream must be joined to a bin.
998  *
999  * @trans must contain a valid #GstRTSPTransport.
1000  *
1001  * Returns: %TRUE if @trans was added
1002  */
1003 gboolean
1004 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1005     GstRTSPStreamTransport * trans)
1006 {
1007   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1008   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1009   g_return_val_if_fail (stream->is_joined, FALSE);
1010   g_return_val_if_fail (trans->transport != NULL, FALSE);
1011
1012   return update_transport (stream, trans, TRUE);
1013 }
1014
1015 /**
1016  * gst_rtsp_stream_remove_transport:
1017  * @stream: a #GstRTSPStream
1018  * @trans: a #GstRTSPStreamTransport
1019  *
1020  * Remove the transport in @trans from @stream. The media of @stream will
1021  * not be sent to the values configured in @trans.
1022  *
1023  * @stream must be joined to a bin.
1024  *
1025  * @trans must contain a valid #GstRTSPTransport.
1026  *
1027  * Returns: %TRUE if @trans was removed
1028  */
1029 gboolean
1030 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1031     GstRTSPStreamTransport * trans)
1032 {
1033   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1034   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1035   g_return_val_if_fail (stream->is_joined, FALSE);
1036   g_return_val_if_fail (trans->transport != NULL, FALSE);
1037
1038   return update_transport (stream, trans, FALSE);
1039 }