rtsp: massive refactoring
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * media)
61 {
62 }
63
64 static void
65 gst_rtsp_stream_finalize (GObject * obj)
66 {
67   GstRTSPStream *stream;
68
69   stream = GST_RTSP_STREAM (obj);
70
71   g_assert (!stream->is_joined);
72
73   gst_object_unref (stream->payloader);
74   gst_object_unref (stream->srcpad);
75
76   if (stream->session)
77     g_object_unref (stream->session);
78
79   if (stream->caps)
80     gst_caps_unref (stream->caps);
81
82   if (stream->send_rtp_sink)
83     gst_object_unref (stream->send_rtp_sink);
84   if (stream->send_rtp_src)
85     gst_object_unref (stream->send_rtp_src);
86   if (stream->send_rtcp_src)
87     gst_object_unref (stream->send_rtcp_src);
88   if (stream->recv_rtcp_sink)
89     gst_object_unref (stream->recv_rtcp_sink);
90   if (stream->recv_rtp_sink)
91     gst_object_unref (stream->recv_rtp_sink);
92
93   g_list_free (stream->transports);
94
95   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
96 }
97
98 /**
99  * gst_rtsp_stream_new:
100  * @idx: an index
101  * @srcpad: a #GstPad
102  * @payloader: a #GstElement
103  *
104  * Create a new media stream with index @idx that handles RTP data on
105  * @srcpad and has a payloader element @payloader.
106  *
107  * Returns: a new #GstRTSPStream
108  */
109 GstRTSPStream *
110 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
111 {
112   GstRTSPStream *stream;
113
114   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
115   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
116   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
117
118   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
119   stream->idx = idx;
120   stream->payloader = gst_object_ref (payloader);
121   stream->srcpad = gst_object_ref (srcpad);
122
123   return stream;
124 }
125
126 static gboolean
127 alloc_ports (GstRTSPStream * stream)
128 {
129   GstStateChangeReturn ret;
130   GstElement *udpsrc0, *udpsrc1;
131   GstElement *udpsink0, *udpsink1;
132   gint tmp_rtp, tmp_rtcp;
133   guint count;
134   gint rtpport, rtcpport;
135   GSocket *socket;
136   const gchar *host;
137
138   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
139
140   udpsrc0 = NULL;
141   udpsrc1 = NULL;
142   udpsink0 = NULL;
143   udpsink1 = NULL;
144   count = 0;
145
146   /* Start with random port */
147   tmp_rtp = 0;
148
149   if (stream->is_ipv6)
150     host = "udp://[::0]";
151   else
152     host = "udp://0.0.0.0";
153
154   /* try to allocate 2 UDP ports, the RTP port should be an even
155    * number and the RTCP port should be the next (uneven) port */
156 again:
157   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
158   if (udpsrc0 == NULL)
159     goto no_udp_protocol;
160   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
161
162   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
163   if (ret == GST_STATE_CHANGE_FAILURE) {
164     if (tmp_rtp != 0) {
165       tmp_rtp += 2;
166       if (++count > 20)
167         goto no_ports;
168
169       gst_element_set_state (udpsrc0, GST_STATE_NULL);
170       gst_object_unref (udpsrc0);
171
172       goto again;
173     }
174     goto no_udp_protocol;
175   }
176
177   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
178
179   /* check if port is even */
180   if ((tmp_rtp & 1) != 0) {
181     /* port not even, close and allocate another */
182     if (++count > 20)
183       goto no_ports;
184
185     gst_element_set_state (udpsrc0, GST_STATE_NULL);
186     gst_object_unref (udpsrc0);
187
188     tmp_rtp++;
189     goto again;
190   }
191
192   /* allocate port+1 for RTCP now */
193   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
194   if (udpsrc1 == NULL)
195     goto no_udp_rtcp_protocol;
196
197   /* set port */
198   tmp_rtcp = tmp_rtp + 1;
199   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
200
201   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
202   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
203   if (ret == GST_STATE_CHANGE_FAILURE) {
204
205     if (++count > 20)
206       goto no_ports;
207
208     gst_element_set_state (udpsrc0, GST_STATE_NULL);
209     gst_object_unref (udpsrc0);
210
211     gst_element_set_state (udpsrc1, GST_STATE_NULL);
212     gst_object_unref (udpsrc1);
213
214     tmp_rtp += 2;
215     goto again;
216   }
217   /* all fine, do port check */
218   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
219   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
220
221   /* this should not happen... */
222   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
223     goto port_error;
224
225   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
226   if (!udpsink0)
227     goto no_udp_protocol;
228
229   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
230   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
231   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
232
233   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
234   if (!udpsink1)
235     goto no_udp_protocol;
236
237   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
238           "send-duplicates")) {
239     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
240     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
241   } else {
242     g_warning
243         ("old multiudpsink version found without send-duplicates property");
244   }
245
246   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
247           "buffer-size")) {
248     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
249         NULL);
250   } else {
251     GST_WARNING ("multiudpsink version found without buffer-size property");
252   }
253
254   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
255   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
256   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
257   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
258   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
259   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
260   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
261   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
262   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
263
264   /* we keep these elements, we will further configure them when the
265    * client told us to really use the UDP ports. */
266   stream->udpsrc[0] = udpsrc0;
267   stream->udpsrc[1] = udpsrc1;
268   stream->udpsink[0] = udpsink0;
269   stream->udpsink[1] = udpsink1;
270   stream->server_port.min = rtpport;
271   stream->server_port.max = rtcpport;
272
273   return TRUE;
274
275   /* ERRORS */
276 no_udp_protocol:
277   {
278     goto cleanup;
279   }
280 no_ports:
281   {
282     goto cleanup;
283   }
284 no_udp_rtcp_protocol:
285   {
286     goto cleanup;
287   }
288 port_error:
289   {
290     goto cleanup;
291   }
292 cleanup:
293   {
294     if (udpsrc0) {
295       gst_element_set_state (udpsrc0, GST_STATE_NULL);
296       gst_object_unref (udpsrc0);
297     }
298     if (udpsrc1) {
299       gst_element_set_state (udpsrc1, GST_STATE_NULL);
300       gst_object_unref (udpsrc1);
301     }
302     if (udpsink0) {
303       gst_element_set_state (udpsink0, GST_STATE_NULL);
304       gst_object_unref (udpsink0);
305     }
306     if (udpsink1) {
307       gst_element_set_state (udpsink1, GST_STATE_NULL);
308       gst_object_unref (udpsink1);
309     }
310     return FALSE;
311   }
312 }
313
314 /* executed from streaming thread */
315 static void
316 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
317 {
318   GstCaps *newcaps, *oldcaps;
319
320   newcaps = gst_pad_get_current_caps (pad);
321
322   oldcaps = stream->caps;
323   stream->caps = newcaps;
324
325   if (oldcaps)
326     gst_caps_unref (oldcaps);
327
328   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
329       newcaps);
330 }
331
332 static void
333 dump_structure (const GstStructure * s)
334 {
335   gchar *sstr;
336
337   sstr = gst_structure_to_string (s);
338   GST_INFO ("structure: %s", sstr);
339   g_free (sstr);
340 }
341
342 static GstRTSPStreamTransport *
343 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
344 {
345   GList *walk;
346   GstRTSPStreamTransport *result = NULL;
347   const gchar *tmp;
348   gchar *dest;
349   guint port;
350
351   if (rtcp_from == NULL)
352     return NULL;
353
354   tmp = g_strrstr (rtcp_from, ":");
355   if (tmp == NULL)
356     return NULL;
357
358   port = atoi (tmp + 1);
359   dest = g_strndup (rtcp_from, tmp - rtcp_from);
360
361   GST_INFO ("finding %s:%d in %d transports", dest, port,
362       g_list_length (stream->transports));
363
364   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
365     GstRTSPStreamTransport *trans = walk->data;
366     gint min, max;
367
368     min = trans->transport->client_port.min;
369     max = trans->transport->client_port.max;
370
371     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
372             || max == port)) {
373       result = trans;
374       break;
375     }
376   }
377   g_free (dest);
378
379   return result;
380 }
381
382 static GstRTSPStreamTransport *
383 check_transport (GObject * source, GstRTSPStream * stream)
384 {
385   GstStructure *stats;
386   GstRTSPStreamTransport *trans;
387
388   /* see if we have a stream to match with the origin of the RTCP packet */
389   trans = g_object_get_qdata (source, ssrc_stream_map_key);
390   if (trans == NULL) {
391     g_object_get (source, "stats", &stats, NULL);
392     if (stats) {
393       const gchar *rtcp_from;
394
395       dump_structure (stats);
396
397       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
398       if ((trans = find_transport (stream, rtcp_from))) {
399         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
400             source);
401
402         /* keep ref to the source */
403         trans->rtpsource = source;
404
405         g_object_set_qdata (source, ssrc_stream_map_key, trans);
406       }
407       gst_structure_free (stats);
408     }
409   }
410
411   return trans;
412 }
413
414
415 static void
416 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
417 {
418   GstRTSPStreamTransport *trans;
419
420   GST_INFO ("%p: new source %p", stream, source);
421
422   trans = check_transport (source, stream);
423
424   if (trans)
425     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
426 }
427
428 static void
429 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
430 {
431   GST_INFO ("%p: new SDES %p", stream, source);
432 }
433
434 static void
435 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
436 {
437   GstRTSPStreamTransport *trans;
438
439   trans = check_transport (source, stream);
440
441   if (trans)
442     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
443
444   if (trans && trans->keep_alive)
445     trans->keep_alive (trans->ka_user_data);
446
447 #ifdef DUMP_STATS
448   {
449     GstStructure *stats;
450     g_object_get (source, "stats", &stats, NULL);
451     if (stats) {
452       dump_structure (stats);
453       gst_structure_free (stats);
454     }
455   }
456 #endif
457 }
458
459 static void
460 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
461 {
462   GST_INFO ("%p: source %p bye", stream, source);
463 }
464
465 static void
466 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
467 {
468   GstRTSPStreamTransport *trans;
469
470   GST_INFO ("%p: source %p bye timeout", stream, source);
471
472   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
473     trans->rtpsource = NULL;
474     trans->timeout = TRUE;
475   }
476 }
477
478 static void
479 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
480 {
481   GstRTSPStreamTransport *trans;
482
483   GST_INFO ("%p: source %p timeout", stream, source);
484
485   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
486     trans->rtpsource = NULL;
487     trans->timeout = TRUE;
488   }
489 }
490
491 static GstFlowReturn
492 handle_new_sample (GstAppSink * sink, gpointer user_data)
493 {
494   GList *walk;
495   GstSample *sample;
496   GstBuffer *buffer;
497   GstRTSPStream *stream;
498
499   sample = gst_app_sink_pull_sample (sink);
500   if (!sample)
501     return GST_FLOW_OK;
502
503   stream = (GstRTSPStream *) user_data;
504   buffer = gst_sample_get_buffer (sample);
505
506   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
507     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
508
509     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
510       if (tr->send_rtp)
511         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
512     } else {
513       if (tr->send_rtcp)
514         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
515     }
516   }
517   gst_sample_unref (sample);
518
519   return GST_FLOW_OK;
520 }
521
522 static GstAppSinkCallbacks sink_cb = {
523   NULL,                         /* not interested in EOS */
524   NULL,                         /* not interested in preroll samples */
525   handle_new_sample,
526 };
527
528 /**
529  * gst_rtsp_stream_join_bin:
530  * @stream: a #GstRTSPStream
531  * @bin: a #GstBin to join
532  * @rtpbin: a rtpbin element in @bin
533  *
534  * Join the #Gstbin @bin that contains the element @rtpbin.
535  *
536  * @stream will link to @rtpbin, which must be inside @bin.
537  *
538  * Returns: %TRUE on success.
539  */
540 gboolean
541 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
542     GstElement * rtpbin)
543 {
544   gint i, idx;
545   gchar *name;
546   GstPad *pad, *teepad, *queuepad, *selpad;
547   GstPadLinkReturn ret;
548
549   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
550   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
551   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
552
553   idx = stream->idx;
554
555   if (stream->is_joined)
556     return TRUE;
557
558   GST_INFO ("stream %p joining bin", stream);
559
560   if (!alloc_ports (stream))
561     goto no_ports;
562
563   /* add the ports to the pipeline */
564   for (i = 0; i < 2; i++) {
565     gst_bin_add (bin, stream->udpsink[i]);
566     gst_bin_add (bin, stream->udpsrc[i]);
567   }
568
569   /* create elements for the TCP transfer */
570   for (i = 0; i < 2; i++) {
571     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
572     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
573     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
574     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
575     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
576     gst_bin_add (bin, stream->appqueue[i]);
577     gst_bin_add (bin, stream->appsink[i]);
578     gst_bin_add (bin, stream->appsrc[i]);
579     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
580         &sink_cb, stream, NULL);
581   }
582
583   /* hook up the stream to the RTP session elements. */
584   name = g_strdup_printf ("send_rtp_sink_%u", idx);
585   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
586   g_free (name);
587   name = g_strdup_printf ("send_rtp_src_%u", idx);
588   stream->send_rtp_src = gst_element_get_static_pad (rtpbin, name);
589   g_free (name);
590   name = g_strdup_printf ("send_rtcp_src_%u", idx);
591   stream->send_rtcp_src = gst_element_get_request_pad (rtpbin, name);
592   g_free (name);
593   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
594   stream->recv_rtcp_sink = gst_element_get_request_pad (rtpbin, name);
595   g_free (name);
596   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
597   stream->recv_rtp_sink = gst_element_get_request_pad (rtpbin, name);
598   g_free (name);
599   /* get the session */
600   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
601
602   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
603       stream);
604   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
605       stream);
606   g_signal_connect (stream->session, "on-ssrc-active",
607       (GCallback) on_ssrc_active, stream);
608   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
609       stream);
610   g_signal_connect (stream->session, "on-bye-timeout",
611       (GCallback) on_bye_timeout, stream);
612   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
613       stream);
614
615   /* link the RTP pad to the session manager */
616   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
617   if (ret != GST_PAD_LINK_OK)
618     goto link_failed;
619
620   /* make tee for RTP and link to stream */
621   stream->tee[0] = gst_element_factory_make ("tee", NULL);
622   gst_bin_add (bin, stream->tee[0]);
623
624   pad = gst_element_get_static_pad (stream->tee[0], "sink");
625   gst_pad_link (stream->send_rtp_src, pad);
626   gst_object_unref (pad);
627
628   /* link RTP sink, we're pretty sure this will work. */
629   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
630   pad = gst_element_get_static_pad (stream->udpsink[0], "sink");
631   gst_pad_link (teepad, pad);
632   gst_object_unref (pad);
633   gst_object_unref (teepad);
634
635   teepad = gst_element_get_request_pad (stream->tee[0], "src_%u");
636   pad = gst_element_get_static_pad (stream->appqueue[0], "sink");
637   gst_pad_link (teepad, pad);
638   gst_object_unref (pad);
639   gst_object_unref (teepad);
640
641   queuepad = gst_element_get_static_pad (stream->appqueue[0], "src");
642   pad = gst_element_get_static_pad (stream->appsink[0], "sink");
643   gst_pad_link (queuepad, pad);
644   gst_object_unref (pad);
645   gst_object_unref (queuepad);
646
647   /* make tee for RTCP */
648   stream->tee[1] = gst_element_factory_make ("tee", NULL);
649   gst_bin_add (bin, stream->tee[1]);
650
651   pad = gst_element_get_static_pad (stream->tee[1], "sink");
652   gst_pad_link (stream->send_rtcp_src, pad);
653   gst_object_unref (pad);
654
655   /* link RTCP elements */
656   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
657   pad = gst_element_get_static_pad (stream->udpsink[1], "sink");
658   gst_pad_link (teepad, pad);
659   gst_object_unref (pad);
660   gst_object_unref (teepad);
661
662   teepad = gst_element_get_request_pad (stream->tee[1], "src_%u");
663   pad = gst_element_get_static_pad (stream->appqueue[1], "sink");
664   gst_pad_link (teepad, pad);
665   gst_object_unref (pad);
666   gst_object_unref (teepad);
667
668   queuepad = gst_element_get_static_pad (stream->appqueue[1], "src");
669   pad = gst_element_get_static_pad (stream->appsink[1], "sink");
670   gst_pad_link (queuepad, pad);
671   gst_object_unref (pad);
672   gst_object_unref (queuepad);
673
674   /* make selector for the RTP receivers */
675   stream->selector[0] = gst_element_factory_make ("funnel", NULL);
676   gst_bin_add (bin, stream->selector[0]);
677
678   pad = gst_element_get_static_pad (stream->selector[0], "src");
679   gst_pad_link (pad, stream->recv_rtp_sink);
680   gst_object_unref (pad);
681
682   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
683   pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
684   gst_pad_link (pad, selpad);
685   gst_object_unref (pad);
686   gst_object_unref (selpad);
687   selpad = gst_element_get_request_pad (stream->selector[0], "sink_%u");
688   pad = gst_element_get_static_pad (stream->appsrc[0], "src");
689   gst_pad_link (pad, selpad);
690   gst_object_unref (pad);
691   gst_object_unref (selpad);
692
693   /* make selector for the RTCP receivers */
694   stream->selector[1] = gst_element_factory_make ("funnel", NULL);
695   gst_bin_add (bin, stream->selector[1]);
696
697   pad = gst_element_get_static_pad (stream->selector[1], "src");
698   gst_pad_link (pad, stream->recv_rtcp_sink);
699   gst_object_unref (pad);
700
701   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
702   pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
703   gst_pad_link (pad, selpad);
704   gst_object_unref (pad);
705   gst_object_unref (selpad);
706
707   selpad = gst_element_get_request_pad (stream->selector[1], "sink_%u");
708   pad = gst_element_get_static_pad (stream->appsrc[1], "src");
709   gst_pad_link (pad, selpad);
710   gst_object_unref (pad);
711   gst_object_unref (selpad);
712
713   /* we set and keep these to playing so that they don't cause NO_PREROLL return
714    * values */
715   gst_element_set_state (stream->udpsrc[0], GST_STATE_PLAYING);
716   gst_element_set_state (stream->udpsrc[1], GST_STATE_PLAYING);
717   gst_element_set_locked_state (stream->udpsrc[0], TRUE);
718   gst_element_set_locked_state (stream->udpsrc[1], TRUE);
719
720   /* be notified of caps changes */
721   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
722       (GCallback) caps_notify, stream);
723
724   stream->is_joined = TRUE;
725
726   return TRUE;
727
728   /* ERRORS */
729 no_ports:
730   {
731     GST_WARNING ("failed to allocate ports %d", idx);
732     return FALSE;
733   }
734 link_failed:
735   {
736     GST_WARNING ("failed to link stream %d", idx);
737     return FALSE;
738   }
739 }
740
741 /**
742  * gst_rtsp_stream_leave_bin:
743  * @stream: a #GstRTSPStream
744  * @bin: a #GstBin
745  * @rtpbin: a rtpbin #GstElement
746  *
747  * Remove the elements of @stream from the bin
748  *
749  * Return: %TRUE on success.
750  */
751 gboolean
752 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
753     GstElement * rtpbin)
754 {
755   gint i;
756
757   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
758   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
759   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
760
761   if (!stream->is_joined)
762     return TRUE;
763
764   GST_INFO ("stream %p leaving bin", stream);
765
766   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
767
768   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
769
770   /* FIXME not entirely the opposite of join_bin */
771   for (i = 0; i < 2; i++) {
772     gst_bin_remove (bin, stream->udpsrc[i]);
773     gst_bin_remove (bin, stream->udpsink[i]);
774     gst_bin_remove (bin, stream->appsrc[i]);
775     gst_bin_remove (bin, stream->appsink[i]);
776     gst_bin_remove (bin, stream->appqueue[i]);
777     gst_bin_remove (bin, stream->tee[i]);
778     gst_bin_remove (bin, stream->selector[i]);
779   }
780   stream->is_joined = FALSE;
781
782   return TRUE;
783 }
784
785 /**
786  * gst_rtsp_stream_get_rtpinfo:
787  * @stream: a #GstRTSPStream
788  * @rtptime: result RTP timestamp
789  * @seq: result RTP seqnum
790  *
791  * Retrieve the current rtptime and seq. This is used to
792  * construct a RTPInfo reply header.
793  *
794  * Returns: %TRUE when rtptime and seq could be determined.
795  */
796 gboolean
797 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
798     guint * rtptime, guint * seq)
799 {
800   GObjectClass *payobjclass;
801
802   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
803
804   if (!g_object_class_find_property (payobjclass, "seqnum") ||
805       !g_object_class_find_property (payobjclass, "timestamp"))
806     return FALSE;
807
808   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
809
810   return TRUE;
811 }
812
813 /**
814  * gst_rtsp_stream_recv_rtp:
815  * @stream: a #GstRTSPStream
816  * @buffer: (transfer full): a #GstBuffer
817  *
818  * Handle an RTP buffer for the stream. This method is usually called when a
819  * message has been received from a client using the TCP transport.
820  *
821  * This function takes ownership of @buffer.
822  *
823  * Returns: a GstFlowReturn.
824  */
825 GstFlowReturn
826 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
827 {
828   GstFlowReturn ret;
829
830   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
831
832   return ret;
833 }
834
835 /**
836  * gst_rtsp_stream_recv_rtcp:
837  * @stream: a #GstRTSPStream
838  * @buffer: (transfer full): a #GstBuffer
839  *
840  * Handle an RTCP buffer for the stream. This method is usually called when a
841  * message has been received from a client using the TCP transport.
842  *
843  * This function takes ownership of @buffer.
844  *
845  * Returns: a GstFlowReturn.
846  */
847 GstFlowReturn
848 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
849 {
850   GstFlowReturn ret;
851
852   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
853
854   return ret;
855 }
856
857 static gboolean
858 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
859     gboolean add)
860 {
861   GstRTSPTransport *tr;
862   gboolean updated;
863
864   updated = FALSE;
865
866   tr = trans->transport;
867
868   switch (tr->lower_transport) {
869     case GST_RTSP_LOWER_TRANS_UDP:
870     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
871     {
872       gchar *dest;
873       gint min, max;
874       guint ttl = 0;
875
876       dest = tr->destination;
877       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
878         min = tr->port.min;
879         max = tr->port.max;
880         ttl = tr->ttl;
881       } else {
882         min = tr->client_port.min;
883         max = tr->client_port.max;
884       }
885
886       if (add && !trans->active) {
887         GST_INFO ("adding %s:%d-%d", dest, min, max);
888         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
889         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
890         if (ttl > 0) {
891           GST_INFO ("setting ttl-mc %d", ttl);
892           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
893           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
894         }
895         stream->transports = g_list_prepend (stream->transports, trans);
896         trans->active = TRUE;
897         updated = TRUE;
898       } else if (trans->active) {
899         GST_INFO ("removing %s:%d-%d", dest, min, max);
900         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
901         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
902         stream->transports = g_list_remove (stream->transports, trans);
903         trans->active = FALSE;
904         updated = TRUE;
905       }
906       break;
907     }
908     case GST_RTSP_LOWER_TRANS_TCP:
909       if (add && !trans->active) {
910         GST_INFO ("adding TCP %s", tr->destination);
911         stream->transports = g_list_prepend (stream->transports, trans);
912         trans->active = TRUE;
913         updated = TRUE;
914       } else if (trans->active) {
915         GST_INFO ("removing TCP %s", tr->destination);
916         stream->transports = g_list_remove (stream->transports, trans);
917         trans->active = FALSE;
918         updated = TRUE;
919       }
920       break;
921     default:
922       GST_INFO ("Unknown transport %d", tr->lower_transport);
923       break;
924   }
925   return updated;
926 }
927
928
929 /**
930  * gst_rtsp_stream_add_transport:
931  * @stream: a #GstRTSPStream
932  * @trans: a #GstRTSPStreamTransport
933  *
934  * Add the transport in @trans to @stream. The media of @stream will
935  * then also be send to the values configured in @trans.
936  *
937  * @trans must contain a valid #GstRTSPTransport.
938  *
939  * Returns: %TRUE if @trans was added
940  */
941 gboolean
942 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
943     GstRTSPStreamTransport * trans)
944 {
945   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
946   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
947   g_return_val_if_fail (trans->transport != NULL, FALSE);
948
949   return update_transport (stream, trans, TRUE);
950 }
951
952 /**
953  * gst_rtsp_stream_remove_transport:
954  * @stream: a #GstRTSPStream
955  * @trans: a #GstRTSPStreamTransport
956  *
957  * Remove the transport in @trans from @stream. The media of @stream will
958  * not be sent to the values configured in @trans.
959  *
960  * Returns: %TRUE if @trans was removed
961  */
962 gboolean
963 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
964     GstRTSPStreamTransport * trans)
965 {
966   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
967   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
968   g_return_val_if_fail (trans->transport != NULL, FALSE);
969
970   return update_transport (stream, trans, FALSE);
971 }