stream: improve join and leave of the pipeline
[platform/upstream/gstreamer.git] / gst / rtsp-server / rtsp-stream.c
1 /* GStreamer
2  * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19
20 #include <string.h>
21 #include <stdlib.h>
22
23 #include <gio/gio.h>
24
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
27
28 #include "rtsp-stream.h"
29
30 enum
31 {
32   PROP_0,
33   PROP_LAST
34 };
35
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
38
39 static GQuark ssrc_stream_map_key;
40
41 static void gst_rtsp_stream_finalize (GObject * obj);
42
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
44
45 static void
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
47 {
48   GObjectClass *gobject_class;
49
50   gobject_class = G_OBJECT_CLASS (klass);
51
52   gobject_class->finalize = gst_rtsp_stream_finalize;
53
54   GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
55
56   ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
57 }
58
59 static void
60 gst_rtsp_stream_init (GstRTSPStream * media)
61 {
62 }
63
64 static void
65 gst_rtsp_stream_finalize (GObject * obj)
66 {
67   GstRTSPStream *stream;
68
69   stream = GST_RTSP_STREAM (obj);
70
71   /* we really need to be unjoined now */
72   g_return_if_fail (!stream->is_joined);
73
74   gst_object_unref (stream->payloader);
75   gst_object_unref (stream->srcpad);
76
77   g_list_free (stream->transports);
78
79   G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
80 }
81
82 /**
83  * gst_rtsp_stream_new:
84  * @idx: an index
85  * @srcpad: a #GstPad
86  * @payloader: a #GstElement
87  *
88  * Create a new media stream with index @idx that handles RTP data on
89  * @srcpad and has a payloader element @payloader.
90  *
91  * Returns: a new #GstRTSPStream
92  */
93 GstRTSPStream *
94 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
95 {
96   GstRTSPStream *stream;
97
98   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
99   g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
100   g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
101
102   stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
103   stream->idx = idx;
104   stream->payloader = gst_object_ref (payloader);
105   stream->srcpad = gst_object_ref (srcpad);
106
107   return stream;
108 }
109
110 /**
111  * gst_rtsp_stream_set_mtu:
112  * @stream: a #GstRTSPStream
113  * @mtu: a new MTU
114  *
115  * Configure the mtu in the payloader of @stream to @mtu.
116  */
117 void
118 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
119 {
120   g_return_if_fail (GST_IS_RTSP_STREAM (stream));
121
122   g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
123 }
124
125 /**
126  * gst_rtsp_stream_get_mtu:
127  * @stream: a #GstRTSPStream
128  *
129  * Get the configured MTU in the payloader of @stream.
130  *
131  * Returns: the MTU of the payloader.
132  */
133 guint
134 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
135 {
136   guint mtu;
137
138   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
139
140   g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
141
142   return mtu;
143 }
144
145 static gboolean
146 alloc_ports (GstRTSPStream * stream)
147 {
148   GstStateChangeReturn ret;
149   GstElement *udpsrc0, *udpsrc1;
150   GstElement *udpsink0, *udpsink1;
151   gint tmp_rtp, tmp_rtcp;
152   guint count;
153   gint rtpport, rtcpport;
154   GSocket *socket;
155   const gchar *host;
156
157   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
158
159   udpsrc0 = NULL;
160   udpsrc1 = NULL;
161   udpsink0 = NULL;
162   udpsink1 = NULL;
163   count = 0;
164
165   /* Start with random port */
166   tmp_rtp = 0;
167
168   if (stream->is_ipv6)
169     host = "udp://[::0]";
170   else
171     host = "udp://0.0.0.0";
172
173   /* try to allocate 2 UDP ports, the RTP port should be an even
174    * number and the RTCP port should be the next (uneven) port */
175 again:
176   udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
177   if (udpsrc0 == NULL)
178     goto no_udp_protocol;
179   g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
180
181   ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
182   if (ret == GST_STATE_CHANGE_FAILURE) {
183     if (tmp_rtp != 0) {
184       tmp_rtp += 2;
185       if (++count > 20)
186         goto no_ports;
187
188       gst_element_set_state (udpsrc0, GST_STATE_NULL);
189       gst_object_unref (udpsrc0);
190
191       goto again;
192     }
193     goto no_udp_protocol;
194   }
195
196   g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
197
198   /* check if port is even */
199   if ((tmp_rtp & 1) != 0) {
200     /* port not even, close and allocate another */
201     if (++count > 20)
202       goto no_ports;
203
204     gst_element_set_state (udpsrc0, GST_STATE_NULL);
205     gst_object_unref (udpsrc0);
206
207     tmp_rtp++;
208     goto again;
209   }
210
211   /* allocate port+1 for RTCP now */
212   udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
213   if (udpsrc1 == NULL)
214     goto no_udp_rtcp_protocol;
215
216   /* set port */
217   tmp_rtcp = tmp_rtp + 1;
218   g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
219
220   ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
221   /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
222   if (ret == GST_STATE_CHANGE_FAILURE) {
223
224     if (++count > 20)
225       goto no_ports;
226
227     gst_element_set_state (udpsrc0, GST_STATE_NULL);
228     gst_object_unref (udpsrc0);
229
230     gst_element_set_state (udpsrc1, GST_STATE_NULL);
231     gst_object_unref (udpsrc1);
232
233     tmp_rtp += 2;
234     goto again;
235   }
236   /* all fine, do port check */
237   g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
238   g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
239
240   /* this should not happen... */
241   if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
242     goto port_error;
243
244   udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
245   if (!udpsink0)
246     goto no_udp_protocol;
247
248   g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
249   g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
250   g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
251
252   udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
253   if (!udpsink1)
254     goto no_udp_protocol;
255
256   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
257           "send-duplicates")) {
258     g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
259     g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
260   } else {
261     g_warning
262         ("old multiudpsink version found without send-duplicates property");
263   }
264
265   if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
266           "buffer-size")) {
267     g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
268         NULL);
269   } else {
270     GST_WARNING ("multiudpsink version found without buffer-size property");
271   }
272
273   g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
274   g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
275   g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
276   g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
277   g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
278   g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
279   g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
280   g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
281   g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
282
283   /* we keep these elements, we will further configure them when the
284    * client told us to really use the UDP ports. */
285   stream->udpsrc[0] = udpsrc0;
286   stream->udpsrc[1] = udpsrc1;
287   stream->udpsink[0] = udpsink0;
288   stream->udpsink[1] = udpsink1;
289   stream->server_port.min = rtpport;
290   stream->server_port.max = rtcpport;
291
292   return TRUE;
293
294   /* ERRORS */
295 no_udp_protocol:
296   {
297     goto cleanup;
298   }
299 no_ports:
300   {
301     goto cleanup;
302   }
303 no_udp_rtcp_protocol:
304   {
305     goto cleanup;
306   }
307 port_error:
308   {
309     goto cleanup;
310   }
311 cleanup:
312   {
313     if (udpsrc0) {
314       gst_element_set_state (udpsrc0, GST_STATE_NULL);
315       gst_object_unref (udpsrc0);
316     }
317     if (udpsrc1) {
318       gst_element_set_state (udpsrc1, GST_STATE_NULL);
319       gst_object_unref (udpsrc1);
320     }
321     if (udpsink0) {
322       gst_element_set_state (udpsink0, GST_STATE_NULL);
323       gst_object_unref (udpsink0);
324     }
325     if (udpsink1) {
326       gst_element_set_state (udpsink1, GST_STATE_NULL);
327       gst_object_unref (udpsink1);
328     }
329     return FALSE;
330   }
331 }
332
333 /* executed from streaming thread */
334 static void
335 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
336 {
337   GstCaps *newcaps, *oldcaps;
338
339   newcaps = gst_pad_get_current_caps (pad);
340
341   oldcaps = stream->caps;
342   stream->caps = newcaps;
343
344   if (oldcaps)
345     gst_caps_unref (oldcaps);
346
347   GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
348       newcaps);
349 }
350
351 static void
352 dump_structure (const GstStructure * s)
353 {
354   gchar *sstr;
355
356   sstr = gst_structure_to_string (s);
357   GST_INFO ("structure: %s", sstr);
358   g_free (sstr);
359 }
360
361 static GstRTSPStreamTransport *
362 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
363 {
364   GList *walk;
365   GstRTSPStreamTransport *result = NULL;
366   const gchar *tmp;
367   gchar *dest;
368   guint port;
369
370   if (rtcp_from == NULL)
371     return NULL;
372
373   tmp = g_strrstr (rtcp_from, ":");
374   if (tmp == NULL)
375     return NULL;
376
377   port = atoi (tmp + 1);
378   dest = g_strndup (rtcp_from, tmp - rtcp_from);
379
380   GST_INFO ("finding %s:%d in %d transports", dest, port,
381       g_list_length (stream->transports));
382
383   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
384     GstRTSPStreamTransport *trans = walk->data;
385     gint min, max;
386
387     min = trans->transport->client_port.min;
388     max = trans->transport->client_port.max;
389
390     if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
391             || max == port)) {
392       result = trans;
393       break;
394     }
395   }
396   g_free (dest);
397
398   return result;
399 }
400
401 static GstRTSPStreamTransport *
402 check_transport (GObject * source, GstRTSPStream * stream)
403 {
404   GstStructure *stats;
405   GstRTSPStreamTransport *trans;
406
407   /* see if we have a stream to match with the origin of the RTCP packet */
408   trans = g_object_get_qdata (source, ssrc_stream_map_key);
409   if (trans == NULL) {
410     g_object_get (source, "stats", &stats, NULL);
411     if (stats) {
412       const gchar *rtcp_from;
413
414       dump_structure (stats);
415
416       rtcp_from = gst_structure_get_string (stats, "rtcp-from");
417       if ((trans = find_transport (stream, rtcp_from))) {
418         GST_INFO ("%p: found transport %p for source  %p", stream, trans,
419             source);
420
421         /* keep ref to the source */
422         trans->rtpsource = source;
423
424         g_object_set_qdata (source, ssrc_stream_map_key, trans);
425       }
426       gst_structure_free (stats);
427     }
428   }
429
430   return trans;
431 }
432
433
434 static void
435 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
436 {
437   GstRTSPStreamTransport *trans;
438
439   GST_INFO ("%p: new source %p", stream, source);
440
441   trans = check_transport (source, stream);
442
443   if (trans)
444     GST_INFO ("%p: source %p for transport %p", stream, source, trans);
445 }
446
447 static void
448 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
449 {
450   GST_INFO ("%p: new SDES %p", stream, source);
451 }
452
453 static void
454 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
455 {
456   GstRTSPStreamTransport *trans;
457
458   trans = check_transport (source, stream);
459
460   if (trans)
461     GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
462
463   if (trans && trans->keep_alive)
464     trans->keep_alive (trans->ka_user_data);
465
466 #ifdef DUMP_STATS
467   {
468     GstStructure *stats;
469     g_object_get (source, "stats", &stats, NULL);
470     if (stats) {
471       dump_structure (stats);
472       gst_structure_free (stats);
473     }
474   }
475 #endif
476 }
477
478 static void
479 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
480 {
481   GST_INFO ("%p: source %p bye", stream, source);
482 }
483
484 static void
485 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
486 {
487   GstRTSPStreamTransport *trans;
488
489   GST_INFO ("%p: source %p bye timeout", stream, source);
490
491   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
492     trans->rtpsource = NULL;
493     trans->timeout = TRUE;
494   }
495 }
496
497 static void
498 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
499 {
500   GstRTSPStreamTransport *trans;
501
502   GST_INFO ("%p: source %p timeout", stream, source);
503
504   if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
505     trans->rtpsource = NULL;
506     trans->timeout = TRUE;
507   }
508 }
509
510 static GstFlowReturn
511 handle_new_sample (GstAppSink * sink, gpointer user_data)
512 {
513   GList *walk;
514   GstSample *sample;
515   GstBuffer *buffer;
516   GstRTSPStream *stream;
517
518   sample = gst_app_sink_pull_sample (sink);
519   if (!sample)
520     return GST_FLOW_OK;
521
522   stream = (GstRTSPStream *) user_data;
523   buffer = gst_sample_get_buffer (sample);
524
525   for (walk = stream->transports; walk; walk = g_list_next (walk)) {
526     GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
527
528     if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
529       if (tr->send_rtp)
530         tr->send_rtp (buffer, tr->transport->interleaved.min, tr->user_data);
531     } else {
532       if (tr->send_rtcp)
533         tr->send_rtcp (buffer, tr->transport->interleaved.max, tr->user_data);
534     }
535   }
536   gst_sample_unref (sample);
537
538   return GST_FLOW_OK;
539 }
540
541 static GstAppSinkCallbacks sink_cb = {
542   NULL,                         /* not interested in EOS */
543   NULL,                         /* not interested in preroll samples */
544   handle_new_sample,
545 };
546
547 /**
548  * gst_rtsp_stream_join_bin:
549  * @stream: a #GstRTSPStream
550  * @bin: a #GstBin to join
551  * @rtpbin: a rtpbin element in @bin
552  * @state: the target state of the new elements
553  *
554  * Join the #Gstbin @bin that contains the element @rtpbin.
555  *
556  * @stream will link to @rtpbin, which must be inside @bin. The elements
557  * added to @bin will be set to the state given in @state.
558  *
559  * Returns: %TRUE on success.
560  */
561 gboolean
562 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
563     GstElement * rtpbin, GstState state)
564 {
565   gint i, idx;
566   gchar *name;
567   GstPad *pad, *teepad, *queuepad, *selpad;
568   GstPadLinkReturn ret;
569
570   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
571   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
572   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
573
574   if (stream->is_joined)
575     return TRUE;
576
577   /* create a session with the same index as the stream */
578   idx = stream->idx;
579
580   GST_INFO ("stream %p joining bin as session %d", stream, idx);
581
582   if (!alloc_ports (stream))
583     goto no_ports;
584
585   /* get a pad for sending RTP */
586   name = g_strdup_printf ("send_rtp_sink_%u", idx);
587   stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
588   g_free (name);
589   /* link the RTP pad to the session manager, it should not really fail unless
590    * this is not really an RTP pad */
591   ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
592   if (ret != GST_PAD_LINK_OK)
593     goto link_failed;
594
595   /* get pads from the RTP session element for sending and receiving
596    * RTP/RTCP*/
597   name = g_strdup_printf ("send_rtp_src_%u", idx);
598   stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
599   g_free (name);
600   name = g_strdup_printf ("send_rtcp_src_%u", idx);
601   stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
602   g_free (name);
603   name = g_strdup_printf ("recv_rtp_sink_%u", idx);
604   stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
605   g_free (name);
606   name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
607   stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
608   g_free (name);
609
610   /* get the session */
611   g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
612
613   g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
614       stream);
615   g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
616       stream);
617   g_signal_connect (stream->session, "on-ssrc-active",
618       (GCallback) on_ssrc_active, stream);
619   g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
620       stream);
621   g_signal_connect (stream->session, "on-bye-timeout",
622       (GCallback) on_bye_timeout, stream);
623   g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
624       stream);
625
626   for (i = 0; i < 2; i++) {
627     /* For the sender we create this bit of pipeline for both
628      * RTP and RTCP. Sync and preroll are enabled on udpsink so
629      * we need to add a queue before appsink to make the pipeline
630      * not block. For the TCP case, we want to pump data to the
631      * client as fast as possible anyway.
632      *
633      * .--------.      .-----.    .---------.
634      * | rtpbin |      | tee |    | udpsink |
635      * |       send->sink   src->sink       |
636      * '--------'      |     |    '---------'
637      *                 |     |    .---------.    .---------.
638      *                 |     |    |  queue  |    | appsink |
639      *                 |    src->sink      src->sink       |
640      *                 '-----'    '---------'    '---------'
641      */
642     /* make tee for RTP/RTCP */
643     stream->tee[i] = gst_element_factory_make ("tee", NULL);
644     gst_bin_add (bin, stream->tee[i]);
645
646     /* and link to rtpbin send pad */
647     pad = gst_element_get_static_pad (stream->tee[i], "sink");
648     gst_pad_link (stream->send_src[i], pad);
649     gst_object_unref (pad);
650
651     /* add udpsink */
652     gst_bin_add (bin, stream->udpsink[i]);
653
654     /* link tee to udpsink */
655     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
656     pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
657     gst_pad_link (teepad, pad);
658     gst_object_unref (pad);
659     gst_object_unref (teepad);
660
661     /* make queue */
662     stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
663     gst_bin_add (bin, stream->appqueue[i]);
664     /* and link to tee */
665     teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
666     pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
667     gst_pad_link (teepad, pad);
668     gst_object_unref (pad);
669     gst_object_unref (teepad);
670
671     /* make appsink */
672     stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
673     g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
674     g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
675     gst_bin_add (bin, stream->appsink[i]);
676     gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
677         &sink_cb, stream, NULL);
678     /* and link to queue */
679     queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
680     pad = gst_element_get_static_pad (stream->appsink[i], "sink");
681     gst_pad_link (queuepad, pad);
682     gst_object_unref (pad);
683     gst_object_unref (queuepad);
684
685     /* For the receiver we create this bit of pipeline for both
686      * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
687      * and it is all funneled into the rtpbin receive pad.
688      *
689      * .--------.     .--------.    .--------.
690      * | udpsrc |     | funnel |    | rtpbin |
691      * |       src->sink      src->sink      |
692      * '--------'     |        |    '--------'
693      * .--------.     |        |
694      * | appsrc |     |        |
695      * |       src->sink       |
696      * '--------'     '--------'
697      */
698     /* make funnel for the RTP/RTCP receivers */
699     stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
700     gst_bin_add (bin, stream->funnel[i]);
701
702     pad = gst_element_get_static_pad (stream->funnel[i], "src");
703     gst_pad_link (pad, stream->recv_sink[i]);
704     gst_object_unref (pad);
705
706     /* add udpsrc */
707     gst_bin_add (bin, stream->udpsrc[i]);
708     /* and link to the funnel */
709     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
710     pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
711     gst_pad_link (pad, selpad);
712     gst_object_unref (pad);
713     gst_object_unref (selpad);
714
715     /* make and add appsrc */
716     stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
717     gst_bin_add (bin, stream->appsrc[i]);
718     /* and link to the funnel */
719     selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
720     pad = gst_element_get_static_pad (stream->appsrc[i], "src");
721     gst_pad_link (pad, selpad);
722     gst_object_unref (pad);
723     gst_object_unref (selpad);
724
725     /* check if we need to set to a special state */
726     if (state != GST_STATE_NULL) {
727       gst_element_set_state (stream->udpsink[i], state);
728       gst_element_set_state (stream->appsink[i], state);
729       gst_element_set_state (stream->appqueue[i], state);
730       gst_element_set_state (stream->tee[i], state);
731       gst_element_set_state (stream->funnel[i], state);
732       gst_element_set_state (stream->appsrc[i], state);
733     }
734     /* we set and keep these to playing so that they don't cause NO_PREROLL return
735      * values */
736     gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
737     gst_element_set_locked_state (stream->udpsrc[i], TRUE);
738   }
739
740   /* be notified of caps changes */
741   stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
742       (GCallback) caps_notify, stream);
743
744   stream->is_joined = TRUE;
745
746   return TRUE;
747
748   /* ERRORS */
749 no_ports:
750   {
751     GST_WARNING ("failed to allocate ports %d", idx);
752     return FALSE;
753   }
754 link_failed:
755   {
756     GST_WARNING ("failed to link stream %d", idx);
757     gst_object_unref (stream->send_rtp_sink);
758     stream->send_rtp_sink = NULL;
759     return FALSE;
760   }
761 }
762
763 /**
764  * gst_rtsp_stream_leave_bin:
765  * @stream: a #GstRTSPStream
766  * @bin: a #GstBin
767  * @rtpbin: a rtpbin #GstElement
768  *
769  * Remove the elements of @stream from @bin. @bin must be set
770  * to the NULL state before calling this.
771  *
772  * Return: %TRUE on success.
773  */
774 gboolean
775 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
776     GstElement * rtpbin)
777 {
778   gint i;
779
780   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
781   g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
782   g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
783
784   if (!stream->is_joined)
785     return TRUE;
786
787   /* all transports must be removed by now */
788   g_return_val_if_fail (stream->transports == NULL, FALSE);
789
790   GST_INFO ("stream %p leaving bin", stream);
791
792   gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
793   g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
794   gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
795   gst_object_unref (stream->send_rtp_sink);
796   stream->send_rtp_sink = NULL;
797
798   for (i = 0; i < 2; i++) {
799     /* and set udpsrc to NULL now before removing */
800     gst_element_set_locked_state (stream->udpsrc[i], FALSE);
801     gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
802
803     /* removing them should also nicely release the request
804      * pads when they finalize */
805     gst_bin_remove (bin, stream->udpsrc[i]);
806     gst_bin_remove (bin, stream->udpsink[i]);
807     gst_bin_remove (bin, stream->appsrc[i]);
808     gst_bin_remove (bin, stream->appsink[i]);
809     gst_bin_remove (bin, stream->appqueue[i]);
810     gst_bin_remove (bin, stream->tee[i]);
811     gst_bin_remove (bin, stream->funnel[i]);
812
813     gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
814     gst_object_unref (stream->recv_sink[i]);
815     stream->recv_sink[i] = NULL;
816
817     stream->udpsrc[i] = NULL;
818     stream->udpsink[i] = NULL;
819     stream->appsrc[i] = NULL;
820     stream->appsink[i] = NULL;
821     stream->appqueue[i] = NULL;
822     stream->tee[i] = NULL;
823     stream->funnel[i] = NULL;
824   }
825   gst_object_unref (stream->send_src[0]);
826   stream->send_src[0] = NULL;
827
828   gst_element_release_request_pad (rtpbin, stream->send_src[1]);
829   gst_object_unref (stream->send_src[1]);
830   stream->send_src[1] = NULL;
831
832   g_object_unref (stream->session);
833   if (stream->caps)
834     gst_caps_unref (stream->caps);
835
836   stream->is_joined = FALSE;
837
838   return TRUE;
839 }
840
841 /**
842  * gst_rtsp_stream_get_rtpinfo:
843  * @stream: a #GstRTSPStream
844  * @rtptime: result RTP timestamp
845  * @seq: result RTP seqnum
846  *
847  * Retrieve the current rtptime and seq. This is used to
848  * construct a RTPInfo reply header.
849  *
850  * Returns: %TRUE when rtptime and seq could be determined.
851  */
852 gboolean
853 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
854     guint * rtptime, guint * seq)
855 {
856   GObjectClass *payobjclass;
857
858   payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
859
860   if (!g_object_class_find_property (payobjclass, "seqnum") ||
861       !g_object_class_find_property (payobjclass, "timestamp"))
862     return FALSE;
863
864   g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
865
866   return TRUE;
867 }
868
869 /**
870  * gst_rtsp_stream_recv_rtp:
871  * @stream: a #GstRTSPStream
872  * @buffer: (transfer full): a #GstBuffer
873  *
874  * Handle an RTP buffer for the stream. This method is usually called when a
875  * message has been received from a client using the TCP transport.
876  *
877  * This function takes ownership of @buffer.
878  *
879  * Returns: a GstFlowReturn.
880  */
881 GstFlowReturn
882 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
883 {
884   GstFlowReturn ret;
885
886   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
887   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
888   g_return_val_if_fail (stream->is_joined, FALSE);
889
890   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[0]), buffer);
891
892   return ret;
893 }
894
895 /**
896  * gst_rtsp_stream_recv_rtcp:
897  * @stream: a #GstRTSPStream
898  * @buffer: (transfer full): a #GstBuffer
899  *
900  * Handle an RTCP buffer for the stream. This method is usually called when a
901  * message has been received from a client using the TCP transport.
902  *
903  * This function takes ownership of @buffer.
904  *
905  * Returns: a GstFlowReturn.
906  */
907 GstFlowReturn
908 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
909 {
910   GstFlowReturn ret;
911
912   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
913   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
914   g_return_val_if_fail (stream->is_joined, FALSE);
915
916   ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (stream->appsrc[1]), buffer);
917
918   return ret;
919 }
920
921 static gboolean
922 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
923     gboolean add)
924 {
925   GstRTSPTransport *tr;
926   gboolean updated;
927
928   updated = FALSE;
929
930   tr = trans->transport;
931
932   switch (tr->lower_transport) {
933     case GST_RTSP_LOWER_TRANS_UDP:
934     case GST_RTSP_LOWER_TRANS_UDP_MCAST:
935     {
936       gchar *dest;
937       gint min, max;
938       guint ttl = 0;
939
940       dest = tr->destination;
941       if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
942         min = tr->port.min;
943         max = tr->port.max;
944         ttl = tr->ttl;
945       } else {
946         min = tr->client_port.min;
947         max = tr->client_port.max;
948       }
949
950       if (add && !trans->active) {
951         GST_INFO ("adding %s:%d-%d", dest, min, max);
952         g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
953         g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
954         if (ttl > 0) {
955           GST_INFO ("setting ttl-mc %d", ttl);
956           g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
957           g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
958         }
959         stream->transports = g_list_prepend (stream->transports, trans);
960         trans->active = TRUE;
961         updated = TRUE;
962       } else if (trans->active) {
963         GST_INFO ("removing %s:%d-%d", dest, min, max);
964         g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
965         g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
966         stream->transports = g_list_remove (stream->transports, trans);
967         trans->active = FALSE;
968         updated = TRUE;
969       }
970       break;
971     }
972     case GST_RTSP_LOWER_TRANS_TCP:
973       if (add && !trans->active) {
974         GST_INFO ("adding TCP %s", tr->destination);
975         stream->transports = g_list_prepend (stream->transports, trans);
976         trans->active = TRUE;
977         updated = TRUE;
978       } else if (trans->active) {
979         GST_INFO ("removing TCP %s", tr->destination);
980         stream->transports = g_list_remove (stream->transports, trans);
981         trans->active = FALSE;
982         updated = TRUE;
983       }
984       break;
985     default:
986       GST_INFO ("Unknown transport %d", tr->lower_transport);
987       break;
988   }
989   return updated;
990 }
991
992
993 /**
994  * gst_rtsp_stream_add_transport:
995  * @stream: a #GstRTSPStream
996  * @trans: a #GstRTSPStreamTransport
997  *
998  * Add the transport in @trans to @stream. The media of @stream will
999  * then also be send to the values configured in @trans.
1000  *
1001  * @stream must be joined to a bin.
1002  *
1003  * @trans must contain a valid #GstRTSPTransport.
1004  *
1005  * Returns: %TRUE if @trans was added
1006  */
1007 gboolean
1008 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1009     GstRTSPStreamTransport * trans)
1010 {
1011   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1012   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1013   g_return_val_if_fail (stream->is_joined, FALSE);
1014   g_return_val_if_fail (trans->transport != NULL, FALSE);
1015
1016   return update_transport (stream, trans, TRUE);
1017 }
1018
1019 /**
1020  * gst_rtsp_stream_remove_transport:
1021  * @stream: a #GstRTSPStream
1022  * @trans: a #GstRTSPStreamTransport
1023  *
1024  * Remove the transport in @trans from @stream. The media of @stream will
1025  * not be sent to the values configured in @trans.
1026  *
1027  * @stream must be joined to a bin.
1028  *
1029  * @trans must contain a valid #GstRTSPTransport.
1030  *
1031  * Returns: %TRUE if @trans was removed
1032  */
1033 gboolean
1034 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1035     GstRTSPStreamTransport * trans)
1036 {
1037   g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1038   g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1039   g_return_val_if_fail (stream->is_joined, FALSE);
1040   g_return_val_if_fail (trans->transport != NULL, FALSE);
1041
1042   return update_transport (stream, trans, FALSE);
1043 }