2 * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
20 * SECTION:element-sdpdemux
23 * sdpdemux currently understands SDP as the input format of the session description.
24 * For each stream listed in the SDP a new stream_\%u pad will be created
25 * with caps derived from the SDP media description. This is a caps of mime type
26 * "application/x-rtp" that can be connected to any available RTP depayloader
29 * sdpdemux will internally instantiate an RTP session manager element
30 * that will handle the RTCP messages to and from the server, jitter removal,
31 * packet reordering along with providing a clock for the pipeline.
33 * sdpdemux acts like a live element and will therefore only generate data in the
36 * ## Example launch line
38 * gst-launch-1.0 souphttpsrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
39 * ]| Establish a connection to an HTTP server that contains an SDP session description
40 * that gets parsed by sdpdemux and send the raw RTP packets to a fakesink.
48 #include "gstsdpdemux.h"
50 #include <gst/rtp/gstrtppayloads.h>
51 #include <gst/sdp/gstsdpmessage.h>
/* Debug category that the GST_*_OBJECT logging macros in this file write to. */
57 GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
58 #define GST_CAT_DEFAULT (sdpdemux_debug)
/* Pad template named "sink" with application/sdp caps; the instance
 * initializer creates the sink pad from it. */
60 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
63 GST_STATIC_CAPS ("application/sdp"));
/* Pad template named "stream_%u" with application/x-rtp caps; new_session_pad
 * uses it when ghosting session manager pads. */
65 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream_%u",
68 GST_STATIC_CAPS ("application/x-rtp"));
/* Property defaults. DEFAULT_TIMEOUT is handed to the "timeout" param spec
 * (described there in microseconds) and DEFAULT_LATENCY_MS to the "latency"
 * param spec (milliseconds). DEFAULT_DEBUG and DEFAULT_REDIRECT presumably
 * back the "debug" and "redirect" properties - confirm in class_init. */
76 #define DEFAULT_DEBUG FALSE
77 #define DEFAULT_TIMEOUT 10000000
78 #define DEFAULT_LATENCY_MS 200
79 #define DEFAULT_REDIRECT TRUE
/* Forward declarations for the GObject, GstElement, GstBin and sink pad
 * callbacks that are wired up in class_init and init further down. */
90 static void gst_sdp_demux_finalize (GObject * object);
92 static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
93 const GValue * value, GParamSpec * pspec);
94 static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
95 GValue * value, GParamSpec * pspec);
97 static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
98 GstStateChange transition);
99 static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
101 static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
102 GstSDPStream * stream, GstEvent * event);
104 static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent,
106 static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent,
109 /*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
/* Define the GstSDPDemux type as a subclass of GstBin. parent_class is an
 * alias for the gst_sdp_demux_parent_class symbol so that the chain-up calls
 * in this file can use the short name. The element is registered under the
 * name "sdpdemux" with GST_RANK_NONE. */
111 #define gst_sdp_demux_parent_class parent_class
112 G_DEFINE_TYPE (GstSDPDemux, gst_sdp_demux, GST_TYPE_BIN);
113 GST_ELEMENT_REGISTER_DEFINE (sdpdemux, "sdpdemux", GST_RANK_NONE,
/* Class initializer.
 *
 * Installs the property accessors and the finalizer, registers the "debug",
 * "timeout", "latency" and "redirect" properties, adds the sink and
 * stream_%u pad templates, sets the element metadata, overrides
 * GstElement::change_state and GstBin::handle_message, and initializes the
 * debug category.
 *
 * @klass: the class structure being initialized. */
117 gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
119 GObjectClass *gobject_class;
120 GstElementClass *gstelement_class;
121 GstBinClass *gstbin_class;
/* the same class structure viewed at each level of the hierarchy */
123 gobject_class = (GObjectClass *) klass;
124 gstelement_class = (GstElementClass *) klass;
125 gstbin_class = (GstBinClass *) klass;
127 gobject_class->set_property = gst_sdp_demux_set_property;
128 gobject_class->get_property = gst_sdp_demux_get_property;
130 gobject_class->finalize = gst_sdp_demux_finalize;
/* all four properties are read/write and G_PARAM_CONSTRUCT, so the setter
 * runs with the default value when an instance is constructed */
132 g_object_class_install_property (gobject_class, PROP_DEBUG,
133 g_param_spec_boolean ("debug", "Debug",
134 "Dump request and response messages to stdout",
136 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
138 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
139 g_param_spec_uint64 ("timeout", "Timeout",
140 "Fail transport after UDP timeout microseconds (0 = disabled)",
141 0, G_MAXUINT64, DEFAULT_TIMEOUT,
142 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
144 g_object_class_install_property (gobject_class, PROP_LATENCY,
145 g_param_spec_uint ("latency", "Buffer latency in ms",
146 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
147 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
149 g_object_class_install_property (gobject_class, PROP_REDIRECT,
150 g_param_spec_boolean ("redirect", "Redirect",
151 "Sends a redirection message instead of using a custom session element",
153 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
155 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
156 gst_element_class_add_static_pad_template (gstelement_class, &rtptemplate);
158 gst_element_class_set_static_metadata (gstelement_class, "SDP session setup",
159 "Codec/Demuxer/Network/RTP",
160 "Receive data over the network via SDP",
161 "Wim Taymans <wim.taymans@gmail.com>");
163 gstelement_class->change_state = gst_sdp_demux_change_state;
/* intercept messages posted by child elements (UDP timeouts and errors) */
165 gstbin_class->handle_message = gst_sdp_demux_handle_message;
167 GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
/* Instance initializer.
 *
 * Creates the sink pad from sinktemplate, installs its event and chain
 * functions and adds it to the element; then initializes the recursive
 * stream lock and creates the adapter that collects incoming SDP data.
 *
 * @demux: the instance being initialized. */
171 gst_sdp_demux_init (GstSDPDemux * demux)
173 demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
174 gst_pad_set_event_function (demux->sinkpad,
175 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
176 gst_pad_set_chain_function (demux->sinkpad,
177 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
178 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
180 /* protects the streaming thread in interleaved mode or the polling
181 * thread in UDP mode. */
182 g_rec_mutex_init (&demux->stream_rec_lock);
/* filled by the chain function, drained by gst_sdp_demux_start */
184 demux->adapter = gst_adapter_new ();
/* GObject finalizer: releases what the instance initializer set up (the
 * recursive stream lock and the adapter) and then chains up to the parent
 * class finalizer.
 *
 * @object: the GstSDPDemux being destroyed. */
188 gst_sdp_demux_finalize (GObject * object)
192 demux = GST_SDP_DEMUX (object);
195 g_rec_mutex_clear (&demux->stream_rec_lock);
197 g_object_unref (demux->adapter);
199 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject property setter.
 *
 * Copies @value into the matching instance field: debug (boolean),
 * udp_timeout (uint64), latency (uint) or redirect (boolean). Unknown
 * property ids are reported through G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * Presumably the field is selected by a switch on @prop_id - confirm.
 *
 * @object: the GstSDPDemux instance.
 * @prop_id: id of the property being set.
 * @value: new value for the property.
 * @pspec: param spec, only used for the invalid-id warning. */
203 gst_sdp_demux_set_property (GObject * object, guint prop_id,
204 const GValue * value, GParamSpec * pspec)
208 demux = GST_SDP_DEMUX (object);
212 demux->debug = g_value_get_boolean (value);
215 demux->udp_timeout = g_value_get_uint64 (value);
218 demux->latency = g_value_get_uint (value);
221 demux->redirect = g_value_get_boolean (value);
224 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter, the mirror image of the setter.
 *
 * Stores the debug, udp_timeout, latency or redirect field into @value with
 * the GValue setter of the matching type. Unknown property ids are reported
 * through G_OBJECT_WARN_INVALID_PROPERTY_ID. Presumably the field is selected
 * by a switch on @prop_id - confirm.
 *
 * @object: the GstSDPDemux instance.
 * @prop_id: id of the property being read.
 * @value: location that receives the property value. */
230 gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
235 demux = GST_SDP_DEMUX (object);
239 g_value_set_boolean (value, demux->debug);
242 g_value_set_uint64 (value, demux->udp_timeout);
245 g_value_set_uint (value, demux->latency);
248 g_value_set_boolean (value, demux->redirect);
251 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Match helper handed to find_stream: tests whether the id of @stream equals
 * the integer packed into @a with GINT_TO_POINTER.
 * NOTE(review): find_stream casts this to GCompareFunc for
 * g_list_find_custom, so a match presumably has to yield 0 - confirm. */
257 find_stream_by_id (GstSDPStream * stream, gconstpointer a)
259 gint id = GPOINTER_TO_INT (a);
261 if (stream->id == id)
/* Match helper handed to find_stream: tests whether the payload type of
 * @stream equals the integer packed into @a with GINT_TO_POINTER.
 * NOTE(review): used as a GCompareFunc by find_stream, so a match presumably
 * has to yield 0 - confirm. */
268 find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
270 gint pt = GPOINTER_TO_INT (a);
272 if (stream->pt == pt)
/* Match helper handed to find_stream: tests whether @a is one of the two UDP
 * source elements of @stream (udpsrc[0] or udpsrc[1]).
 * NOTE(review): used as a GCompareFunc by find_stream, so a match presumably
 * has to yield 0 - confirm. */
279 find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
281 GstElement *src = (GstElement *) a;
283 if (stream->udpsrc[0] == src)
285 if (stream->udpsrc[1] == src)
/* Generic stream lookup.
 *
 * Searches demux->streams with g_list_find_custom, using @func (one of the
 * find_stream_by_* helpers, cast to GCompareFunc) and @data as the value to
 * match against. Returns the GstSDPStream stored in the matching list node.
 * Callers in this file test the result, so the no-match result is presumably
 * NULL - confirm.
 *
 * @demux: element whose stream list is searched.
 * @data: key passed as second argument to @func.
 * @func: comparison helper. */
291 static GstSDPStream *
292 find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
296 /* find and get stream */
298 g_list_find_custom (demux->streams, data, (GCompareFunc) func)))
299 return (GstSDPStream *) lstream->data;
/* Tears down the resources owned by one stream.
 *
 * Drops the caps reference, shuts down both UDP sources and the UDP sink
 * (set to NULL state, removed from the bin, pointer cleared) and deactivates
 * and removes the source pad from the element.
 *
 * @demux: the bin that contains the stream elements and pads.
 * @stream: the stream to tear down. */
305 gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
309 GST_DEBUG_OBJECT (demux, "free stream %p", stream);
312 gst_caps_unref (stream->caps);
/* index 0 and 1 are the two UDP sources set up by
 * gst_sdp_demux_stream_configure_udp (RTP and RTCP) */
314 for (i = 0; i < 2; i++) {
315 GstElement *udpsrc = stream->udpsrc[i];
/* an element has to reach NULL state before it is removed from the bin */
318 gst_element_set_state (udpsrc, GST_STATE_NULL);
319 gst_bin_remove (GST_BIN_CAST (demux), udpsrc);
320 stream->udpsrc[i] = NULL;
323 if (stream->udpsink) {
324 gst_element_set_state (stream->udpsink, GST_STATE_NULL);
325 gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink);
326 stream->udpsink = NULL;
328 if (stream->srcpad) {
329 gst_pad_set_active (stream->srcpad, FALSE);
331 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
332 stream->added = FALSE;
334 stream->srcpad = NULL;
/* Reports whether @host_name designates a multicast address.
 *
 * The name is first parsed as a literal address with
 * g_inet_address_new_from_string. The default GResolver is then used to look
 * the name up and the first result is taken; presumably this only happens
 * when the literal parse failed - confirm. The answer comes from
 * g_inet_address_get_is_multicast.
 *
 * NOTE(review): g_resolver_lookup_by_name is the synchronous GIO lookup and
 * is called here without a cancellable or GError, so a slow DNS server stalls
 * the calling thread and the failure reason is discarded.
 *
 * @host_name: address or host name taken from the SDP connection line. */
340 is_multicast_address (const gchar * host_name)
343 GResolver *resolver = NULL;
344 gboolean ret = FALSE;
346 addr = g_inet_address_new_from_string (host_name);
350 resolver = g_resolver_get_default ();
351 results = g_resolver_lookup_by_name (resolver, host_name, NULL, NULL);
/* keep our own reference to the first address, the list is freed below */
354 addr = G_INET_ADDRESS (g_object_ref (results->data));
356 g_resolver_free_addresses (results);
358 g_assert (addr != NULL);
360 ret = g_inet_address_get_is_multicast (addr);
364 g_object_unref (resolver);
366 g_object_unref (addr);
/* Builds a GstSDPStream for media number @idx of @sdp and appends it to
 * demux->streams.
 *
 * The stream gets the next id from demux->numstreams, its payload type from
 * the first format of the media, caps derived from the media (renamed to
 * application/x-rtp), the connection address and TTL, the multicast flag and
 * the RTP/RTCP ports. A dynamic payload type (>= 96) that is already used by
 * another stream marks this stream as a container, which means no separate
 * pad is expected for it.
 *
 * On the error path the stream is released with gst_sdp_demux_stream_free;
 * the caller treats the result as a pointer that may be unusable, presumably
 * NULL - confirm.
 *
 * NOTE(review): stream->destination stores conn->address without copying it,
 * so it is only valid while @sdp is alive; gst_sdp_demux_start uninits the
 * message before returning.
 * NOTE(review): atoi gives no error indication for a non-numeric format.
 * NOTE(review): only format 0 of the media is considered (see FIXME below).
 *
 * @demux: element that owns the stream list.
 * @sdp: parsed session description.
 * @idx: index of the media section to create the stream for. */
370 static GstSDPStream *
371 gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
373 GstSDPStream *stream;
374 const gchar *payload;
375 const GstSDPMedia *media;
376 const GstSDPConnection *conn;
378 /* get media, should not return NULL */
379 media = gst_sdp_message_get_media (sdp, idx);
383 stream = g_new0 (GstSDPStream, 1);
384 stream->parent = demux;
385 /* we mark the pad as not linked, we will mark it as OK when we add the pad to
387 stream->last_ret = GST_FLOW_OK;
388 stream->added = FALSE;
389 stream->disabled = FALSE;
390 stream->id = demux->numstreams++;
393 /* we must have a payload. No payload means we cannot create caps */
394 /* FIXME, handle multiple formats. */
395 if ((payload = gst_sdp_media_get_format (media, 0))) {
398 stream->pt = atoi (payload);
400 stream->caps = gst_sdp_media_get_caps_from_media (media, stream->pt);
/* the generated caps are renamed so they match the stream_%u pad template */
402 s = gst_caps_get_structure (stream->caps, 0);
403 gst_structure_set_name (s, "application/x-rtp");
405 if (stream->pt >= 96) {
406 /* If we have a dynamic payload type, see if we have a stream with the
407 * same payload number. If there is one, they are part of the same
408 * container and we only need to add one pad. */
409 if (find_stream (demux, GINT_TO_POINTER (stream->pt),
410 (gpointer) find_stream_by_pt)) {
411 stream->container = TRUE;
/* prefer the first media-level connection; the session-level connection is
 * presumably the fallback when the media has none - confirm */
416 if (gst_sdp_media_connections_len (media) > 0) {
417 if (!(conn = gst_sdp_media_get_connection (media, 0))) {
418 /* We should not reach this based on the check above */
422 if (!(conn = gst_sdp_message_get_connection (sdp))) {
430 stream->destination = conn->address;
431 stream->ttl = conn->ttl;
432 stream->multicast = is_multicast_address (stream->destination);
434 stream->rtp_port = gst_sdp_media_get_port (media);
/* both assignments below compute the same value: the port carried by an
 * rtcp attribute is not parsed yet */
435 if (gst_sdp_media_get_attribute_val (media, "rtcp")) {
436 /* FIXME, RFC 3605 */
437 stream->rtcp_port = stream->rtp_port + 1;
439 stream->rtcp_port = stream->rtp_port + 1;
442 GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
443 GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
444 GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
445 GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);
447 /* we keep track of all streams */
448 demux->streams = g_list_append (demux->streams, stream);
455 gst_sdp_demux_stream_free (demux, stream);
/* Releases everything that gst_sdp_demux_start built.
 *
 * Every stream is torn down with gst_sdp_demux_stream_free and the list is
 * released. If a session element exists, the stored signal handlers are
 * disconnected, the element is set to NULL state and removed from the bin.
 * Finally the stream counter is reset. Called from change_state on
 * PAUSED_TO_READY.
 *
 * NOTE(review): gst_sdp_demux_configure_manager also connects "on-bye-ssrc",
 * "on-bye-timeout" and "on-timeout" without keeping the handler ids, so those
 * three handlers are not disconnected here.
 *
 * @demux: the element to clean up. */
461 gst_sdp_demux_cleanup (GstSDPDemux * demux)
465 GST_DEBUG_OBJECT (demux, "cleanup");
467 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
468 GstSDPStream *stream = (GstSDPStream *) walk->data;
470 gst_sdp_demux_stream_free (demux, stream);
/* the list nodes are released separately from the streams they pointed to */
472 g_list_free (demux->streams);
473 demux->streams = NULL;
474 if (demux->session) {
/* a handler id of 0 means that the signal was never connected */
475 if (demux->session_sig_id) {
476 g_signal_handler_disconnect (demux->session, demux->session_sig_id);
477 demux->session_sig_id = 0;
479 if (demux->session_nmp_id) {
480 g_signal_handler_disconnect (demux->session, demux->session_nmp_id);
481 demux->session_nmp_id = 0;
483 if (demux->session_ptmap_id) {
484 g_signal_handler_disconnect (demux->session, demux->session_ptmap_id);
485 demux->session_ptmap_id = 0;
487 gst_element_set_state (demux->session, GST_STATE_NULL);
488 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
489 demux->session = NULL;
491 demux->numstreams = 0;
/* "pad-added" handler for the rtpbin session manager (connected in
 * gst_sdp_demux_configure_manager).
 *
 * Under the stream lock, the pad name is parsed as
 * recv_rtp_src_<id>_<ssrc>_<pt>, the stream with that id is looked up, the
 * timeout of its first UDP source is disabled and a ghost pad named
 * stream_<id> is created from rtptemplate, activated and added to the
 * element. Afterwards all streams are checked; once every stream that needs a
 * pad has one, no-more-pads is signalled. A pad that belongs to no known
 * stream is logged and ignored.
 *
 * @session: the session manager that emitted the signal.
 * @pad: the new source pad of the session manager.
 * @demux: the element, passed as signal user data. */
494 /* this callback is called when the session manager generated a new src pad with
495 * payloaded RTP packets. We simply ghost the pad here. */
497 new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
499 gchar *name, *pad_name;
500 GstPadTemplate *template;
503 GstSDPStream *stream;
506 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
508 GST_SDP_STREAM_LOCK (demux);
/* all three fields must parse, otherwise this is not an RTP receive pad */
510 name = gst_object_get_name (GST_OBJECT_CAST (pad));
511 if (sscanf (name, "recv_rtp_src_%u_%u_%u", &id, &ssrc, &pt) != 3)
514 GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %u, PT %u", id, ssrc, pt);
517 find_stream (demux, GUINT_TO_POINTER (id), (gpointer) find_stream_by_id);
523 /* no need for a timeout anymore now */
524 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
526 pad_name = g_strdup_printf ("stream_%u", stream->id);
527 /* create a new pad we will use to stream to */
528 template = gst_static_pad_template_get (&rtptemplate);
529 stream->srcpad = gst_ghost_pad_new_from_template (pad_name, pad, template);
530 gst_object_unref (template);
/* the pad is activated before it is added to the element */
534 stream->added = TRUE;
535 gst_pad_set_active (stream->srcpad, TRUE);
536 gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
538 /* check if we added all streams */
540 for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
541 stream = (GstSDPStream *) lstream->data;
542 /* a container stream only needs one pad added. Also disabled streams don't
544 if (!stream->container && !stream->disabled && !stream->added) {
549 GST_SDP_STREAM_UNLOCK (demux);
552 GST_DEBUG_OBJECT (demux, "We added all streams");
553 /* when we get here, all stream are added and we can fire the no-more-pads
555 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
563 GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
564 GST_SDP_STREAM_UNLOCK (demux);
/* "pad-added" handler for the rtspsrc based session (connected in
 * gst_sdp_demux_configure_manager when an RTSP SDP uri is used).
 *
 * Exposes @pad on the element through a ghost pad that carries the same
 * name, activating the ghost pad before adding it.
 *
 * @session: the rtspsrc element that emitted the signal.
 * @pad: the new pad of the rtspsrc element.
 * @demux: the element, passed as signal user data. */
571 rtsp_session_pad_added (GstElement * session, GstPad * pad, GstSDPDemux * demux)
573 GstPad *srcpad = NULL;
576 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
578 name = gst_pad_get_name (pad);
579 srcpad = gst_ghost_pad_new (name, pad);
582 gst_pad_set_active (srcpad, TRUE);
583 gst_element_add_pad (GST_ELEMENT_CAST (demux), srcpad);
/* "no-more-pads" handler for the rtspsrc based session: logs the event and
 * re-emits no-more-pads on the element itself.
 *
 * @session: the rtspsrc element that emitted the signal.
 * @demux: the element, passed as signal user data. */
587 rtsp_session_no_more_pads (GstElement * session, GstSDPDemux * demux)
589 GST_DEBUG_OBJECT (demux, "got no-more-pads");
590 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
/* "request-pt-map" handler for the rtpbin session manager.
 *
 * Under the stream lock, looks up the stream whose id equals @session. An
 * unknown session is logged and the lock is released.
 * NOTE(review): the value handed back to the session manager is presumably
 * the caps of the stream that was found - confirm.
 *
 * @sess: the session manager that emitted the signal.
 * @session: session number, matched against the stream id.
 * @pt: payload type the session manager asks about.
 * @demux: the element, passed as signal user data. */
594 request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
596 GstSDPStream *stream;
599 GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
602 GST_SDP_STREAM_LOCK (demux);
604 find_stream (demux, GINT_TO_POINTER (session),
605 (gpointer) find_stream_by_id);
612 GST_SDP_STREAM_UNLOCK (demux);
618 GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
619 GST_SDP_STREAM_UNLOCK (demux);
/* Pushes an EOS event on the stream that belongs to @session.
 *
 * The stream is looked up by id and its SSRC is compared with @ssrc before
 * the EOS event is handed to gst_sdp_demux_stream_push_event. Three exits
 * only log: unknown session, stream already EOS, and SSRC mismatch.
 * Shared by the on_bye_ssrc and on_timeout signal handlers.
 *
 * @demux: the element.
 * @session: session number, matched against the stream id.
 * @ssrc: the SSRC that said BYE or timed out. */
625 gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session, guint32 ssrc)
627 GstSDPStream *stream;
629 GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);
631 /* get stream for session */
633 find_stream (demux, GINT_TO_POINTER (session),
634 (gpointer) find_stream_by_id);
641 if (stream->ssrc != ssrc)
645 gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
651 GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
656 GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
661 GST_DEBUG_OBJECT (demux, "unkown SSRC %08x for session %u", ssrc, session);
/* "on-bye-ssrc" handler for the session manager: logs the BYE and marks the
 * matching stream EOS through gst_sdp_demux_do_stream_eos.
 *
 * @manager: the session manager that emitted the signal.
 * @session: session number in which the BYE was received.
 * @ssrc: the SSRC that sent the BYE. */
667 on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
670 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
673 gst_sdp_demux_do_stream_eos (demux, session, ssrc);
/* Timeout handler for the session manager; connected to both the
 * "on-bye-timeout" and the "on-timeout" signal. Logs the timeout and marks
 * the matching stream EOS through gst_sdp_demux_do_stream_eos.
 *
 * @manager: the session manager that emitted the signal.
 * @session: session number in which the SSRC timed out.
 * @ssrc: the SSRC that timed out. */
677 on_timeout (GstElement * manager, guint session, guint32 ssrc,
680 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);
682 gst_sdp_demux_do_stream_eos (demux, session, ssrc);
/* Creates the session element, stores it in demux->session and adds it to
 * the bin.
 *
 * With a non-NULL @rtsp_sdp an rtspsrc element is created, pointed at that
 * location and watched for pad-added and no-more-pads. Otherwise (presumably
 * the else branch - confirm) an rtpbin element is created and watched for
 * pad-added, request-pt-map, on-bye-ssrc, on-bye-timeout and on-timeout. The
 * configured latency is applied to the element in both cases.
 *
 * The caller treats the result as a boolean that is false when the element
 * could not be created.
 *
 * NOTE(review): the handler ids of on-bye-ssrc, on-bye-timeout and on-timeout
 * are not stored, so gst_sdp_demux_cleanup cannot disconnect them.
 * NOTE(review): the failure log mentions gstrtpbin while the factory name
 * that is requested is rtpbin.
 *
 * @demux: the element.
 * @rtsp_sdp: uri for rtspsrc, or NULL to use rtpbin. */
685 /* try to get and configure a manager */
687 gst_sdp_demux_configure_manager (GstSDPDemux * demux, char *rtsp_sdp)
689 /* configure the session manager */
690 if (rtsp_sdp != NULL) {
691 if (!(demux->session = gst_element_factory_make ("rtspsrc", NULL)))
694 g_object_set (demux->session, "location", rtsp_sdp, NULL);
696 GST_DEBUG_OBJECT (demux, "connect to signals on rtspsrc");
697 demux->session_sig_id =
698 g_signal_connect (demux->session, "pad-added",
699 (GCallback) rtsp_session_pad_added, demux);
700 demux->session_nmp_id =
701 g_signal_connect (demux->session, "no-more-pads",
702 (GCallback) rtsp_session_no_more_pads, demux);
704 if (!(demux->session = gst_element_factory_make ("rtpbin", NULL)))
707 /* connect to signals if we did not already do so */
708 GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
709 demux->session_sig_id =
710 g_signal_connect (demux->session, "pad-added",
711 (GCallback) new_session_pad, demux);
712 demux->session_ptmap_id =
713 g_signal_connect (demux->session, "request-pt-map",
714 (GCallback) request_pt_map, demux);
/* the return values of the next three connects are discarded */
715 g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
717 g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout,
719 g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout,
723 g_object_set (demux->session, "latency", demux->latency, NULL);
725 /* we manage this element */
726 gst_bin_add (GST_BIN_CAST (demux), demux->session);
733 GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
738 GST_DEBUG_OBJECT (demux, "no manager element rtspsrc found");
/* Creates the receiving side of a stream: one UDP source for RTP
 * (udpsrc[0]) and one for RTCP (udpsrc[1]).
 *
 * Each source is made from a udp:// uri, added to the bin, linked to a
 * requested sink pad of the session manager (recv_rtp_sink_<id> and
 * recv_rtcp_sink_<id>, kept in channelpad[0] and channelpad[1]) and set to
 * PAUSED. A unicast stream listens on 0.0.0.0, a multicast stream on the
 * destination address from the SDP. The RTP source additionally gets the
 * configured UDP timeout.
 *
 * The caller treats the result as a boolean that is false when a source
 * element could not be created.
 *
 * NOTE(review): the timeout is logged with G_GINT64_FORMAT although
 * udp_timeout is read from a uint64 property.
 * NOTE(review): the results of gst_pad_link are not checked.
 *
 * @demux: the element that owns the session manager.
 * @stream: the stream to configure. */
744 gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
747 const gchar *destination;
750 GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");
752 /* if the destination is not a multicast address, we just want to listen on
754 if (!stream->multicast)
755 destination = "0.0.0.0";
757 destination = stream->destination;
759 /* creating UDP source */
760 if (stream->rtp_port != -1) {
761 GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
764 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
766 gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
768 if (stream->udpsrc[0] == NULL)
772 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);
774 GST_DEBUG_OBJECT (demux,
775 "setting up UDP source with timeout %" G_GINT64_FORMAT,
778 /* configure a timeout on the UDP port. When the timeout message is
779 * posted, we assume UDP transport is not possible. */
/* the property value is scaled by 1000 before it is handed to the source;
 * presumably microseconds to nanoseconds - confirm against udpsrc */
780 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout",
781 demux->udp_timeout * 1000, NULL);
783 /* get output pad of the UDP source. */
784 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
786 name = g_strdup_printf ("recv_rtp_sink_%u", stream->id);
787 stream->channelpad[0] =
788 gst_element_request_pad_simple (demux->session, name);
791 GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
792 /* configure for UDP delivery, we need to connect the UDP pads to
793 * the session plugin. */
794 gst_pad_link (pad, stream->channelpad[0]);
795 gst_object_unref (pad);
798 gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
801 /* creating another UDP source */
802 if (stream->rtcp_port != -1) {
803 GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
805 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
807 gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
809 if (stream->udpsrc[1] == NULL)
813 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);
815 GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");
817 name = g_strdup_printf ("recv_rtcp_sink_%u", stream->id);
818 stream->channelpad[1] =
819 gst_element_request_pad_simple (demux->session, name);
822 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
823 gst_pad_link (pad, stream->channelpad[1]);
824 gst_object_unref (pad);
826 gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
833 GST_DEBUG_OBJECT (demux, "no UDP source element found");
/* Creates the sending side of a stream: a UDP sink that carries the RTCP
 * produced by the session manager.
 *
 * The sink is made from a udp:// uri built from the stream destination and
 * RTCP port. For a unicast stream all destinations are cleared. Automatic
 * multicast joining, multicast loopback, clock sync and async state changes
 * are switched off. When an RTCP UDP source exists its socket is shared with
 * the sink. The sink is state-locked, set to PLAYING, added to the bin and
 * linked to the send_rtcp_src_<id> pad requested from the session manager.
 *
 * The caller treats the result as a boolean that is false when the sink
 * element could not be created. A missing session RTCP pad only produces a
 * warning.
 *
 * NOTE(review): the socket obtained from used_socket is unreffed without a
 * NULL check - confirm that it is always set at this point.
 * NOTE(review): the result of gst_pad_link is not checked.
 *
 * @demux: the element that owns the session manager.
 * @stream: the stream to configure. */
838 /* configure the UDP sink back to the server for status reports */
840 gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
841 GstSDPStream * stream)
843 GstPad *pad, *sinkpad;
846 gchar *destination, *uri, *name;
848 /* get destination and port */
849 port = stream->rtcp_port;
850 destination = stream->destination;
852 GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);
854 uri = g_strdup_printf ("udp://%s:%d", destination, port);
855 stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL);
857 if (stream->udpsink == NULL)
858 goto no_sink_element;
860 /* we clear all destinations because we don't really know where to send the
861 * RTCP to and we want to avoid sending it to our own ports.
862 * FIXME when we get an RTCP packet from the sender, we could look at its
863 * source port and address and try to send RTCP there. */
864 if (!stream->multicast)
865 g_signal_emit_by_name (stream->udpsink, "clear");
867 g_object_set (G_OBJECT (stream->udpsink), "auto-multicast", FALSE, NULL);
868 g_object_set (G_OBJECT (stream->udpsink), "loop", FALSE, NULL);
870 g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
871 /* no async state changes needed */
872 g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);
874 if (stream->udpsrc[1]) {
875 /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
876 * because some servers check the port number of where it sends RTCP to identify
877 * the RTCP packets it receives */
878 g_object_get (G_OBJECT (stream->udpsrc[1]), "used_socket", &socket, NULL);
879 GST_DEBUG_OBJECT (demux, "UDP src has socket %p", socket);
880 /* configure socket and make sure udpsink does not close it when shutting
881 * down, it belongs to udpsrc after all. */
882 g_object_set (G_OBJECT (stream->udpsink), "socket", socket, NULL);
883 g_object_set (G_OBJECT (stream->udpsink), "close-socket", FALSE, NULL);
/* drop the reference that g_object_get returned */
884 g_object_unref (socket);
887 /* we keep this playing always */
/* the locked state keeps the state changes of the bin away from the sink */
888 gst_element_set_locked_state (stream->udpsink, TRUE);
889 gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);
891 gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);
893 /* get session RTCP pad */
894 name = g_strdup_printf ("send_rtcp_src_%u", stream->id);
895 pad = gst_element_request_pad_simple (demux->session, name);
900 sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
901 gst_pad_link (pad, sinkpad);
902 gst_object_unref (pad);
903 gst_object_unref (sinkpad);
905 /* not very fatal, we just won't be able to send RTCP */
906 GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
915 GST_DEBUG_OBJECT (demux, "no UDP sink element found");
/* Combines the flow return of one stream with the last flow returns of all
 * other streams.
 *
 * The new value is recorded in stream->last_ret. GST_FLOW_OK and every value
 * other than GST_FLOW_NOT_LINKED are decided by that value alone. For
 * GST_FLOW_NOT_LINKED the recorded values of all streams are inspected, so
 * that NOT_LINKED is only the outcome when every stream reported it.
 * gst_sdp_demux_handle_message uses this to decide whether an error from a
 * UDP source has to be forwarded.
 *
 * @demux: the element that owns the stream list.
 * @stream: the stream that produced @ret.
 * @ret: the new flow return of @stream (its declaration presumably follows
 * the parameters shown below - confirm). */
921 gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
926 /* store the value */
927 stream->last_ret = ret;
929 /* if it's success we can return the value right away */
930 if (ret == GST_FLOW_OK)
933 /* any other error that is not-linked can be returned right
935 if (ret != GST_FLOW_NOT_LINKED)
938 /* only return NOT_LINKED if all other pads returned NOT_LINKED */
939 for (streams = demux->streams; streams; streams = g_list_next (streams)) {
940 GstSDPStream *ostream = (GstSDPStream *) streams->data;
942 ret = ostream->last_ret;
943 /* some other return value (must be SUCCESS but we can return
944 * other values as well) */
945 if (ret != GST_FLOW_NOT_LINKED)
948 /* if we get here, all other pads were unlinked and we return
/* Sends @event into the session manager on behalf of one stream.
 *
 * Streams without a source pad are skipped. Otherwise the event is sent to
 * each existing channel pad (the session manager sink pads for RTP and RTCP);
 * an extra reference is taken for every send and the reference that the
 * caller handed in is dropped at the end, so the function consumes @event.
 * NOTE(review): presumably the skip path also ends at the final unref -
 * confirm.
 *
 * @demux: the element.
 * @stream: the stream whose channel pads receive the event.
 * @event: the event to send; ownership is taken. */
955 gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
958 /* only streams that have a connection to the outside world */
959 if (stream->srcpad == NULL)
962 if (stream->channelpad[0]) {
963 gst_event_ref (event);
964 gst_pad_send_event (stream->channelpad[0], event);
967 if (stream->channelpad[1]) {
968 gst_event_ref (event);
969 gst_pad_send_event (stream->channelpad[1], event);
973 gst_event_unref (event);
/* GstBin::handle_message override that filters messages from child elements.
 *
 * Element messages named GstUDPSrcTimeout: the message is dropped and an
 * error that mentions the configured timeout is posted. The ignore_timeout
 * flag is read and set under the object lock so that only the first timeout
 * is acted upon. Other element messages go to the parent class.
 *
 * Error messages: if the source is a UDP source of one of the streams, the
 * RTCP source is ignored and the RTP source is recorded as NOT_LINKED with
 * gst_sdp_demux_combine_flows; the combined result decides whether the
 * message is dropped or forwarded. Errors from other sources are forwarded.
 *
 * All remaining message types are passed to the parent class.
 *
 * @bin: the element, as a GstBin.
 * @message: the message posted by a child; ownership is taken. */
977 gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
981 demux = GST_SDP_DEMUX (bin);
983 switch (GST_MESSAGE_TYPE (message)) {
984 case GST_MESSAGE_ELEMENT:
986 const GstStructure *s = gst_message_get_structure (message);
988 if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
989 gboolean ignore_timeout;
991 GST_DEBUG_OBJECT (bin, "timeout on UDP port");
/* test-and-set of the flag under the object lock */
993 GST_OBJECT_LOCK (demux);
994 ignore_timeout = demux->ignore_timeout;
995 demux->ignore_timeout = TRUE;
996 GST_OBJECT_UNLOCK (demux);
998 /* we only act on the first udp timeout message, others are irrelevant
999 * and can be ignored. */
1001 gst_message_unref (message);
/* NOTE(review): udp_timeout / 1000000.0 is already a double, so
 * gst_guint64_to_gdouble wraps a double instead of a guint64 here; probably
 * intended: gst_guint64_to_gdouble (demux->udp_timeout) / 1000000.0 -
 * confirm. */
1003 GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
1004 ("Could not receive any UDP packets for %.4f seconds, maybe your "
1005 "firewall is blocking it.",
1006 gst_guint64_to_gdouble (demux->udp_timeout / 1000000.0)));
1010 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1013 case GST_MESSAGE_ERROR:
1016 GstSDPStream *stream;
1019 udpsrc = GST_MESSAGE_SRC (message);
1021 GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));
1023 stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
1024 /* fatal but not our message, forward */
1028 /* we ignore the RTCP udpsrc */
1029 if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
1032 /* if we get error messages from the udp sources, that's not a problem as
1033 * long as not all of them error out. We also don't really know what the
1034 * problem is, the message does not give enough detail... */
1035 ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
1036 GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
1037 if (ret != GST_FLOW_OK)
1041 gst_message_unref (message);
1045 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1050 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* Parses the SDP collected in the adapter and builds the receiving pipeline.
 * Called from the sink pad event function; the result is used there as the
 * boolean result of the event.
 *
 * Steps, all under the stream lock:
 *  - take all bytes from the adapter and parse them into a GstSDPMessage
 *    (the message is dumped, presumably only when the debug property is set);
 *  - look for a fully qualified rtsp:// control attribute, first at session
 *    level and then in the media sections; if RTSP control is found the
 *    message is turned into an rtsp-sdp uri and, with the redirect property
 *    set, a "redirect" element message carrying that uri is posted;
 *  - otherwise create the session element with
 *    gst_sdp_demux_configure_manager, create one stream per media section and
 *    configure its UDP sources and UDP sink;
 *  - bring the session element to the target state, going through PAUSED
 *    first when the target is higher, then set both UDP sources of every
 *    stream to the target state.
 *
 * Every failure posts a GST_ELEMENT_ERROR with a specific text. The lock is
 * released and the SDP message is uninitialized on the way out.
 *
 * NOTE(review): both UDP sources of each stream are set to the target state
 * without a NULL check, while gst_sdp_demux_stream_configure_udp only creates
 * a source when its port is not -1 - confirm that both always exist here.
 * NOTE(review): all errors use the STREAM / TYPE_NOT_FOUND code, including
 * the one posted after a redirect was sent.
 *
 * @demux: the element. */
1057 gst_sdp_demux_start (GstSDPDemux * demux)
1059 guint8 *data = NULL;
1062 GstSDPMessage sdp = { 0 };
1063 GstSDPStream *stream = NULL;
1066 GstStateChangeReturn ret;
1068 /* grab the lock so that no state change can interfere */
1069 GST_SDP_STREAM_LOCK (demux);
1071 GST_DEBUG_OBJECT (demux, "parse SDP...");
1073 size = gst_adapter_available (demux->adapter);
/* gst_adapter_take hands the bytes over to this function */
1077 data = gst_adapter_take (demux->adapter, size);
1079 gst_sdp_message_init (&sdp);
1080 if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
1081 goto could_not_parse;
1084 gst_sdp_message_dump (&sdp);
1086 /* maybe this is plain RTSP DESCRIBE rtsp and we should redirect */
1087 /* look for rtsp control url */
1089 const gchar *control;
1092 control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i);
1093 if (control == NULL)
1096 /* only take fully qualified urls */
1097 if (g_str_has_prefix (control, "rtsp://"))
1103 /* try to find non-aggregate control */
1104 n_streams = gst_sdp_message_medias_len (&sdp);
1106 for (idx = 0; idx < n_streams; idx++) {
1107 const GstSDPMedia *media;
1109 /* get media, should not return NULL */
1110 media = gst_sdp_message_get_media (&sdp, idx);
1115 control = gst_sdp_media_get_attribute_val_n (media, "control", i);
1116 if (control == NULL)
1119 /* only take fully qualified urls */
1120 if (g_str_has_prefix (control, "rtsp://"))
1123 /* this media has no control, exit */
1130 /* we have RTSP now */
1131 uri = gst_sdp_message_as_uri ("rtsp-sdp", &sdp);
1133 if (demux->redirect) {
1134 GST_INFO_OBJECT (demux, "redirect to %s", uri);
/* the application is expected to act on this message */
1136 gst_element_post_message (GST_ELEMENT_CAST (demux),
1137 gst_message_new_element (GST_OBJECT_CAST (demux),
1138 gst_structure_new ("redirect",
1139 "new-location", G_TYPE_STRING, uri, NULL)));
1145 /* we get here when we didn't do a redirect */
1147 /* try to get and configure a manager */
1148 if (!gst_sdp_demux_configure_manager (demux, uri))
1151 /* create streams with UDP sources and sinks */
1152 n_streams = gst_sdp_message_medias_len (&sdp);
1153 for (i = 0; i < n_streams; i++) {
1154 stream = gst_sdp_demux_create_stream (demux, &sdp, i);
1159 GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
1161 if (!gst_sdp_demux_stream_configure_udp (demux, stream))
1162 goto transport_failed;
1163 if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
1164 goto transport_failed;
1167 if (!demux->streams)
1171 /* set target state on session manager */
1172 /* setting rtspsrc to PLAYING may cause it to lose that target state
1173 * along the way due to no-preroll udpsrc elements, so ...
1174 * do it in two stages here (similar to other elements) */
1175 if (demux->target > GST_STATE_PAUSED) {
1176 ret = gst_element_set_state (demux->session, GST_STATE_PAUSED);
1177 if (ret == GST_STATE_CHANGE_FAILURE)
1178 goto start_session_failure;
1180 ret = gst_element_set_state (demux->session, demux->target);
1181 if (ret == GST_STATE_CHANGE_FAILURE)
1182 goto start_session_failure;
1185 /* activate all streams */
1186 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
1187 stream = (GstSDPStream *) walk->data;
1189 /* configure target state on udp sources */
1190 gst_element_set_state (stream->udpsrc[0], demux->target);
1191 gst_element_set_state (stream->udpsrc[1], demux->target);
1194 GST_SDP_STREAM_UNLOCK (demux);
1195 gst_sdp_message_uninit (&sdp);
1203 GST_SDP_STREAM_UNLOCK (demux);
1204 gst_sdp_message_uninit (&sdp);
1210 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1211 ("Could not create RTP stream transport."));
1216 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1217 ("Could not create RTP session manager."));
1222 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1223 ("Empty SDP message."));
1228 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1229 ("Could not parse SDP message."));
1234 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1235 ("No streams in SDP message."));
1240 /* avoid hanging if redirect not handled */
1241 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1242 ("Sent RTSP redirect."));
1245 start_session_failure:
1247 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1248 ("Could not start RTP session manager."));
/* undo the session element so that a later cleanup does not see it */
1249 gst_element_set_state (demux->session, GST_STATE_NULL);
1250 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
1251 demux->session = NULL;
/* Event function of the sink pad.
 *
 * On EOS the SDP collected so far is complete, so gst_sdp_demux_start is run
 * and its result becomes the result of the event. In both visible branches
 * the event is unreffed here rather than forwarded downstream.
 *
 * @pad: the sink pad.
 * @parent: the GstSDPDemux that owns @pad.
 * @event: the incoming event; ownership is taken. */
1257 gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
1260 gboolean res = TRUE;
1262 demux = GST_SDP_DEMUX (parent);
1264 switch (GST_EVENT_TYPE (event)) {
1266 /* when we get EOS, start parsing the SDP */
1267 res = gst_sdp_demux_start (demux);
1268 gst_event_unref (event);
1271 gst_event_unref (event);
/* Chain function of the sink pad: stores every incoming buffer in the adapter
 * of the element. gst_adapter_push takes ownership of @buffer. The collected
 * data is consumed by gst_sdp_demux_start.
 *
 * @pad: the sink pad.
 * @parent: the GstSDPDemux that owns @pad.
 * @buffer: a piece of the SDP message. */
1278 static GstFlowReturn
1279 gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1283 demux = GST_SDP_DEMUX (parent);
1285 /* push the SDP message in an adapter, we start doing something with it when
1287 gst_adapter_push (demux->adapter, buffer);
/* GstElement::change_state override.
 *
 * The whole transition runs under the stream lock, the same lock that
 * gst_sdp_demux_start holds while it builds the pipeline.
 *
 * Before chaining up:
 *  - READY_TO_PAUSED clears the adapter, re-arms the UDP timeout handling
 *    and sets the target state to PAUSED;
 *  - PAUSED_TO_PLAYING sets the target state to PLAYING.
 * After the parent class handled the transition:
 *  - READY_TO_PAUSED and PLAYING_TO_PAUSED report NO_PREROLL, the result a
 *    live element gives; the latter also moves the target back to PAUSED;
 *  - PAUSED_TO_READY tears everything down with gst_sdp_demux_cleanup.
 *
 * @element: the GstSDPDemux.
 * @transition: the state transition to perform. */
1292 static GstStateChangeReturn
1293 gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
1296 GstStateChangeReturn ret;
1298 demux = GST_SDP_DEMUX (element);
1300 GST_SDP_STREAM_LOCK (demux);
1302 switch (transition) {
1303 case GST_STATE_CHANGE_NULL_TO_READY:
1305 case GST_STATE_CHANGE_READY_TO_PAUSED:
1306 /* first attempt, don't ignore timeouts */
1307 gst_adapter_clear (demux->adapter);
1308 demux->ignore_timeout = FALSE;
1309 demux->target = GST_STATE_PAUSED;
1311 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1312 demux->target = GST_STATE_PLAYING;
/* let the bin change the state of its children */
1318 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1319 if (ret == GST_STATE_CHANGE_FAILURE)
1322 switch (transition) {
1323 case GST_STATE_CHANGE_READY_TO_PAUSED:
1324 ret = GST_STATE_CHANGE_NO_PREROLL;
1326 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1327 ret = GST_STATE_CHANGE_NO_PREROLL;
1328 demux->target = GST_STATE_PAUSED;
1330 case GST_STATE_CHANGE_PAUSED_TO_READY:
1331 gst_sdp_demux_cleanup (demux);
1333 case GST_STATE_CHANGE_READY_TO_NULL:
1340 GST_SDP_STREAM_UNLOCK (demux);