Fix FSF address
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * media)
61 {
62 }
63
64 static void
65 gst_rtsp_stream_finalize (GObject * obj)
66 {
67   GstRTSPStream *stream;
68
69   stream = GST_RTSP_STREAM (obj);
70
71   /* we really need to be unjoined now */
72   g_return_if_fail (!stream->is_joined);
73
74   gst_object_unref (stream->payloader);
75   gst_object_unref (stream->srcpad);
76
77   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
78 }
79
80 /**
81  * gst_rtsp_stream_new:
82  * @idx: an index
83  * @srcpad: a #GstPad
84  * @payloader: a #GstElement
85  *
86  * Create a new media stream with index @idx that handles RTP data on
87  * @srcpad and has a payloader element @payloader.
88  *
89  * Returns: a new #GstRTSPStream
90  */
91 GstRTSPStream *
92 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
93 {
94   GstRTSPStream *stream;
95
96   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
97   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
98   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
99
100   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
101   stream->idx = idx;
102   stream->payloader = gst_object_ref (payloader);
103   stream->srcpad = gst_object_ref (srcpad);
104
105   return stream;
106 }
107
108 /**
109  * gst_rtsp_stream_set_mtu:
110  * @stream: a #GstRTSPStream
111  * @mtu: a new MTU
112  *
113  * Configure the mtu in the payloader of @stream to @mtu.
114  */
115 void
116 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
117 {
118   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
119
120   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
121 }
122
123 /**
124  * gst_rtsp_stream_get_mtu:
125  * @stream: a #GstRTSPStream
126  *
127  * Get the configured MTU in the payloader of @stream.
128  *
129  * Returns: the MTU of the payloader.
130  */
131 guint
132 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
133 {
134   guint mtu;
135
136   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
137
138   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
139
140   return mtu;
141 }
142
143 static gboolean
144 alloc_ports (GstRTSPStream * stream)
145 {
146   GstStateChangeReturn ret;
147   GstElement *udpsrc0, *udpsrc1;
148   GstElement *udpsink0, *udpsink1;
149   gint tmp_rtp, tmp_rtcp;
150   guint count;
151   gint rtpport, rtcpport;
152   GSocket *socket;
153   const gchar *host;
154
155   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
156
157   udpsrc0 = NULL;
158   udpsrc1 = NULL;
159   udpsink0 = NULL;
160   udpsink1 = NULL;
161   count = 0;
162
163   /* Start with random port */
164   tmp_rtp = 0;
165
166   if (stream->is_ipv6)
167     host = "udp://[::0]";
168   else
169     host = "udp://0.0.0.0";
170
171   /* try to allocate 2 UDP ports, the RTP port should be an even
172    * number and the RTCP port should be the next (uneven) port */
173 again:
174   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
175   if (udpsrc0 == NULL)
176     goto no_udp_protocol;
177   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
178
179   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
180   if (ret == GST_STATE_CHANGE_FAILURE) {
181     if (tmp_rtp != 0) {
182       tmp_rtp += 2;
183       if (++count > 20)
184         goto no_ports;
185
186       gst_element_set_state (udpsrc0, GST_STATE_NULL);
187       gst_object_unref (udpsrc0);
188
189       goto again;
190     }
191     goto no_udp_protocol;
192   }
193
194   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
195
196   /* check if port is even */
197   if ((tmp_rtp & 1) != 0) {
198     /* port not even, close and allocate another */
199     if (++count > 20)
200       goto no_ports;
201
202     gst_element_set_state (udpsrc0, GST_STATE_NULL);
203     gst_object_unref (udpsrc0);
204
205     tmp_rtp++;
206     goto again;
207   }
208
209   /* allocate port+1 for RTCP now */
210   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
211   if (udpsrc1 == NULL)
212     goto no_udp_rtcp_protocol;
213
214   /* set port */
215   tmp_rtcp = tmp_rtp + 1;
216   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
217
218   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
219   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
220   if (ret == GST_STATE_CHANGE_FAILURE) {
221
222     if (++count > 20)
223       goto no_ports;
224
225     gst_element_set_state (udpsrc0, GST_STATE_NULL);
226     gst_object_unref (udpsrc0);
227
228     gst_element_set_state (udpsrc1, GST_STATE_NULL);
229     gst_object_unref (udpsrc1);
230
231     tmp_rtp += 2;
232     goto again;
233   }
234   /* all fine, do port check */
235   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
236   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
237
238   /* this should not happen... */
239   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
240     goto port_error;
241
242   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
243   if (!udpsink0)
244     goto no_udp_protocol;
245
246   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
247   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
248   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
249
250   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
251   if (!udpsink1)
252     goto no_udp_protocol;
253
254   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
255           "send-duplicates")) {
256     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
257     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
258   } else {
259     g_warning
260         ("old multiudpsink version found without send-duplicates property");
261   }
262
263   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
264           "buffer-size")) {
265     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
266         NULL);
267   } else {
268     GST_WARNING ("multiudpsink version found without buffer-size property");
269   }
270
271   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
272   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
273   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
274   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
275   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
276   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
277   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
278   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
279   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
280
281   /* we keep these elements, we will further configure them when the
282    * client told us to really use the UDP ports. */
283   stream->udpsrc[0] = udpsrc0;
284   stream->udpsrc[1] = udpsrc1;
285   stream->udpsink[0] = udpsink0;
286   stream->udpsink[1] = udpsink1;
287   stream->server_port.min = rtpport;
288   stream->server_port.max = rtcpport;
289
290   return TRUE;
291
292   /* ERRORS */
293 no_udp_protocol:
294   {
295     goto cleanup;
296   }
297 no_ports:
298   {
299     goto cleanup;
300   }
301 no_udp_rtcp_protocol:
302   {
303     goto cleanup;
304   }
305 port_error:
306   {
307     goto cleanup;
308   }
309 cleanup:
310   {
311     if (udpsrc0) {
312       gst_element_set_state (udpsrc0, GST_STATE_NULL);
313       gst_object_unref (udpsrc0);
314     }
315     if (udpsrc1) {
316       gst_element_set_state (udpsrc1, GST_STATE_NULL);
317       gst_object_unref (udpsrc1);
318     }
319     if (udpsink0) {
320       gst_element_set_state (udpsink0, GST_STATE_NULL);
321       gst_object_unref (udpsink0);
322     }
323     if (udpsink1) {
324       gst_element_set_state (udpsink1, GST_STATE_NULL);
325       gst_object_unref (udpsink1);
326     }
327     return FALSE;
328   }
329 }
330
331 /* executed from streaming thread */
332 static void
333 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
334 {
335   GstCaps *newcaps, *oldcaps;
336
337   newcaps = gst_pad_get_current_caps (pad);
338
339   oldcaps = stream->caps;
340   stream->caps = newcaps;
341
342   if (oldcaps)
343     gst_caps_unref (oldcaps);
344
345   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
346       newcaps);
347 }
348
349 static void
350 dump_structure (const GstStructure * s)
351 {
352   gchar *sstr;
353
354   sstr = gst_structure_to_string (s);
355   GST_INFO ("structure: %s", sstr);
356   g_free (sstr);
357 }
358
359 static GstRTSPStreamTransport *
360 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
361 {
362   GList *walk;
363   GstRTSPStreamTransport *result = NULL;
364   const gchar *tmp;
365   gchar *dest;
366   guint port;
367
368   if (rtcp_from == NULL)
369     return NULL;
370
371   tmp = g_strrstr (rtcp_from, ":");
372   if (tmp == NULL)
373     return NULL;
374
375   port = atoi (tmp + 1);
376   dest = g_strndup (rtcp_from, tmp - rtcp_from);
377
378   GST_INFO ("finding %s:%d in %d transports", dest, port,
379       g_list_length (stream->transports));
380
381   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
382     GstRTSPStreamTransport *trans = walk->data;
383     gint min, max;
384
385     min = trans->transport->client_port.min;
386     max = trans->transport->client_port.max;
387
388     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
389             || max == port)) {
390       result = trans;
391       break;
392     }
393   }
394   g_free (dest);
395
396   return result;
397 }
398
399 static GstRTSPStreamTransport *
400 check_transport (GObject * source, GstRTSPStream * stream)
401 {
402   GstStructure *stats;
403   GstRTSPStreamTransport *trans;
404
405   /* see if we have a stream to match with the origin of the RTCP packet */
406   trans = g_object_get_qdata (source, ssrc_stream_map_key);
407   if (trans == NULL) {
408     g_object_get (source, "stats", &stats, NULL);
409     if (stats) {
410       const gchar *rtcp_from;
411
412       dump_structure (stats);
413
414       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
415       if ((trans = find_transport (stream, rtcp_from))) {
416         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
417             source);
418
419         /* keep ref to the source */
420         trans->rtpsource = source;
421
422         g_object_set_qdata (source, ssrc_stream_map_key, trans);
423       }
424       gst_structure_free (stats);
425     }
426   }
427
428   return trans;
429 }
430
431
432 static void
433 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
434 {
435   GstRTSPStreamTransport *trans;
436
437   GST_INFO ("%p: new source %p", stream, source);
438
439   trans = check_transport (source, stream);
440
441   if (trans)
442     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
443 }
444
445 static void
446 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
447 {
448   GST_INFO ("%p: new SDES %p", stream, source);
449 }
450
451 static void
452 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
453 {
454   GstRTSPStreamTransport *trans;
455
456   trans = check_transport (source, stream);
457
458   if (trans)
459     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
460
461   if (trans && trans->keep_alive)
462     trans->keep_alive (trans->ka_user_data);
463
464 #ifdef DUMP_STATS
465   {
466     GstStructure *stats;
467     g_object_get (source, "stats", &stats, NULL);
468     if (stats) {
469       dump_structure (stats);
470       gst_structure_free (stats);
471     }
472   }
473 #endif
474 }
475
476 static void
477 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
478 {
479   GST_INFO ("%p: source %p bye", stream, source);
480 }
481
482 static void
483 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
484 {
485   GstRTSPStreamTransport *trans;
486
487   GST_INFO ("%p: source %p bye timeout", stream, source);
488
489   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
490     trans->rtpsource = NULL;
491     trans->timeout = TRUE;
492   }
493 }
494
495 static void
496 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
497 {
498   GstRTSPStreamTransport *trans;
499
500   GST_INFO ("%p: source %p timeout", stream, source);
501
502   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
503     trans->rtpsource = NULL;
504     trans->timeout = TRUE;
505   }
506 }
507
508 static GstFlowReturn
509 handle_new_sample (GstAppSink * sink, gpointer user_data)
510 {
511   GList *walk;
512   GstSample *sample;
513   GstBuffer *buffer;
514   GstRTSPStream *stream;
515
516   sample = gst_app_sink_pull_sample (sink);
517   if (!sample)
518     return GST_FLOW_OK;
519
520   stream = (GstRTSPStream *) user_data;
521   buffer = gst_sample_get_buffer (sample);
522
523   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
524     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
525
526     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
527       if (tr->send_rtp)
528         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
529     } else {
530       if (tr->send_rtcp)
531         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
532     }
533   }
534   gst_sample_unref (sample);
535
536   return GST_FLOW_OK;
537 }
538
539 static GstAppSinkCallbacks sink_cb = {
540   NULL,                         /* not interested in EOS */
541   NULL,                         /* not interested in preroll samples */
542   handle_new_sample,
543 };
544
545 /**
546  * gst_rtsp_stream_join_bin:
547  * @stream: a #GstRTSPStream
548  * @bin: a #GstBin to join
549  * @rtpbin: a rtpbin element in @bin
550  * @state: the target state of the new elements
551  *
552  * Join the #Gstbin @bin that contains the element @rtpbin.
553  *
554  * @stream will link to @rtpbin, which must be inside @bin. The elements
555  * added to @bin will be set to the state given in @state.
556  *
557  * Returns: %TRUE on success.
558  */
559 gboolean
560 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
561     GstElement * rtpbin, GstState state)
562 {
563   gint i, idx;
564   gchar *name;
565   GstPad *pad, *teepad, *queuepad, *selpad;
566   GstPadLinkReturn ret;
567
568   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
569   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
570   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
571
572   if (stream->is_joined)
573     return TRUE;
574
575   /* create a session with the same index as the stream */
576   idx = stream->idx;
577
578   GST_INFO ("stream %p joining bin as session %d", stream, idx);
579
580   if (!alloc_ports (stream))
581     goto no_ports;
582
583   /* get a pad for sending RTP */
584   name = g_strdup_printf ("send_rtp_sink_%u", idx);
585   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
586   g_free (name);
587   /* link the RTP pad to the session manager, it should not really fail unless
588    * this is not really an RTP pad */
589   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
590   if (ret != GST_PAD_LINK_OK)
591     goto link_failed;
592
593   /* get pads from the RTP session element for sending and receiving
594    * RTP/RTCP*/
595   name = g_strdup_printf ("send_rtp_src_%u", idx);
596   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
597   g_free (name);
598   name = g_strdup_printf ("send_rtcp_src_%u", idx);
599   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
600   g_free (name);
601   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
602   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
603   g_free (name);
604   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
605   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
606   g_free (name);
607
608   /* get the session */
609   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
610
611   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
612       stream);
613   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
614       stream);
615   g_signal_connect (stream->session, "on-ssrc-active",
616       (GCallback) on_ssrc_active, stream);
617   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
618       stream);
619   g_signal_connect (stream->session, "on-bye-timeout",
620       (GCallback) on_bye_timeout, stream);
621   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
622       stream);
623
624   for (i = 0; i < 2; i++) {
625     /* For the sender we create this bit of pipeline for both
626      * RTP and RTCP. Sync and preroll are enabled on udpsink so
627      * we need to add a queue before appsink to make the pipeline
628      * not block. For the TCP case, we want to pump data to the
629      * client as fast as possible anyway.
630      *
631      * .--------.      .-----.    .---------.
632      * | rtpbin |      | tee |    | udpsink |
633      * |       send->sink   src->sink       |
634      * '--------'      |     |    '---------'
635      *                 |     |    .---------.    .---------.
636      *                 |     |    |  queue  |    | appsink |
637      *                 |    src->sink      src->sink       |
638      *                 '-----'    '---------'    '---------'
639      */
640     /* make tee for RTP/RTCP */
641     stream->tee[i] = gst_element_factory_make ("tee", NULL);
642     gst_bin_add (bin, stream->tee[i]);
643
644     /* and link to rtpbin send pad */
645     pad = gst_element_get_static_pad (stream->tee[i], "sink");
646     gst_pad_link (stream->send_src[i], pad);
647     gst_object_unref (pad);
648
649     /* add udpsink */
650     gst_bin_add (bin, stream->udpsink[i]);
651
652     /* link tee to udpsink */
653     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
654     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
655     gst_pad_link (teepad, pad);
656     gst_object_unref (pad);
657     gst_object_unref (teepad);
658
659     /* make queue */
660     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
661     gst_bin_add (bin, stream->appqueue[i]);
662     /* and link to tee */
663     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
664     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
665     gst_pad_link (teepad, pad);
666     gst_object_unref (pad);
667     gst_object_unref (teepad);
668
669     /* make appsink */
670     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
671     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
672     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
673     gst_bin_add (bin, stream->appsink[i]);
674     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
675         &sink_cb, stream, NULL);
676     /* and link to queue */
677     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
678     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
679     gst_pad_link (queuepad, pad);
680     gst_object_unref (pad);
681     gst_object_unref (queuepad);
682
683     /* For the receiver we create this bit of pipeline for both
684      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
685      * and it is all funneled into the rtpbin receive pad.
686      *
687      * .--------.     .--------.    .--------.
688      * | udpsrc |     | funnel |    | rtpbin |
689      * |       src->sink      src->sink      |
690      * '--------'     |        |    '--------'
691      * .--------.     |        |
692      * | appsrc |     |        |
693      * |       src->sink       |
694      * '--------'     '--------'
695      */
696     /* make funnel for the RTP/RTCP receivers */
697     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
698     gst_bin_add (bin, stream->funnel[i]);
699
700     pad = gst_element_get_static_pad (stream->funnel[i], "src");
701     gst_pad_link (pad, stream->recv_sink[i]);
702     gst_object_unref (pad);
703
704     /* add udpsrc */
705     gst_bin_add (bin, stream->udpsrc[i]);
706     /* and link to the funnel */
707     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
708     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
709     gst_pad_link (pad, selpad);
710     gst_object_unref (pad);
711     gst_object_unref (selpad);
712
713     /* make and add appsrc */
714     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
715     gst_bin_add (bin, stream->appsrc[i]);
716     /* and link to the funnel */
717     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
718     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
719     gst_pad_link (pad, selpad);
720     gst_object_unref (pad);
721     gst_object_unref (selpad);
722
723     /* check if we need to set to a special state */
724     if (state != GST_STATE_NULL) {
725       gst_element_set_state (stream->udpsink[i], state);
726       gst_element_set_state (stream->appsink[i], state);
727       gst_element_set_state (stream->appqueue[i], state);
728       gst_element_set_state (stream->tee[i], state);
729       gst_element_set_state (stream->funnel[i], state);
730       gst_element_set_state (stream->appsrc[i], state);
731     }
732     /* we set and keep these to playing so that they don't cause NO_PREROLL return
733      * values */
734     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
735     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
736   }
737
738   /* be notified of caps changes */
739   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
740       (GCallback) caps_notify, stream);
741
742   stream->is_joined = TRUE;
743
744   return TRUE;
745
746   /* ERRORS */
747 no_ports:
748   {
749     GST_WARNING ("failed to allocate ports %d", idx);
750     return FALSE;
751   }
752 link_failed:
753   {
754     GST_WARNING ("failed to link stream %d", idx);
755     gst_object_unref (stream->send_rtp_sink);
756     stream->send_rtp_sink = NULL;
757     return FALSE;
758   }
759 }
760
761 /**
762  * gst_rtsp_stream_leave_bin:
763  * @stream: a #GstRTSPStream
764  * @bin: a #GstBin
765  * @rtpbin: a rtpbin #GstElement
766  *
767  * Remove the elements of @stream from @bin. @bin must be set
768  * to the NULL state before calling this.
769  *
770  * Return: %TRUE on success.
771  */
772 gboolean
773 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
774     GstElement * rtpbin)
775 {
776   gint i;
777
778   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
779   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
780   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
781
782   if (!stream->is_joined)
783     return TRUE;
784
785   /* all transports must be removed by now */
786   g_return_val_if_fail (stream->transports == NULL, FALSE);
787
788   GST_INFO ("stream %p leaving bin", stream);
789
790   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
791   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
792   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
793   gst_object_unref (stream->send_rtp_sink);
794   stream->send_rtp_sink = NULL;
795
796   for (i = 0; i < 2; i++) {
797     /* and set udpsrc to NULL now before removing */
798     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
799     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
800
801     /* removing them should also nicely release the request
802      * pads when they finalize */
803     gst_bin_remove (bin, stream->udpsrc[i]);
804     gst_bin_remove (bin, stream->udpsink[i]);
805     gst_bin_remove (bin, stream->appsrc[i]);
806     gst_bin_remove (bin, stream->appsink[i]);
807     gst_bin_remove (bin, stream->appqueue[i]);
808     gst_bin_remove (bin, stream->tee[i]);
809     gst_bin_remove (bin, stream->funnel[i]);
810
811     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
812     gst_object_unref (stream->recv_sink[i]);
813     stream->recv_sink[i] = NULL;
814
815     stream->udpsrc[i] = NULL;
816     stream->udpsink[i] = NULL;
817     stream->appsrc[i] = NULL;
818     stream->appsink[i] = NULL;
819     stream->appqueue[i] = NULL;
820     stream->tee[i] = NULL;
821     stream->funnel[i] = NULL;
822   }
823   gst_object_unref (stream->send_src[0]);
824   stream->send_src[0] = NULL;
825
826   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
827   gst_object_unref (stream->send_src[1]);
828   stream->send_src[1] = NULL;
829
830   g_object_unref (stream->session);
831   if (stream->caps)
832     gst_caps_unref (stream->caps);
833
834   stream->is_joined = FALSE;
835
836   return TRUE;
837 }
838
839 /**
840  * gst_rtsp_stream_get_rtpinfo:
841  * @stream: a #GstRTSPStream
842  * @rtptime: result RTP timestamp
843  * @seq: result RTP seqnum
844  *
845  * Retrieve the current rtptime and seq. This is used to
846  * construct a RTPInfo reply header.
847  *
848  * Returns: %TRUE when rtptime and seq could be determined.
849  */
850 gboolean
851 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
852     guint * rtptime, guint * seq)
853 {
854   GObjectClass *payobjclass;
855
856   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
857
858   if (!g_object_class_find_property (payobjclass, "seqnum") ||
859       !g_object_class_find_property (payobjclass, "timestamp"))
860     return FALSE;
861
862   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
863
864   return TRUE;
865 }
866
867 /**
868  * gst_rtsp_stream_recv_rtp:
869  * @stream: a #GstRTSPStream
870  * @buffer: (transfer full): a #GstBuffer
871  *
872  * Handle an RTP buffer for the stream. This method is usually called when a
873  * message has been received from a client using the TCP transport.
874  *
875  * This function takes ownership of @buffer.
876  *
877  * Returns: a GstFlowReturn.
878  */
879 GstFlowReturn
880 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
881 {
882   GstFlowReturn ret;
883
884   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
885   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
886   g_return_val_if_fail (stream->is_joined, FALSE);
887
888   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
889
890   return ret;
891 }
892
893 /**
894  * gst_rtsp_stream_recv_rtcp:
895  * @stream: a #GstRTSPStream
896  * @buffer: (transfer full): a #GstBuffer
897  *
898  * Handle an RTCP buffer for the stream. This method is usually called when a
899  * message has been received from a client using the TCP transport.
900  *
901  * This function takes ownership of @buffer.
902  *
903  * Returns: a GstFlowReturn.
904  */
905 GstFlowReturn
906 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
907 {
908   GstFlowReturn ret;
909
910   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
911   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
912   g_return_val_if_fail (stream->is_joined, FALSE);
913
914   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
915
916   return ret;
917 }
918
919 static gboolean
920 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
921     gboolean add)
922 {
923   GstRTSPTransport *tr;
924   gboolean updated;
925
926   updated = FALSE;
927
928   tr = trans->transport;
929
930   switch (tr->lower_transport) {
931     case GST_RTSP_LOWER_TRANS_UDP:
932     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
933     {
934       gchar *dest;
935       gint min, max;
936       guint ttl = 0;
937
938       dest = tr->destination;
939       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
940         min = tr->port.min;
941         max = tr->port.max;
942         ttl = tr->ttl;
943       } else {
944         min = tr->client_port.min;
945         max = tr->client_port.max;
946       }
947
948       if (add && !trans->active) {
949         GST_INFO ("adding %s:%d-%d", dest, min, max);
950         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
951         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
952         if (ttl > 0) {
953           GST_INFO ("setting ttl-mc %d", ttl);
954           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
955           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
956         }
957         stream->transports = g_list_prepend (stream->transports, trans);
958         trans->active = TRUE;
959         updated = TRUE;
960       } else if (trans->active) {
961         GST_INFO ("removing %s:%d-%d", dest, min, max);
962         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
963         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
964         stream->transports = g_list_remove (stream->transports, trans);
965         trans->active = FALSE;
966         updated = TRUE;
967       }
968       break;
969     }
970     case GST_RTSP_LOWER_TRANS_TCP:
971       if (add && !trans->active) {
972         GST_INFO ("adding TCP %s", tr->destination);
973         stream->transports = g_list_prepend (stream->transports, trans);
974         trans->active = TRUE;
975         updated = TRUE;
976       } else if (trans->active) {
977         GST_INFO ("removing TCP %s", tr->destination);
978         stream->transports = g_list_remove (stream->transports, trans);
979         trans->active = FALSE;
980         updated = TRUE;
981       }
982       break;
983     default:
984       GST_INFO ("Unknown transport %d", tr->lower_transport);
985       break;
986   }
987   return updated;
988 }
989
990
991 /**
992  * gst_rtsp_stream_add_transport:
993  * @stream: a #GstRTSPStream
994  * @trans: a #GstRTSPStreamTransport
995  *
996  * Add the transport in @trans to @stream. The media of @stream will
997  * then also be send to the values configured in @trans.
998  *
999  * @stream must be joined to a bin.
1000  *
1001  * @trans must contain a valid #GstRTSPTransport.
1002  *
1003  * Returns: %TRUE if @trans was added
1004  */
1005 gboolean
1006 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1007     GstRTSPStreamTransport * trans)
1008 {
1009   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1010   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1011   g_return_val_if_fail (stream->is_joined, FALSE);
1012   g_return_val_if_fail (trans->transport != NULL, FALSE);
1013
1014   return update_transport (stream, trans, TRUE);
1015 }
1016
1017 /**
1018  * gst_rtsp_stream_remove_transport:
1019  * @stream: a #GstRTSPStream
1020  * @trans: a #GstRTSPStreamTransport
1021  *
1022  * Remove the transport in @trans from @stream. The media of @stream will
1023  * not be sent to the values configured in @trans.
1024  *
1025  * @stream must be joined to a bin.
1026  *
1027  * @trans must contain a valid #GstRTSPTransport.
1028  *
1029  * Returns: %TRUE if @trans was removed
1030  */
1031 gboolean
1032 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1033     GstRTSPStreamTransport * trans)
1034 {
1035   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1036   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1037   g_return_val_if_fail (stream->is_joined, FALSE);
1038   g_return_val_if_fail (trans->transport != NULL, FALSE);
1039
1040   return update_transport (stream, trans, FALSE);
1041 }