2 * Copyright (C) 2008 Wim Taymans <wim.taymans at gmail.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
25 #include <gst/app/gstappsrc.h>
26 #include <gst/app/gstappsink.h>
28 #include "rtsp-stream.h"
/* Debug category behind the GST_DEBUG/GST_INFO/GST_WARNING calls in this file. */
36 GST_DEBUG_CATEGORY_STATIC (rtsp_stream_debug);
37 #define GST_CAT_DEFAULT rtsp_stream_debug
/* Quark under which check_transport() attaches the matching
 * GstRTSPStreamTransport to an RTP source object as qdata. */
39 static GQuark ssrc_stream_map_key;
/* Forward declaration: installed as the GObject finalize handler in
 * gst_rtsp_stream_class_init(). */
41 static void gst_rtsp_stream_finalize (GObject * obj);
/* Define the GstRTSPStream type as a direct subclass of GObject. */
43 G_DEFINE_TYPE (GstRTSPStream, gst_rtsp_stream, G_TYPE_OBJECT);
46 gst_rtsp_stream_class_init (GstRTSPStreamClass * klass)
48 GObjectClass *gobject_class;
50 gobject_class = G_OBJECT_CLASS (klass);
52 gobject_class->finalize = gst_rtsp_stream_finalize;
54 GST_DEBUG_CATEGORY_INIT (rtsp_stream_debug, "rtspstream", 0, "GstRTSPStream");
56 ssrc_stream_map_key = g_quark_from_static_string ("GstRTSPServer.stream");
60 gst_rtsp_stream_init (GstRTSPStream * stream)
62 GST_DEBUG ("new stream %p", stream);
64 g_mutex_init (&stream->lock);
/* GObject finalize handler: drops the address, pool, payloader and source
 * pad held by the stream, clears its mutex and chains up to the parent
 * class. */
68 gst_rtsp_stream_finalize (GObject * obj)
70 GstRTSPStream *stream;
72 stream = GST_RTSP_STREAM (obj);
74 GST_DEBUG ("finalize stream %p", stream);
76 /* we really need to be unjoined now */
/* if the stream is still joined this returns early and nothing below runs */
77 g_return_if_fail (!stream->is_joined);
/* NOTE(review): addr and pool can both be NULL (the pool is optional and
 * the address is only acquired on demand in gst_rtsp_stream_get_address());
 * the lines directly above these two calls are not visible in this
 * listing -- confirm that both releases are NULL-guarded. */
80 gst_rtsp_address_free (stream->addr);
82 g_object_unref (stream->pool);
/* release the references taken in gst_rtsp_stream_new() */
83 gst_object_unref (stream->payloader);
84 gst_object_unref (stream->srcpad);
85 g_mutex_clear (&stream->lock);
87 G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
91 * gst_rtsp_stream_new:
94 * @payloader: a #GstElement
96 * Create a new media stream with index @idx that handles RTP data on
97 * @srcpad and has a payloader element @payloader.
99 * Returns: a new #GstRTSPStream
102 gst_rtsp_stream_new (guint idx, GstElement * payloader, GstPad * srcpad)
104 GstRTSPStream *stream;
/* only an element together with a source pad is accepted; any failed
 * check returns NULL without creating a stream */
106 g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
107 g_return_val_if_fail (GST_IS_PAD (srcpad), NULL);
108 g_return_val_if_fail (GST_PAD_IS_SRC (srcpad), NULL);
110 stream = g_object_new (GST_TYPE_RTSP_STREAM, NULL);
/* the stream keeps its own references on both objects;
 * gst_rtsp_stream_finalize() releases them again.
 * NOTE(review): the statement that stores @idx in the stream and the
 * final return are not visible in this listing. */
112 stream->payloader = gst_object_ref (payloader);
113 stream->srcpad = gst_object_ref (srcpad);
119 * gst_rtsp_stream_set_mtu:
120 * @stream: a #GstRTSPStream
123 * Configure the mtu in the payloader of @stream to @mtu.
126 gst_rtsp_stream_set_mtu (GstRTSPStream * stream, guint mtu)
128 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
130 GST_LOG_OBJECT (stream, "set MTU %u", mtu);
132 g_object_set (G_OBJECT (stream->payloader), "mtu", mtu, NULL);
136 * gst_rtsp_stream_get_mtu:
137 * @stream: a #GstRTSPStream
139 * Get the configured MTU in the payloader of @stream.
141 * Returns: the MTU of the payloader.
144 gst_rtsp_stream_get_mtu (GstRTSPStream * stream)
148 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), 0);
150 g_object_get (G_OBJECT (stream->payloader), "mtu", &mtu, NULL);
156 * gst_rtsp_stream_set_address_pool:
157 * @stream: a #GstRTSPStream
158 * @pool: a #GstRTSPAddressPool
160 * configure @pool to be used as the address pool of @stream.
163 gst_rtsp_stream_set_address_pool (GstRTSPStream * stream,
164 GstRTSPAddressPool * pool)
166 GstRTSPAddressPool *old;
168 g_return_if_fail (GST_IS_RTSP_STREAM (stream));
170 GST_LOG_OBJECT (stream, "set address pool %p", pool);
/* swap the pool pointer under the stream lock; @pool may be NULL, in which
 * case the stream ends up without a pool. When @pool differs from the
 * current one the stream takes its own reference on it. */
172 g_mutex_lock (&stream->lock);
173 if ((old = stream->pool) != pool)
174 stream->pool = pool ? g_object_ref (pool) : NULL;
177 g_mutex_unlock (&stream->lock);
/* the previous pool is released only after the lock has been dropped.
 * NOTE(review): the branch taken when the pool is unchanged and the guard
 * in front of this unref are not visible in this listing -- confirm that
 * an unchanged or NULL previous pool is never unreffed here. */
180 g_object_unref (old);
184 * gst_rtsp_stream_get_address_pool:
185 * @stream: a #GstRTSPStream
187 * Get the #GstRTSPAddressPool used as the address pool of @stream.
189 * Returns: (transfer full): the #GstRTSPAddressPool of @stream. g_object_unref() after
193 gst_rtsp_stream_get_address_pool (GstRTSPStream * stream)
195 GstRTSPAddressPool *result;
197 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), NULL);
199 g_mutex_lock (&stream->lock);
200 if ((result = stream->pool))
201 g_object_ref (result);
202 g_mutex_unlock (&stream->lock);
208 * gst_rtsp_stream_get_address:
209 * @stream: a #GstRTSPStream
211 * Get the multicast address of @stream.
213 * Returns: the #GstRTSPAddress of @stream or %NULL when no address could be
214 * allocated. gst_rtsp_address_free() after usage.
217 gst_rtsp_stream_get_address (GstRTSPStream * stream)
219 GstRTSPAddress *result;
221 g_mutex_lock (&stream->lock);
222 if (stream->addr == NULL) {
223 if (stream->pool == NULL)
226 stream->addr = gst_rtsp_address_pool_acquire_address (stream->pool,
227 GST_RTSP_ADDRESS_FLAG_EVEN_PORT, 2);
228 if (stream->addr == NULL)
231 result = gst_rtsp_address_copy (stream->addr);
232 g_mutex_unlock (&stream->lock);
239 GST_ERROR_OBJECT (stream, "no address pool specified");
240 g_mutex_unlock (&stream->lock);
245 GST_ERROR_OBJECT (stream, "failed to acquire address from pool");
246 g_mutex_unlock (&stream->lock);
251 /* must be called with lock */
/* Allocate the UDP plumbing of @stream: two udpsrc elements on a
 * consecutive port pair (an even port for RTP, the next one for RTCP) and
 * two multiudpsink elements that send through the sockets of those
 * sources. On success the four elements and the selected port pair are
 * stored in @stream.
 *
 * NOTE(review): this listing omits short lines (labels, simple conditions,
 * returns, the retry bookkeeping); the exact retry and error paths must be
 * checked against the complete file. */
253 alloc_ports (GstRTSPStream * stream)
255 GstStateChangeReturn ret;
256 GstElement *udpsrc0, *udpsrc1;
257 GstElement *udpsink0, *udpsink1;
258 gint tmp_rtp, tmp_rtcp;
260 gint rtpport, rtcpport;
264 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
272 /* Start with random port */
/* one of the two wildcard URIs is chosen; the condition that selects
 * between IPv6 and IPv4 is not visible here */
276 host = "udp://[::0]";
278 host = "udp://0.0.0.0";
280 /* try to allocate 2 UDP ports, the RTP port should be an even
281 * number and the RTCP port should be the next (uneven) port */
/* create the RTP source and bring it to PAUSED so that the port it ended
 * up with can be read back below */
283 udpsrc0 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
285 goto no_udp_protocol;
286 g_object_set (G_OBJECT (udpsrc0), "port", tmp_rtp, NULL);
288 ret = gst_element_set_state (udpsrc0, GST_STATE_PAUSED);
289 if (ret == GST_STATE_CHANGE_FAILURE) {
295 gst_element_set_state (udpsrc0, GST_STATE_NULL);
296 gst_object_unref (udpsrc0);
300 goto no_udp_protocol;
/* read back the port that was actually selected */
303 g_object_get (G_OBJECT (udpsrc0), "port", &tmp_rtp, NULL);
305 /* check if port is even */
306 if ((tmp_rtp & 1) != 0) {
307 /* port not even, close and allocate another */
311 gst_element_set_state (udpsrc0, GST_STATE_NULL);
312 gst_object_unref (udpsrc0);
318 /* allocate port+1 for RTCP now */
319 udpsrc1 = gst_element_make_from_uri (GST_URI_SRC, host, NULL, NULL);
321 goto no_udp_rtcp_protocol;
324 tmp_rtcp = tmp_rtp + 1;
325 g_object_set (G_OBJECT (udpsrc1), "port", tmp_rtcp, NULL);
327 ret = gst_element_set_state (udpsrc1, GST_STATE_PAUSED);
328 /* tmp_rtcp port is busy already : retry to make rtp/rtcp pair */
329 if (ret == GST_STATE_CHANGE_FAILURE) {
/* both sources are shut down and released before the retry */
334 gst_element_set_state (udpsrc0, GST_STATE_NULL);
335 gst_object_unref (udpsrc0);
337 gst_element_set_state (udpsrc1, GST_STATE_NULL);
338 gst_object_unref (udpsrc1);
343 /* all fine, do port check */
344 g_object_get (G_OBJECT (udpsrc0), "port", &rtpport, NULL);
345 g_object_get (G_OBJECT (udpsrc1), "port", &rtcpport, NULL);
347 /* this should not happen... */
348 if (rtpport != tmp_rtp || rtcpport != tmp_rtcp)
/* RTP sink: reuses the socket of the RTP source. "close-socket" is turned
 * off -- presumably so that the shared socket stays owned by udpsrc;
 * confirm against the multiudpsink documentation. */
351 udpsink0 = gst_element_factory_make ("multiudpsink", NULL);
353 goto no_udp_protocol;
355 g_object_get (G_OBJECT (udpsrc0), "used-socket", &socket, NULL);
356 g_object_set (G_OBJECT (udpsink0), "socket", socket, NULL);
357 g_object_unref (socket);
358 g_object_set (G_OBJECT (udpsink0), "close-socket", FALSE, NULL);
360 udpsink1 = gst_element_factory_make ("multiudpsink", NULL);
362 goto no_udp_protocol;
/* optional properties are probed on the class first because older
 * multiudpsink versions may not have them */
364 if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
365 "send-duplicates")) {
366 g_object_set (G_OBJECT (udpsink0), "send-duplicates", FALSE, NULL);
367 g_object_set (G_OBJECT (udpsink1), "send-duplicates", FALSE, NULL);
370 ("old multiudpsink version found without send-duplicates property");
373 if (g_object_class_find_property (G_OBJECT_GET_CLASS (udpsink0),
375 g_object_set (G_OBJECT (udpsink0), "buffer-size", stream->buffer_size,
378 GST_WARNING ("multiudpsink version found without buffer-size property");
/* RTCP sink: reuses the socket of the RTCP source and runs with sync and
 * async disabled */
381 g_object_get (G_OBJECT (udpsrc1), "used-socket", &socket, NULL);
382 g_object_set (G_OBJECT (udpsink1), "socket", socket, NULL);
383 g_object_unref (socket);
384 g_object_set (G_OBJECT (udpsink1), "close-socket", FALSE, NULL);
385 g_object_set (G_OBJECT (udpsink1), "sync", FALSE, NULL);
386 g_object_set (G_OBJECT (udpsink1), "async", FALSE, NULL);
387 g_object_set (G_OBJECT (udpsink0), "auto-multicast", FALSE, NULL);
388 g_object_set (G_OBJECT (udpsink0), "loop", FALSE, NULL);
389 g_object_set (G_OBJECT (udpsink1), "auto-multicast", FALSE, NULL);
390 g_object_set (G_OBJECT (udpsink1), "loop", FALSE, NULL);
392 /* we keep these elements, we will further configure them when the
393 * client told us to really use the UDP ports. */
394 stream->udpsrc[0] = udpsrc0;
395 stream->udpsrc[1] = udpsrc1;
396 stream->udpsink[0] = udpsink0;
397 stream->udpsink[1] = udpsink1;
/* server_port.min is the RTP port, server_port.max the RTCP port */
398 stream->server_port.min = rtpport;
399 stream->server_port.max = rtcpport;
/* error exits: shut down and release the elements created so far.
 * NOTE(review): the other labels and the per-element guards of this
 * cleanup are not visible in this listing. */
412 no_udp_rtcp_protocol:
423 gst_element_set_state (udpsrc0, GST_STATE_NULL);
424 gst_object_unref (udpsrc0);
427 gst_element_set_state (udpsrc1, GST_STATE_NULL);
428 gst_object_unref (udpsrc1);
431 gst_element_set_state (udpsink0, GST_STATE_NULL);
432 gst_object_unref (udpsink0);
435 gst_element_set_state (udpsink1, GST_STATE_NULL);
436 gst_object_unref (udpsink1);
442 /* executed from streaming thread */
/* "notify::caps" handler, connected on the send_rtp_sink pad in
 * gst_rtsp_stream_join_bin(): stores the current caps of @pad in @stream. */
444 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
446 GstCaps *newcaps, *oldcaps;
448 newcaps = gst_pad_get_current_caps (pad);
450 GST_INFO ("stream %p received caps %p, %" GST_PTR_FORMAT, stream, newcaps,
/* swap the stored caps under the stream lock; the stream takes over the
 * reference returned by gst_pad_get_current_caps() */
453 g_mutex_lock (&stream->lock);
454 oldcaps = stream->caps;
455 stream->caps = newcaps;
456 g_mutex_unlock (&stream->lock);
/* the previous caps are dropped outside of the lock.
 * NOTE(review): the guard in front of this unref is not visible in this
 * listing -- confirm that a NULL oldcaps (first notification) is skipped. */
459 gst_caps_unref (oldcaps);
/* Debug helper: logs the string form of @s at INFO level. */
463 dump_structure (const GstStructure * s)
/* NOTE(review): the declaration of sstr and the statement that releases
 * the string are not visible in this listing -- confirm it is freed. */
467 sstr = gst_structure_to_string (s);
468 GST_INFO ("structure: %s", sstr);
/* Look up the transport of @stream whose destination and client ports match
 * @rtcp_from, a "host:port" string. The string is split at its last ':';
 * a transport matches when its destination equals the host part and the
 * port equals either end of its client port range.
 *
 * NOTE(review): several short lines (declarations, early returns, the tail
 * of the match condition, cleanup) are not visible in this listing. */
472 static GstRTSPStreamTransport *
473 find_transport (GstRTSPStream * stream, const gchar * rtcp_from)
476 GstRTSPStreamTransport *result = NULL;
481 if (rtcp_from == NULL)
/* last ':' so that everything in front of it is treated as the host */
484 tmp = g_strrstr (rtcp_from, ":");
/* NOTE(review): atoi() gives no error indication, a non-numeric port
 * silently becomes 0 */
488 port = atoi (tmp + 1);
/* dest is a newly allocated copy of the host part -- confirm it is freed
 * before returning (the line is not visible here) */
489 dest = g_strndup (rtcp_from, tmp - rtcp_from);
/* the transport list is walked under the stream lock */
491 g_mutex_lock (&stream->lock);
492 GST_INFO ("finding %s:%d in %d transports", dest, port,
493 g_list_length (stream->transports));
495 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
496 GstRTSPStreamTransport *trans = walk->data;
499 min = trans->transport->client_port.min;
500 max = trans->transport->client_port.max;
/* NOTE(review): strcmp() requires a non-NULL destination on every
 * transport in the list */
502 if ((strcmp (trans->transport->destination, dest) == 0) && (min == port
508 g_mutex_unlock (&stream->lock);
/* Return the transport associated with the RTP @source. The association is
 * cached on @source as qdata under ssrc_stream_map_key; when it has to be
 * established, the "rtcp-from" field of the source statistics is matched
 * against the transports of @stream with find_transport().
 *
 * NOTE(review): the conditions that decide when the lookup runs and the
 * final return are not visible in this listing. */
515 static GstRTSPStreamTransport *
516 check_transport (GObject * source, GstRTSPStream * stream)
519 GstRTSPStreamTransport *trans;
521 /* see if we have a stream to match with the origin of the RTCP packet */
522 trans = g_object_get_qdata (source, ssrc_stream_map_key);
524 g_object_get (source, "stats", &stats, NULL);
526 const gchar *rtcp_from;
528 dump_structure (stats);
530 rtcp_from = gst_structure_get_string (stats, "rtcp-from");
531 if ((trans = find_transport (stream, rtcp_from))) {
532 GST_INFO ("%p: found transport %p for source %p", stream, trans,
535 /* keep ref to the source */
/* NOTE(review): this is a plain pointer assignment, no reference is taken
 * on @source here; the timeout handlers reset the field to NULL */
536 trans->rtpsource = source;
/* remember the match on the source so later callbacks skip the lookup */
538 g_object_set_qdata (source, ssrc_stream_map_key, trans);
/* the statistics structure returned by g_object_get() is owned here */
540 gst_structure_free (stats);
/* "on-new-ssrc" handler of the RTP session: tries to associate the new
 * @source with one of the transports of @stream. */
549 on_new_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
551 GstRTSPStreamTransport *trans;
553 GST_INFO ("%p: new source %p", stream, source);
555 trans = check_transport (source, stream);
/* NOTE(review): the condition in front of this log line is not visible in
 * this listing */
558 GST_INFO ("%p: source %p for transport %p", stream, source, trans);
562 on_ssrc_sdes (GObject * session, GObject * source, GstRTSPStream * stream)
564 GST_INFO ("%p: new SDES %p", stream, source);
/* "on-ssrc-active" handler of the RTP session: activity from @source keeps
 * the transport it belongs to alive. */
568 on_ssrc_active (GObject * session, GObject * source, GstRTSPStream * stream)
570 GstRTSPStreamTransport *trans;
572 trans = check_transport (source, stream);
/* NOTE(review): the condition guarding the next two statements is not
 * visible in this listing -- confirm trans is checked for NULL before it is
 * passed to gst_rtsp_stream_transport_keep_alive() */
575 GST_INFO ("%p: source %p in transport %p is active", stream, source, trans);
576 gst_rtsp_stream_transport_keep_alive (trans);
/* statistics dump of the source; the lines that surround it (possibly a
 * compile-time switch) are not visible in this listing */
581 g_object_get (source, "stats", &stats, NULL);
583 dump_structure (stats);
584 gst_structure_free (stats);
591 on_bye_ssrc (GObject * session, GObject * source, GstRTSPStream * stream)
593 GST_INFO ("%p: source %p bye", stream, source);
597 on_bye_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
599 GstRTSPStreamTransport *trans;
601 GST_INFO ("%p: source %p bye timeout", stream, source);
603 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
604 trans->rtpsource = NULL;
605 trans->timeout = TRUE;
610 on_timeout (GObject * session, GObject * source, GstRTSPStream * stream)
612 GstRTSPStreamTransport *trans;
614 GST_INFO ("%p: source %p timeout", stream, source);
616 if ((trans = g_object_get_qdata (source, ssrc_stream_map_key))) {
617 trans->rtpsource = NULL;
618 trans->timeout = TRUE;
/* appsink callback for new samples: pulls the sample from @sink and hands
 * its buffer to every transport of the stream passed in @user_data, as RTP
 * when @sink is appsink[0] and as RTCP otherwise. */
623 handle_new_sample (GstAppSink * sink, gpointer user_data)
628 GstRTSPStream *stream;
630 sample = gst_app_sink_pull_sample (sink);
634 stream = (GstRTSPStream *) user_data;
635 buffer = gst_sample_get_buffer (sample);
/* the transport list is walked, and the send functions are called, with
 * the stream lock held */
637 g_mutex_lock (&stream->lock);
638 for (walk = stream->transports; walk; walk = g_list_next (walk)) {
639 GstRTSPStreamTransport *tr = (GstRTSPStreamTransport *) walk->data;
641 if (GST_ELEMENT_CAST (sink) == stream->appsink[0]) {
642 gst_rtsp_stream_transport_send_rtp (tr, buffer);
644 gst_rtsp_stream_transport_send_rtcp (tr, buffer);
647 g_mutex_unlock (&stream->lock);
/* buffer belongs to the sample, so only the sample is released */
649 gst_sample_unref (sample);
/* Callback table installed on both appsinks in gst_rtsp_stream_join_bin().
 * NOTE(review): the remaining initializer (presumably handle_new_sample as
 * the new-sample callback) is not visible in this listing. */
654 static GstAppSinkCallbacks sink_cb = {
655 NULL, /* not interested in EOS */
656 NULL, /* not interested in preroll samples */
661 * gst_rtsp_stream_join_bin:
662 * @stream: a #GstRTSPStream
663 * @bin: a #GstBin to join
664 * @rtpbin: a rtpbin element in @bin
665 * @state: the target state of the new elements
667 * Join the #Gstbin @bin that contains the element @rtpbin.
669 * @stream will link to @rtpbin, which must be inside @bin. The elements
670 * added to @bin will be set to the state given in @state.
672 * Returns: %TRUE on success.
/* Implementation outline: allocate the UDP elements, link the stream source
 * pad to a send_rtp_sink request pad of @rtpbin, fetch the send/receive
 * pads and the internal session for this stream index, then build one
 * sender chain (tee -> udpsink, tee -> queue -> appsink) and one receiver
 * chain (udpsrc + appsrc -> funnel -> rtpbin) for each of RTP and RTCP.
 *
 * NOTE(review): short lines (declarations, gotos, labels, returns, g_free
 * calls) are not visible in this listing. */
675 gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
676 GstElement * rtpbin, GstState state)
680 GstPad *pad, *teepad, *queuepad, *selpad;
681 GstPadLinkReturn ret;
683 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
684 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
685 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
/* the whole join runs with the stream lock held */
687 g_mutex_lock (&stream->lock);
688 if (stream->is_joined)
691 /* create a session with the same index as the stream */
694 GST_INFO ("stream %p joining bin as session %d", stream, idx);
/* udpsrc/udpsink elements and the server port pair come first */
696 if (!alloc_ports (stream))
699 /* get a pad for sending RTP */
700 name = g_strdup_printf ("send_rtp_sink_%u", idx);
701 stream->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
703 /* link the RTP pad to the session manager, it should not really fail unless
704 * this is not really an RTP pad */
705 ret = gst_pad_link (stream->srcpad, stream->send_rtp_sink);
706 if (ret != GST_PAD_LINK_OK)
709 /* get pads from the RTP session element for sending and receiving
/* index 0 of send_src/recv_sink is the RTP pad, index 1 the RTCP pad;
 * send_src[0] is a static pad, the other three are request pads */
711 name = g_strdup_printf ("send_rtp_src_%u", idx);
712 stream->send_src[0] = gst_element_get_static_pad (rtpbin, name);
714 name = g_strdup_printf ("send_rtcp_src_%u", idx);
715 stream->send_src[1] = gst_element_get_request_pad (rtpbin, name);
717 name = g_strdup_printf ("recv_rtp_sink_%u", idx);
718 stream->recv_sink[0] = gst_element_get_request_pad (rtpbin, name);
720 name = g_strdup_printf ("recv_rtcp_sink_%u", idx);
721 stream->recv_sink[1] = gst_element_get_request_pad (rtpbin, name);
724 /* get the session */
725 g_signal_emit_by_name (rtpbin, "get-internal-session", idx, &stream->session);
/* follow the RTP sources of this session; the handlers map sources to
 * transports and flag transports that timed out */
727 g_signal_connect (stream->session, "on-new-ssrc", (GCallback) on_new_ssrc,
729 g_signal_connect (stream->session, "on-ssrc-sdes", (GCallback) on_ssrc_sdes,
731 g_signal_connect (stream->session, "on-ssrc-active",
732 (GCallback) on_ssrc_active, stream);
733 g_signal_connect (stream->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
735 g_signal_connect (stream->session, "on-bye-timeout",
736 (GCallback) on_bye_timeout, stream);
737 g_signal_connect (stream->session, "on-timeout", (GCallback) on_timeout,
/* i == 0 builds the RTP chains, i == 1 the RTCP chains.
 * NOTE(review): inside this loop the results of gst_element_factory_make()
 * and gst_pad_link() are used without being checked. */
740 for (i = 0; i < 2; i++) {
741 /* For the sender we create this bit of pipeline for both
742 * RTP and RTCP. Sync and preroll are enabled on udpsink so
743 * we need to add a queue before appsink to make the pipeline
744 * not block. For the TCP case, we want to pump data to the
745 * client as fast as possible anyway.
747 * .--------. .-----. .---------.
748 * | rtpbin | | tee | | udpsink |
749 * | send->sink src->sink |
750 * '--------' | | '---------'
751 * | | .---------. .---------.
752 * | | | queue | | appsink |
753 * | src->sink src->sink |
754 * '-----' '---------' '---------'
756 /* make tee for RTP/RTCP */
757 stream->tee[i] = gst_element_factory_make ("tee", NULL);
758 gst_bin_add (bin, stream->tee[i]);
760 /* and link to rtpbin send pad */
761 pad = gst_element_get_static_pad (stream->tee[i], "sink");
762 gst_pad_link (stream->send_src[i], pad);
763 gst_object_unref (pad);
/* the udpsink created by alloc_ports() becomes part of the bin here */
766 gst_bin_add (bin, stream->udpsink[i]);
768 /* link tee to udpsink */
769 teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
770 pad = gst_element_get_static_pad (stream->udpsink[i], "sink");
771 gst_pad_link (teepad, pad);
772 gst_object_unref (pad);
773 gst_object_unref (teepad);
776 stream->appqueue[i] = gst_element_factory_make ("queue", NULL);
777 gst_bin_add (bin, stream->appqueue[i]);
778 /* and link to tee */
779 teepad = gst_element_get_request_pad (stream->tee[i], "src_%u");
780 pad = gst_element_get_static_pad (stream->appqueue[i], "sink");
781 gst_pad_link (teepad, pad);
782 gst_object_unref (pad);
783 gst_object_unref (teepad);
/* appsink delivers samples through the sink_cb callbacks (with @stream as
 * user data) instead of signals */
786 stream->appsink[i] = gst_element_factory_make ("appsink", NULL);
787 g_object_set (stream->appsink[i], "async", FALSE, "sync", FALSE, NULL);
788 g_object_set (stream->appsink[i], "emit-signals", FALSE, NULL);
789 gst_bin_add (bin, stream->appsink[i]);
790 gst_app_sink_set_callbacks (GST_APP_SINK_CAST (stream->appsink[i]),
791 &sink_cb, stream, NULL);
792 /* and link to queue */
793 queuepad = gst_element_get_static_pad (stream->appqueue[i], "src");
794 pad = gst_element_get_static_pad (stream->appsink[i], "sink");
795 gst_pad_link (queuepad, pad);
796 gst_object_unref (pad);
797 gst_object_unref (queuepad);
799 /* For the receiver we create this bit of pipeline for both
800 * RTP and RTCP. We receive RTP/RTCP on appsrc and udpsrc
801 * and it is all funneled into the rtpbin receive pad.
803 * .--------. .--------. .--------.
804 * | udpsrc | | funnel | | rtpbin |
805 * | src->sink src->sink |
806 * '--------' | | '--------'
810 * '--------' '--------'
812 /* make funnel for the RTP/RTCP receivers */
813 stream->funnel[i] = gst_element_factory_make ("funnel", NULL);
814 gst_bin_add (bin, stream->funnel[i]);
816 pad = gst_element_get_static_pad (stream->funnel[i], "src");
817 gst_pad_link (pad, stream->recv_sink[i]);
818 gst_object_unref (pad);
820 /* we set and keep these to playing so that they don't cause NO_PREROLL return
/* locking the state keeps the bin from changing it; it is unlocked again
 * in gst_rtsp_stream_leave_bin() */
822 gst_element_set_state (stream->udpsrc[i], GST_STATE_PLAYING);
823 gst_element_set_locked_state (stream->udpsrc[i], TRUE);
825 gst_bin_add (bin, stream->udpsrc[i]);
826 /* and link to the funnel */
827 selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
828 pad = gst_element_get_static_pad (stream->udpsrc[i], "src");
829 gst_pad_link (pad, selpad);
830 gst_object_unref (pad);
831 gst_object_unref (selpad);
833 /* make and add appsrc */
834 stream->appsrc[i] = gst_element_factory_make ("appsrc", NULL);
835 gst_bin_add (bin, stream->appsrc[i]);
836 /* and link to the funnel */
837 selpad = gst_element_get_request_pad (stream->funnel[i], "sink_%u");
838 pad = gst_element_get_static_pad (stream->appsrc[i], "src");
839 gst_pad_link (pad, selpad);
840 gst_object_unref (pad);
841 gst_object_unref (selpad);
843 /* check if we need to set to a special state */
/* udpsrc is left out on purpose: it was already set to PLAYING above */
844 if (state != GST_STATE_NULL) {
845 gst_element_set_state (stream->udpsink[i], state);
846 gst_element_set_state (stream->appsink[i], state);
847 gst_element_set_state (stream->appqueue[i], state);
848 gst_element_set_state (stream->tee[i], state);
849 gst_element_set_state (stream->funnel[i], state);
850 gst_element_set_state (stream->appsrc[i], state);
854 /* be notified of caps changes */
/* the handler id is kept so that leave_bin can disconnect it */
855 stream->caps_sig = g_signal_connect (stream->send_rtp_sink, "notify::caps",
856 (GCallback) caps_notify, stream);
858 stream->is_joined = TRUE;
859 g_mutex_unlock (&stream->lock);
/* exit paths reached through goto; each one drops the stream lock taken at
 * the top: already joined, port allocation failed, pad link failed */
866 g_mutex_unlock (&stream->lock);
871 g_mutex_unlock (&stream->lock);
872 GST_WARNING ("failed to allocate ports %d", idx);
/* NOTE(review): on a failed link only the pad reference is dropped here;
 * the request pad is not released on rtpbin and the elements created by
 * alloc_ports() stay stored in the stream */
877 GST_WARNING ("failed to link stream %d", idx);
878 gst_object_unref (stream->send_rtp_sink);
879 stream->send_rtp_sink = NULL;
880 g_mutex_unlock (&stream->lock);
886 * gst_rtsp_stream_leave_bin:
887 * @stream: a #GstRTSPStream
889 * @rtpbin: a rtpbin #GstElement
891 * Remove the elements of @stream from @bin. @bin must be set
892 * to the NULL state before calling this.
894 * Return: %TRUE on success.
897 gst_rtsp_stream_leave_bin (GstRTSPStream * stream, GstBin * bin,
902 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
903 g_return_val_if_fail (GST_IS_BIN (bin), FALSE);
904 g_return_val_if_fail (GST_IS_ELEMENT (rtpbin), FALSE);
906 g_mutex_lock (&stream->lock);
907 if (!stream->is_joined)
910 /* all transports must be removed by now */
911 g_return_val_if_fail (stream->transports == NULL, FALSE);
913 GST_INFO ("stream %p leaving bin", stream);
915 gst_pad_unlink (stream->srcpad, stream->send_rtp_sink);
916 g_signal_handler_disconnect (stream->send_rtp_sink, stream->caps_sig);
917 gst_element_release_request_pad (rtpbin, stream->send_rtp_sink);
918 gst_object_unref (stream->send_rtp_sink);
919 stream->send_rtp_sink = NULL;
921 for (i = 0; i < 2; i++) {
922 /* and set udpsrc to NULL now before removing */
923 gst_element_set_locked_state (stream->udpsrc[i], FALSE);
924 gst_element_set_state (stream->udpsrc[i], GST_STATE_NULL);
926 /* removing them should also nicely release the request
927 * pads when they finalize */
928 gst_bin_remove (bin, stream->udpsrc[i]);
929 gst_bin_remove (bin, stream->udpsink[i]);
930 gst_bin_remove (bin, stream->appsrc[i]);
931 gst_bin_remove (bin, stream->appsink[i]);
932 gst_bin_remove (bin, stream->appqueue[i]);
933 gst_bin_remove (bin, stream->tee[i]);
934 gst_bin_remove (bin, stream->funnel[i]);
936 gst_element_release_request_pad (rtpbin, stream->recv_sink[i]);
937 gst_object_unref (stream->recv_sink[i]);
938 stream->recv_sink[i] = NULL;
940 stream->udpsrc[i] = NULL;
941 stream->udpsink[i] = NULL;
942 stream->appsrc[i] = NULL;
943 stream->appsink[i] = NULL;
944 stream->appqueue[i] = NULL;
945 stream->tee[i] = NULL;
946 stream->funnel[i] = NULL;
948 gst_object_unref (stream->send_src[0]);
949 stream->send_src[0] = NULL;
951 gst_element_release_request_pad (rtpbin, stream->send_src[1]);
952 gst_object_unref (stream->send_src[1]);
953 stream->send_src[1] = NULL;
955 g_object_unref (stream->session);
957 gst_caps_unref (stream->caps);
959 stream->is_joined = FALSE;
960 g_mutex_unlock (&stream->lock);
971 * gst_rtsp_stream_get_rtpinfo:
972 * @stream: a #GstRTSPStream
973 * @rtptime: result RTP timestamp
974 * @seq: result RTP seqnum
976 * Retrieve the current rtptime and seq. This is used to
977 * construct a RTPInfo reply header.
979 * Returns: %TRUE when rtptime and seq could be determined.
982 gst_rtsp_stream_get_rtpinfo (GstRTSPStream * stream,
983 guint * rtptime, guint * seq)
985 GObjectClass *payobjclass;
987 payobjclass = G_OBJECT_GET_CLASS (stream->payloader);
989 if (!g_object_class_find_property (payobjclass, "seqnum") ||
990 !g_object_class_find_property (payobjclass, "timestamp"))
993 g_object_get (stream->payloader, "seqnum", seq, "timestamp", rtptime, NULL);
999 * gst_rtsp_stream_recv_rtp:
1000 * @stream: a #GstRTSPStream
1001 * @buffer: (transfer full): a #GstBuffer
1003 * Handle an RTP buffer for the stream. This method is usually called when a
1004 * message has been received from a client using the TCP transport.
1006 * This function takes ownership of @buffer.
1008 * Returns: a GstFlowReturn.
1011 gst_rtsp_stream_recv_rtp (GstRTSPStream * stream, GstBuffer * buffer)
1014 GstElement *element;
1016 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1017 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1018 g_return_val_if_fail (stream->is_joined, FALSE);
1020 g_mutex_lock (&stream->lock);
1021 element = gst_object_ref (stream->appsrc[0]);
1022 g_mutex_unlock (&stream->lock);
1024 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1026 gst_object_unref (element);
1032 * gst_rtsp_stream_recv_rtcp:
1033 * @stream: a #GstRTSPStream
1034 * @buffer: (transfer full): a #GstBuffer
1036 * Handle an RTCP buffer for the stream. This method is usually called when a
1037 * message has been received from a client using the TCP transport.
1039 * This function takes ownership of @buffer.
1041 * Returns: a GstFlowReturn.
1044 gst_rtsp_stream_recv_rtcp (GstRTSPStream * stream, GstBuffer * buffer)
1047 GstElement *element;
1049 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), GST_FLOW_ERROR);
1050 g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
1051 g_return_val_if_fail (stream->is_joined, FALSE);
1053 g_mutex_lock (&stream->lock);
1054 element = gst_object_ref (stream->appsrc[1]);
1055 g_mutex_unlock (&stream->lock);
1057 ret = gst_app_src_push_buffer (GST_APP_SRC_CAST (element), buffer);
1059 gst_object_unref (element);
1064 /* must be called with lock */
/* Add @trans to, or remove it from, the set of active transports of
 * @stream, depending on the boolean passed after @trans. For UDP and UDP
 * multicast the destination is also added to or removed from both
 * multiudpsink elements; for TCP only the transport list is updated.
 * trans->active makes repeated adds or removes of the same transport a
 * no-op.
 *
 * NOTE(review): short lines (declarations, the remaining parameter, break
 * statements, the default label and the returns) are not visible in this
 * listing. */
1066 update_transport (GstRTSPStream * stream, GstRTSPStreamTransport * trans,
1069 GstRTSPTransport *tr;
1074 tr = trans->transport;
1076 switch (tr->lower_transport) {
1077 case GST_RTSP_LOWER_TRANS_UDP:
1078 case GST_RTSP_LOWER_TRANS_UDP_MCAST:
1084 dest = tr->destination;
/* multicast gets special treatment here; the body of this branch is not
 * visible in this listing */
1085 if (tr->lower_transport == GST_RTSP_LOWER_TRANS_UDP_MCAST) {
1090 min = tr->client_port.min;
1091 max = tr->client_port.max;
1094 if (add && !trans->active) {
/* RTP goes to the low end of the port range, RTCP to the high end */
1095 GST_INFO ("adding %s:%d-%d", dest, min, max);
1096 g_signal_emit_by_name (stream->udpsink[0], "add", dest, min, NULL);
1097 g_signal_emit_by_name (stream->udpsink[1], "add", dest, max, NULL);
1099 GST_INFO ("setting ttl-mc %d", ttl);
1100 g_object_set (G_OBJECT (stream->udpsink[0]), "ttl-mc", ttl, NULL);
1101 g_object_set (G_OBJECT (stream->udpsink[1]), "ttl-mc", ttl, NULL);
1103 stream->transports = g_list_prepend (stream->transports, trans);
1104 trans->active = TRUE;
1106 } else if (trans->active) {
1107 GST_INFO ("removing %s:%d-%d", dest, min, max);
1108 g_signal_emit_by_name (stream->udpsink[0], "remove", dest, min, NULL);
1109 g_signal_emit_by_name (stream->udpsink[1], "remove", dest, max, NULL);
1110 stream->transports = g_list_remove (stream->transports, trans);
1111 trans->active = FALSE;
/* TCP transports are served through the appsink callbacks, so only the
 * transport list changes */
1116 case GST_RTSP_LOWER_TRANS_TCP:
1117 if (add && !trans->active) {
1118 GST_INFO ("adding TCP %s", tr->destination);
1119 stream->transports = g_list_prepend (stream->transports, trans);
1120 trans->active = TRUE;
1122 } else if (trans->active) {
1123 GST_INFO ("removing TCP %s", tr->destination);
1124 stream->transports = g_list_remove (stream->transports, trans);
1125 trans->active = FALSE;
1130 GST_INFO ("Unknown transport %d", tr->lower_transport);
1138 * gst_rtsp_stream_add_transport:
1139 * @stream: a #GstRTSPStream
1140 * @trans: a #GstRTSPStreamTransport
1142 * Add the transport in @trans to @stream. The media of @stream will
1143 * then also be send to the values configured in @trans.
1145 * @stream must be joined to a bin.
1147 * @trans must contain a valid #GstRTSPTransport.
1149 * Returns: %TRUE if @trans was added
1152 gst_rtsp_stream_add_transport (GstRTSPStream * stream,
1153 GstRTSPStreamTransport * trans)
1157 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1158 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1159 g_return_val_if_fail (stream->is_joined, FALSE);
1160 g_return_val_if_fail (trans->transport != NULL, FALSE);
1162 g_mutex_lock (&stream->lock);
1163 res = update_transport (stream, trans, TRUE);
1164 g_mutex_unlock (&stream->lock);
1170 * gst_rtsp_stream_remove_transport:
1171 * @stream: a #GstRTSPStream
1172 * @trans: a #GstRTSPStreamTransport
1174 * Remove the transport in @trans from @stream. The media of @stream will
1175 * not be sent to the values configured in @trans.
1177 * @stream must be joined to a bin.
1179 * @trans must contain a valid #GstRTSPTransport.
1181 * Returns: %TRUE if @trans was removed
1184 gst_rtsp_stream_remove_transport (GstRTSPStream * stream,
1185 GstRTSPStreamTransport * trans)
1189 g_return_val_if_fail (GST_IS_RTSP_STREAM (stream), FALSE);
1190 g_return_val_if_fail (GST_IS_RTSP_STREAM_TRANSPORT (trans), FALSE);
1191 g_return_val_if_fail (stream->is_joined, FALSE);
1192 g_return_val_if_fail (trans->transport != NULL, FALSE);
1194 g_mutex_lock (&stream->lock);
1195 res = update_transport (stream, trans, FALSE);
1196 g_mutex_unlock (&stream->lock);