2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
28 #include "rtsp-stream.h"
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
39 static GQuark ssrc_stream_map_key;
41 static void gst_rtsp_stream_finalize (GObject * obj);
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
48 GObjectClass *gobject_class;
50 gobject_class = G_OBJECT_CLASS (klass);
52 gobject_class->finalize = gst_rtsp_stream_finalize;
54 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
56 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
60 gst_rtsp_stream_init (GstRTSPStream * stream)
62 g_mutex_init (&stream->lock);
66 gst_rtsp_stream_finalize (GObject * obj)
68 GstRTSPStream *stream;
70 stream = GST_RTSP_STREAM (obj);
72 /* we really need to be unjoined now */
73 g_return_if_fail (!stream->is_joined);
75 gst_object_unref (stream->payloader);
76 gst_object_unref (stream->srcpad);
77 g_mutex_clear (&stream->lock);
79 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
83 * gst_rtsp_stream_new:
86 * @payloader: a #GstElement
88 * Create a new media stream with index @idx that handles RTP data on
89 * @srcpad and has a payloader element @payloader.
91 * Returns: a new #GstRTSPStream
94 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
96 GstRTSPStream *stream;
98 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
99 g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
100 g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
102 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
104 stream->payloader = gst_object_ref (payloader);
105 stream->srcpad = gst_object_ref (srcpad);
111 * gst_rtsp_stream_set_mtu:
112 * @stream: a #GstRTSPStream
115 * Configure the mtu in the payloader of @stream to @mtu.
118 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
120 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
122 g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
126 * gst_rtsp_stream_get_mtu:
127 * @stream: a #GstRTSPStream
129 * Get the configured MTU in the payloader of @stream.
131 * Returns: the MTU of the payloader.
134 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
138 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
140 g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
145 /* must be called with lock */
147 alloc_ports (GstRTSPStream * stream)
149 GstStateChangeReturn ret;
150 GstElement *udpsrc0, *udpsrc1;
151 GstElement *udpsink0, *udpsink1;
152 gint tmp_rtp, tmp_rtcp;
154 gint rtpport, rtcpport;
158 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
166 /* Start with random port */
170 host = "udp://[::0]";
172 host = "udp://0.0.0.0";
174 /* try to allocate 2 UDP ports, the RTP port should be an even
175 * number and the RTCP port should be the next (uneven) port */
177 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
179 goto no_udp_protocol;
180 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
182 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
183 if (ret == GST_STATE_CHANGE_FAILURE) {
189 gst_element_set_state (udpsrc0, GST_STATE_NULL);
190 gst_object_unref (udpsrc0);
194 goto no_udp_protocol;
197 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
199 /* check if port is even */
200 if ((tmp_rtp & 1) != 0) {
201 /* port not even, close and allocate another */
205 gst_element_set_state (udpsrc0, GST_STATE_NULL);
206 gst_object_unref (udpsrc0);
212 /* allocate port+1 for RTCP now */
213 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
215 goto no_udp_rtcp_protocol;
218 tmp_rtcp = tmp_rtp + 1;
219 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
221 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
222 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
223 if (ret == GST_STATE_CHANGE_FAILURE) {
228 gst_element_set_state (udpsrc0, GST_STATE_NULL);
229 gst_object_unref (udpsrc0);
231 gst_element_set_state (udpsrc1, GST_STATE_NULL);
232 gst_object_unref (udpsrc1);
237 /* all fine, do port check */
238 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
239 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
241 /* this should not happen... */
242 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
245 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
247 goto no_udp_protocol;
249 g_object_get (G_OBJECT (udpsrc0), "socket", &socket, NULL);
250 g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
251 g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
253 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
255 goto no_udp_protocol;
257 if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
258 "send-duplicates")) {
259 g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
260 g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
263 ("old multiudpsink version found without send-duplicates property");
266 if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
268 g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
271 GST_WARNING ("multiudpsink version found without buffer-size property");
274 g_object_get (G_OBJECT (udpsrc1), "socket", &socket, NULL);
275 g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
276 g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
277 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
278 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
279 g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
280 g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
281 g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
282 g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
284 /* we keep these elements, we will further configure them when the
285 * client told us to really use the UDP ports. */
286 stream->udpsrc[0] = udpsrc0;
287 stream->udpsrc[1] = udpsrc1;
288 stream->udpsink[0] = udpsink0;
289 stream->udpsink[1] = udpsink1;
290 stream->server_port.min = rtpport;
291 stream->server_port.max = rtcpport;
304 no_udp_rtcp_protocol:
315 gst_element_set_state (udpsrc0, GST_STATE_NULL);
316 gst_object_unref (udpsrc0);
319 gst_element_set_state (udpsrc1, GST_STATE_NULL);
320 gst_object_unref (udpsrc1);
323 gst_element_set_state (udpsink0, GST_STATE_NULL);
324 gst_object_unref (udpsink0);
327 gst_element_set_state (udpsink1, GST_STATE_NULL);
328 gst_object_unref (udpsink1);
334 /* executed from streaming thread */
336 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
338 GstCaps *newcaps, *oldcaps;
340 newcaps = gst_pad_get_current_caps (pad);
342 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
345 g_mutex_lock (&stream->lock);
346 oldcaps = stream->caps;
347 stream->caps = newcaps;
348 g_mutex_unlock (&stream->lock);
351 gst_caps_unref (oldcaps);
355 dump_structure (const GstStructure * s)
359 sstr = gst_structure_to_string (s);
360 GST_INFO ("structure: %s", sstr);
364 static GstRTSPStreamTransport *
365 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
368 GstRTSPStreamTransport *result = NULL;
373 if (rtcp_from == NULL)
376 tmp = g_strrstr (rtcp_from, ":");
380 port = atoi (tmp + 1);
381 dest = g_strndup (rtcp_from, tmp - rtcp_from);
383 g_mutex_lock (&stream->lock);
384 GST_INFO ("finding %s:%d in %d transports", dest, port,
385 g_list_length (stream->transports));
387 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
388 GstRTSPStreamTransport *trans = walk->data;
391 min = trans->transport->client_port.min;
392 max = trans->transport->client_port.max;
394 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
400 g_mutex_unlock (&stream->lock);
407 static GstRTSPStreamTransport *
408 check_transport (GObject * source, GstRTSPStream * stream)
411 GstRTSPStreamTransport *trans;
413 /* see if we have a stream to match with the origin of the RTCP packet */
414 trans = g_object_get_qdata (source, ssrc_stream_map_key);
416 g_object_get (source, "stats", &stats, NULL);
418 const gchar *rtcp_from;
420 dump_structure (stats);
422 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
423 if ((trans = find_transport (stream, rtcp_from))) {
424 GST_INFO ("%p: found transport %p for source %p", stream, trans,
427 /* keep ref to the source */
428 trans->rtpsource = source;
430 g_object_set_qdata (source, ssrc_stream_map_key, trans);
432 gst_structure_free (stats);
441 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
443 GstRTSPStreamTransport *trans;
445 GST_INFO ("%p: new source %p", stream, source);
447 trans = check_transport (source, stream);
450 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
454 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
456 GST_INFO ("%p: new SDES %p", stream, source);
460 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
462 GstRTSPStreamTransport *trans;
464 trans = check_transport (source, stream);
467 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
468 gst_rtsp_stream_transport_keep_alive (trans);
473 g_object_get (source, "stats", &stats, NULL);
475 dump_structure (stats);
476 gst_structure_free (stats);
483 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
485 GST_INFO ("%p: source %p bye", stream, source);
489 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
491 GstRTSPStreamTransport *trans;
493 GST_INFO ("%p: source %p bye timeout", stream, source);
495 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
496 trans->rtpsource = NULL;
497 trans->timeout = TRUE;
502 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
504 GstRTSPStreamTransport *trans;
506 GST_INFO ("%p: source %p timeout", stream, source);
508 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
509 trans->rtpsource = NULL;
510 trans->timeout = TRUE;
515 handle_new_sample (GstAppSink * sink, gpointer user_data)
520 GstRTSPStream *stream;
522 sample = gst_app_sink_pull_sample (sink);
526 stream = (GstRTSPStream *) user_data;
527 buffer = gst_sample_get_buffer (sample);
529 g_mutex_lock (&stream->lock);
530 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
531 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
533 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
534 gst_rtsp_stream_transport_send_rtp (tr, buffer);
536 gst_rtsp_stream_transport_send_rtcp (tr, buffer);
539 g_mutex_unlock (&stream->lock);
541 gst_sample_unref (sample);
546 static GstAppSinkCallbacks sink_cb = {
547 NULL, /* not interested in EOS */
548 NULL, /* not interested in preroll samples */
553 * gst_rtsp_stream_join_bin:
554 * @stream: a #GstRTSPStream
555 * @bin: a #GstBin to join
556 * @rtpbin: a rtpbin element in @bin
557 * @state: the target state of the new elements
559 * Join the #Gstbin @bin that contains the element @rtpbin.
561 * @stream will link to @rtpbin, which must be inside @bin. The elements
562 * added to @bin will be set to the state given in @state.
564 * Returns: %TRUE on success.
567 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
568 GstElement * rtpbin, GstState state)
572 GstPad *pad, *teepad, *queuepad, *selpad;
573 GstPadLinkReturn ret;
575 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
576 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
577 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
579 g_mutex_lock (&stream->lock);
580 if (stream->is_joined)
583 /* create a session with the same index as the stream */
586 GST_INFO ("stream %p joining bin as session %d", stream, idx);
588 if (!alloc_ports (stream))
591 /* get a pad for sending RTP */
592 name = g_strdup_printf ("send_rtp_sink_%u", idx);
593 stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
595 /* link the RTP pad to the session manager, it should not really fail unless
596 * this is not really an RTP pad */
597 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
598 if (ret != GST_PAD_LINK_OK)
601 /* get pads from the RTP session element for sending and receiving
603 name = g_strdup_printf ("send_rtp_src_%u", idx);
604 stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
606 name = g_strdup_printf ("send_rtcp_src_%u", idx);
607 stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
609 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
610 stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
612 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
613 stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
616 /* get the session */
617 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
619 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
621 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
623 g_signal_connect (stream->session, "on-ssrc-active",
624 (GCallback) on_ssrc_active, stream);
625 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
627 g_signal_connect (stream->session, "on-bye-timeout",
628 (GCallback) on_bye_timeout, stream);
629 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
632 for (i = 0; i < 2; i++) {
633 /* For the sender we create this bit of pipeline for both
634 * RTP and RTCP. Sync and preroll are enabled on udpsink so
635 * we need to add a queue before appsink to make the pipeline
636 * not block. For the TCP case, we want to pump data to the
637 * client as fast as possible anyway.
639 * .--------. .-----. .---------.
640 * | rtpbin | | tee | | udpsink |
641 * | send->sink src->sink |
642 * '--------' | | '---------'
643 * | | .---------. .---------.
644 * | | | queue | | appsink |
645 * | src->sink src->sink |
646 * '-----' '---------' '---------'
648 /* make tee for RTP/RTCP */
649 stream->tee[i] = gst_element_factory_make ("tee", NULL);
650 gst_bin_add (bin, stream->tee[i]);
652 /* and link to rtpbin send pad */
653 pad = gst_element_get_static_pad (stream->tee[i], "sink");
654 gst_pad_link (stream->send_src[i], pad);
655 gst_object_unref (pad);
658 gst_bin_add (bin, stream->udpsink[i]);
660 /* link tee to udpsink */
661 teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
662 pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
663 gst_pad_link (teepad, pad);
664 gst_object_unref (pad);
665 gst_object_unref (teepad);
668 stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
669 gst_bin_add (bin, stream->appqueue[i]);
670 /* and link to tee */
671 teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
672 pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
673 gst_pad_link (teepad, pad);
674 gst_object_unref (pad);
675 gst_object_unref (teepad);
678 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
679 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
680 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
681 gst_bin_add (bin, stream->appsink[i]);
682 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
683 &sink_cb, stream, NULL);
684 /* and link to queue */
685 queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
686 pad = gst_element_get_static_pad (stream->appsink[i], "sink");
687 gst_pad_link (queuepad, pad);
688 gst_object_unref (pad);
689 gst_object_unref (queuepad);
691 /* For the receiver we create this bit of pipeline for both
692 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
693 * and it is all funneled into the rtpbin receive pad.
695 * .--------. .--------. .--------.
696 * | udpsrc | | funnel | | rtpbin |
697 * | src->sink src->sink |
698 * '--------' | | '--------'
702 * '--------' '--------'
704 /* make funnel for the RTP/RTCP receivers */
705 stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
706 gst_bin_add (bin, stream->funnel[i]);
708 pad = gst_element_get_static_pad (stream->funnel[i], "src");
709 gst_pad_link (pad, stream->recv_sink[i]);
710 gst_object_unref (pad);
713 gst_bin_add (bin, stream->udpsrc[i]);
714 /* and link to the funnel */
715 selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
716 pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
717 gst_pad_link (pad, selpad);
718 gst_object_unref (pad);
719 gst_object_unref (selpad);
721 /* make and add appsrc */
722 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
723 gst_bin_add (bin, stream->appsrc[i]);
724 /* and link to the funnel */
725 selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
726 pad = gst_element_get_static_pad (stream->appsrc[i], "src");
727 gst_pad_link (pad, selpad);
728 gst_object_unref (pad);
729 gst_object_unref (selpad);
731 /* check if we need to set to a special state */
732 if (state != GST_STATE_NULL) {
733 gst_element_set_state (stream->udpsink[i], state);
734 gst_element_set_state (stream->appsink[i], state);
735 gst_element_set_state (stream->appqueue[i], state);
736 gst_element_set_state (stream->tee[i], state);
737 gst_element_set_state (stream->funnel[i], state);
738 gst_element_set_state (stream->appsrc[i], state);
740 /* we set and keep these to playing so that they don't cause NO_PREROLL return
742 gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
743 gst_element_set_locked_state (stream->udpsrc[i], TRUE);
746 /* be notified of caps changes */
747 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
748 (GCallback) caps_notify, stream);
750 stream->is_joined = TRUE;
751 g_mutex_unlock (&stream->lock);
758 g_mutex_unlock (&stream->lock);
763 g_mutex_unlock (&stream->lock);
764 GST_WARNING ("failed to allocate ports %d", idx);
769 GST_WARNING ("failed to link stream %d", idx);
770 gst_object_unref (stream->send_rtp_sink);
771 stream->send_rtp_sink = NULL;
772 g_mutex_unlock (&stream->lock);
778 * gst_rtsp_stream_leave_bin:
779 * @stream: a #GstRTSPStream
781 * @rtpbin: a rtpbin #GstElement
783 * Remove the elements of @stream from @bin. @bin must be set
784 * to the NULL state before calling this.
786 * Return: %TRUE on success.
789 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
794 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
795 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
796 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
798 g_mutex_lock (&stream->lock);
799 if (!stream->is_joined)
802 /* all transports must be removed by now */
803 g_return_val_if_fail (stream->transports == NULL, FALSE);
805 GST_INFO ("stream %p leaving bin", stream);
807 gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
808 g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
809 gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
810 gst_object_unref (stream->send_rtp_sink);
811 stream->send_rtp_sink = NULL;
813 for (i = 0; i < 2; i++) {
814 /* and set udpsrc to NULL now before removing */
815 gst_element_set_locked_state (stream->udpsrc[i], FALSE);
816 gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
818 /* removing them should also nicely release the request
819 * pads when they finalize */
820 gst_bin_remove (bin, stream->udpsrc[i]);
821 gst_bin_remove (bin, stream->udpsink[i]);
822 gst_bin_remove (bin, stream->appsrc[i]);
823 gst_bin_remove (bin, stream->appsink[i]);
824 gst_bin_remove (bin, stream->appqueue[i]);
825 gst_bin_remove (bin, stream->tee[i]);
826 gst_bin_remove (bin, stream->funnel[i]);
828 gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
829 gst_object_unref (stream->recv_sink[i]);
830 stream->recv_sink[i] = NULL;
832 stream->udpsrc[i] = NULL;
833 stream->udpsink[i] = NULL;
834 stream->appsrc[i] = NULL;
835 stream->appsink[i] = NULL;
836 stream->appqueue[i] = NULL;
837 stream->tee[i] = NULL;
838 stream->funnel[i] = NULL;
840 gst_object_unref (stream->send_src[0]);
841 stream->send_src[0] = NULL;
843 gst_element_release_request_pad (rtpbin, stream->send_src[1]);
844 gst_object_unref (stream->send_src[1]);
845 stream->send_src[1] = NULL;
847 g_object_unref (stream->session);
849 gst_caps_unref (stream->caps);
851 stream->is_joined = FALSE;
852 g_mutex_unlock (&stream->lock);
863 * gst_rtsp_stream_get_rtpinfo:
864 * @stream: a #GstRTSPStream
865 * @rtptime: result RTP timestamp
866 * @seq: result RTP seqnum
868 * Retrieve the current rtptime and seq. This is used to
869 * construct a RTPInfo reply header.
871 * Returns: %TRUE when rtptime and seq could be determined.
874 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
875 guint * rtptime, guint * seq)
877 GObjectClass *payobjclass;
879 payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
881 if (!g_object_class_find_property (payobjclass, "seqnum") ||
882 !g_object_class_find_property (payobjclass, "timestamp"))
885 g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
891 * gst_rtsp_stream_recv_rtp:
892 * @stream: a #GstRTSPStream
893 * @buffer: (transfer full): a #GstBuffer
895 * Handle an RTP buffer for the stream. This method is usually called when a
896 * message has been received from a client using the TCP transport.
898 * This function takes ownership of @buffer.
900 * Returns: a GstFlowReturn.
903 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
908 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
909 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
910 g_return_val_if_fail (stream->is_joined, FALSE);
912 g_mutex_lock (&stream->lock);
913 element = gst_object_ref (stream->appsrc[0]);
914 g_mutex_unlock (&stream->lock);
916 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
918 gst_object_unref (element);
924 * gst_rtsp_stream_recv_rtcp:
925 * @stream: a #GstRTSPStream
926 * @buffer: (transfer full): a #GstBuffer
928 * Handle an RTCP buffer for the stream. This method is usually called when a
929 * message has been received from a client using the TCP transport.
931 * This function takes ownership of @buffer.
933 * Returns: a GstFlowReturn.
936 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
941 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
942 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
943 g_return_val_if_fail (stream->is_joined, FALSE);
945 g_mutex_lock (&stream->lock);
946 element = gst_object_ref (stream->appsrc[1]);
947 g_mutex_unlock (&stream->lock);
949 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
951 gst_object_unref (element);
956 /* must be called with lock */
958 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
961 GstRTSPTransport *tr;
966 tr = trans->transport;
968 switch (tr->lower_transport) {
969 case GST_RTSP_LOWER_TRANS_UDP:
970 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
976 dest = tr->destination;
977 if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
982 min = tr->client_port.min;
983 max = tr->client_port.max;
986 if (add && !trans->active) {
987 GST_INFO ("adding %s:%d-%d", dest, min, max);
988 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
989 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
991 GST_INFO ("setting ttl-mc %d", ttl);
992 g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
993 g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
995 stream->transports = g_list_prepend (stream->transports, trans);
996 trans->active = TRUE;
998 } else if (trans->active) {
999 GST_INFO ("removing %s:%d-%d", dest, min, max);
1000 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1001 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1002 stream->transports = g_list_remove (stream->transports, trans);
1003 trans->active = FALSE;
1008 case GST_RTSP_LOWER_TRANS_TCP:
1009 if (add && !trans->active) {
1010 GST_INFO ("adding TCP %s", tr->destination);
1011 stream->transports = g_list_prepend (stream->transports, trans);
1012 trans->active = TRUE;
1014 } else if (trans->active) {
1015 GST_INFO ("removing TCP %s", tr->destination);
1016 stream->transports = g_list_remove (stream->transports, trans);
1017 trans->active = FALSE;
1022 GST_INFO ("Unknown transport %d", tr->lower_transport);
1030 * gst_rtsp_stream_add_transport:
1031 * @stream: a #GstRTSPStream
1032 * @trans: a #GstRTSPStreamTransport
1034 * Add the transport in @trans to @stream. The media of @stream will
1035 * then also be send to the values configured in @trans.
1037 * @stream must be joined to a bin.
1039 * @trans must contain a valid #GstRTSPTransport.
1041 * Returns: %TRUE if @trans was added
1044 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1045 GstRTSPStreamTransport * trans)
1049 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1050 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1051 g_return_val_if_fail (stream->is_joined, FALSE);
1052 g_return_val_if_fail (trans->transport != NULL, FALSE);
1054 g_mutex_lock (&stream->lock);
1055 res = update_transport (stream, trans, TRUE);
1056 g_mutex_unlock (&stream->lock);
1062 * gst_rtsp_stream_remove_transport:
1063 * @stream: a #GstRTSPStream
1064 * @trans: a #GstRTSPStreamTransport
1066 * Remove the transport in @trans from @stream. The media of @stream will
1067 * not be sent to the values configured in @trans.
1069 * @stream must be joined to a bin.
1071 * @trans must contain a valid #GstRTSPTransport.
1073 * Returns: %TRUE if @trans was removed
1076 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1077 GstRTSPStreamTransport * trans)
1081 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1082 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1083 g_return_val_if_fail (stream->is_joined, FALSE);
1084 g_return_val_if_fail (trans->transport != NULL, FALSE);
1086 g_mutex_lock (&stream->lock);
1087 res = update_transport (stream, trans, FALSE);
1088 g_mutex_unlock (&stream->lock);