2 * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
20 * SECTION:element-sdpdemux
22 * sdpdemux currently understands SDP as the input format of the session description.
23 * For each stream listed in the SDP a new stream_%u pad will be created
24 * with caps derived from the SDP media description. This is a caps of mime type
25 * "application/x-rtp" that can be connected to any available RTP depayloader
28 * sdpdemux will internally instantiate an RTP session manager element
29 * that will handle the RTCP messages to and from the server, jitter removal,
30 * packet reordering along with providing a clock for the pipeline.
32 * sdpdemux acts like a live element and will therefore only generate data in the
36 * <title>Example launch line</title>
38 * gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
39 * ]| Establish a connection to an HTTP server that contains an SDP session description
40 * that gets parsed by sdpdemux, which then sends the raw RTP packets to a fakesink.
48 #include "gstsdpdemux.h"
50 #include <gst/rtp/gstrtppayloads.h>
51 #include <gst/sdp/gstsdpmessage.h>
/* Debug category of this element; GST_CAT_DEFAULT routes the GST_DEBUG*
 * macros used in this file to it. */
57 GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
58 #define GST_CAT_DEFAULT (sdpdemux_debug)
/* Template of the "sink" pad, which takes the "application/sdp" input. */
60 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
63 GST_STATIC_CAPS ("application/sdp"));
/* Template of the "stream_%u" pads, which carry "application/x-rtp". */
65 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream_%u",
68 GST_STATIC_CAPS ("application/x-rtp"));
/* Property defaults.  DEFAULT_TIMEOUT (microseconds) and DEFAULT_LATENCY_MS
 * (milliseconds) are used for the "timeout" and "latency" properties in
 * gst_sdp_demux_class_init (); DEFAULT_DEBUG and DEFAULT_REDIRECT presumably
 * back "debug" and "redirect" -- their use is not visible in this view. */
76 #define DEFAULT_DEBUG FALSE
77 #define DEFAULT_TIMEOUT 10000000
78 #define DEFAULT_LATENCY_MS 200
79 #define DEFAULT_REDIRECT TRUE
/* Forward declarations of the GObject, GstElement, GstBin and pad callbacks
 * implemented further down, followed by the type registration.  GstSDPDemux
 * derives from GstBin (see G_DEFINE_TYPE below). */
91 static void gst_sdp_demux_finalize (GObject * object);
93 static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
94 const GValue * value, GParamSpec * pspec);
95 static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
96 GValue * value, GParamSpec * pspec);
98 static GstCaps *gst_sdp_demux_media_to_caps (gint pt,
99 const GstSDPMedia * media);
101 static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
102 GstStateChange transition);
103 static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
105 static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
106 GstSDPStream * stream, GstEvent * event);
108 static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent,
110 static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent,
113 /*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
/* parent_class is the name G_DEFINE_TYPE generates for the parent class
 * pointer; the alias lets the rest of the file chain up through it. */
115 #define gst_sdp_demux_parent_class parent_class
116 G_DEFINE_TYPE (GstSDPDemux, gst_sdp_demux, GST_TYPE_BIN);
119 gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
121 GObjectClass *gobject_class;
122 GstElementClass *gstelement_class;
123 GstBinClass *gstbin_class;
125 gobject_class = (GObjectClass *) klass;
126 gstelement_class = (GstElementClass *) klass;
127 gstbin_class = (GstBinClass *) klass;
129 gobject_class->set_property = gst_sdp_demux_set_property;
130 gobject_class->get_property = gst_sdp_demux_get_property;
132 gobject_class->finalize = gst_sdp_demux_finalize;
134 g_object_class_install_property (gobject_class, PROP_DEBUG,
135 g_param_spec_boolean ("debug", "Debug",
136 "Dump request and response messages to stdout",
138 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
140 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
141 g_param_spec_uint64 ("timeout", "Timeout",
142 "Fail transport after UDP timeout microseconds (0 = disabled)",
143 0, G_MAXUINT64, DEFAULT_TIMEOUT,
144 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
146 g_object_class_install_property (gobject_class, PROP_LATENCY,
147 g_param_spec_uint ("latency", "Buffer latency in ms",
148 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
149 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
151 g_object_class_install_property (gobject_class, PROP_REDIRECT,
152 g_param_spec_boolean ("redirect", "Redirect",
153 "Sends a redirection message instead of using a custom session element",
155 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
157 gst_element_class_add_pad_template (gstelement_class,
158 gst_static_pad_template_get (&sinktemplate));
159 gst_element_class_add_pad_template (gstelement_class,
160 gst_static_pad_template_get (&rtptemplate));
162 gst_element_class_set_static_metadata (gstelement_class, "SDP session setup",
163 "Codec/Demuxer/Network/RTP",
164 "Receive data over the network via SDP",
165 "Wim Taymans <wim.taymans@gmail.com>");
167 gstelement_class->change_state = gst_sdp_demux_change_state;
169 gstbin_class->handle_message = gst_sdp_demux_handle_message;
171 GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
175 gst_sdp_demux_init (GstSDPDemux * demux)
177 demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
178 gst_pad_set_event_function (demux->sinkpad,
179 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
180 gst_pad_set_chain_function (demux->sinkpad,
181 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
182 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
184 /* protects the streaming thread in interleaved mode or the polling
185 * thread in UDP mode. */
186 g_rec_mutex_init (&demux->stream_rec_lock);
188 demux->adapter = gst_adapter_new ();
192 gst_sdp_demux_finalize (GObject * object)
196 demux = GST_SDP_DEMUX (object);
199 g_rec_mutex_clear (&demux->stream_rec_lock);
201 g_object_unref (demux->adapter);
203 G_OBJECT_CLASS (parent_class)->finalize (object);
207 gst_sdp_demux_set_property (GObject * object, guint prop_id,
208 const GValue * value, GParamSpec * pspec)
212 demux = GST_SDP_DEMUX (object);
216 demux->debug = g_value_get_boolean (value);
219 demux->udp_timeout = g_value_get_uint64 (value);
222 demux->latency = g_value_get_uint (value);
225 demux->redirect = g_value_get_boolean (value);
228 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
234 gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
239 demux = GST_SDP_DEMUX (object);
243 g_value_set_boolean (value, demux->debug);
246 g_value_set_uint64 (value, demux->udp_timeout);
249 g_value_set_uint (value, demux->latency);
252 g_value_set_boolean (value, demux->redirect);
255 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
261 find_stream_by_id (GstSDPStream * stream, gconstpointer a)
263 gint id = GPOINTER_TO_INT (a);
265 if (stream->id == id)
272 find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
274 gint pt = GPOINTER_TO_INT (a);
276 if (stream->pt == pt)
283 find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
285 GstElement *src = (GstElement *) a;
287 if (stream->udpsrc[0] == src)
289 if (stream->udpsrc[1] == src)
295 static GstSDPStream *
296 find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
300 /* find and get stream */
302 g_list_find_custom (demux->streams, data, (GCompareFunc) func)))
303 return (GstSDPStream *) lstream->data;
309 gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
313 GST_DEBUG_OBJECT (demux, "free stream %p", stream);
316 gst_caps_unref (stream->caps);
318 for (i = 0; i < 2; i++) {
319 GstElement *udpsrc = stream->udpsrc[i];
322 gst_element_set_state (udpsrc, GST_STATE_NULL);
323 gst_bin_remove (GST_BIN_CAST (demux), udpsrc);
324 stream->udpsrc[i] = NULL;
327 if (stream->udpsink) {
328 gst_element_set_state (stream->udpsink, GST_STATE_NULL);
329 gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink);
330 stream->udpsink = NULL;
332 if (stream->srcpad) {
333 gst_pad_set_active (stream->srcpad, FALSE);
335 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
336 stream->added = FALSE;
338 stream->srcpad = NULL;
344 is_multicast_address (const gchar * host_name)
347 GResolver *resolver = NULL;
348 gboolean ret = FALSE;
350 addr = g_inet_address_new_from_string (host_name);
354 resolver = g_resolver_get_default ();
355 results = g_resolver_lookup_by_name (resolver, host_name, NULL, NULL);
358 addr = G_INET_ADDRESS (g_object_ref (results->data));
360 g_resolver_free_addresses (results);
362 g_assert (addr != NULL);
364 ret = g_inet_address_get_is_multicast (addr);
368 g_object_unref (resolver);
370 g_object_unref (addr);
/* Build the GstSDPStream for media number @idx of @sdp and append it to
 * demux->streams.
 *
 * The visible code takes the first format of the media as payload type,
 * derives caps from it, marks dynamic payload types (>= 96) that duplicate an
 * existing stream's payload type as "container", and fills in destination,
 * ttl, multicast flag and the RTP/RTCP ports from the media-level or, failing
 * that, session-level connection.
 *
 * NOTE(review): the return statements and the failure labels are not visible
 * in this view; the one visible cleanup call is gst_sdp_demux_stream_free ()
 * at the very end -- confirm which paths reach it. */
374 static GstSDPStream *
375 gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
377 GstSDPStream *stream;
378 const gchar *payload;
379 const GstSDPMedia *media;
380 const GstSDPConnection *conn;
382 /* get media, should not return NULL */
383 media = gst_sdp_message_get_media (sdp, idx);
/* g_new0 zero-fills the stream, so every field not set below starts at 0 */
387 stream = g_new0 (GstSDPStream, 1);
388 stream->parent = demux;
389 /* we mark the pad as not linked, we will mark it as OK when we add the pad to
391 stream->last_ret = GST_FLOW_OK;
392 stream->added = FALSE;
393 stream->disabled = FALSE;
394 stream->id = demux->numstreams++;
397 /* we must have a payload. No payload means we cannot create caps */
398 /* FIXME, handle multiple formats. */
/* NOTE(review): atoi () yields 0 for a non-numeric format string, which is
 * indistinguishable from payload type 0 here. */
399 if ((payload = gst_sdp_media_get_format (media, 0))) {
400 stream->pt = atoi (payload);
402 stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media);
404 if (stream->pt >= 96) {
405 /* If we have a dynamic payload type, see if we have a stream with the
406 * same payload number. If there is one, they are part of the same
407 * container and we only need to add one pad. */
408 if (find_stream (demux, GINT_TO_POINTER (stream->pt),
409 (gpointer) find_stream_by_pt)) {
410 stream->container = TRUE;
/* prefer the media-level connection, fall back to the session-level one */
414 if (!(conn = gst_sdp_media_get_connection (media, 0))) {
415 if (!(conn = gst_sdp_message_get_connection (sdp)))
/* destination borrows the string owned by the SDP connection, no copy is
 * made.  NOTE(review): gst_sdp_demux_start () uninits its local SDP message
 * before returning, so this pointer presumably must not be read afterwards --
 * confirm. */
422 stream->destination = conn->address;
423 stream->ttl = conn->ttl;
424 stream->multicast = is_multicast_address (stream->destination);
/* both visible assignments below pick rtp_port + 1 for RTCP, whether or not
 * the media carries an "rtcp" attribute (see the FIXME) */
426 stream->rtp_port = gst_sdp_media_get_port (media);
427 if (gst_sdp_media_get_attribute_val (media, "rtcp")) {
428 /* FIXME, RFC 3605 */
429 stream->rtcp_port = stream->rtp_port + 1;
431 stream->rtcp_port = stream->rtp_port + 1;
434 GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
435 GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
436 GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
437 GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);
439 /* we keep track of all streams */
440 demux->streams = g_list_append (demux->streams, stream);
447 gst_sdp_demux_stream_free (demux, stream);
453 gst_sdp_demux_cleanup (GstSDPDemux * demux)
457 GST_DEBUG_OBJECT (demux, "cleanup");
459 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
460 GstSDPStream *stream = (GstSDPStream *) walk->data;
462 gst_sdp_demux_stream_free (demux, stream);
464 g_list_free (demux->streams);
465 demux->streams = NULL;
466 if (demux->session) {
467 if (demux->session_sig_id) {
468 g_signal_handler_disconnect (demux->session, demux->session_sig_id);
469 demux->session_sig_id = 0;
471 if (demux->session_nmp_id) {
472 g_signal_handler_disconnect (demux->session, demux->session_nmp_id);
473 demux->session_nmp_id = 0;
475 if (demux->session_ptmap_id) {
476 g_signal_handler_disconnect (demux->session, demux->session_ptmap_id);
477 demux->session_ptmap_id = 0;
479 gst_element_set_state (demux->session, GST_STATE_NULL);
480 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
481 demux->session = NULL;
483 demux->numstreams = 0;
/* Small parsing helpers for the rtpmap/fmtp attribute strings.  Only part of
 * each macro body is visible in this view: PARSE_INT and PARSE_STRING both
 * move p to the next occurrence of the delimiter del via strstr (), and
 * SKIP_SPACES loops while *p is a non-NUL ASCII whitespace character.
 * NOTE(review): how res gets filled in, and what happens when the delimiter
 * is absent, is not visible here -- check the full macro bodies. */
486 #define PARSE_INT(p, del, res) \
489 p = strstr (p, del); \
499 #define PARSE_STRING(p, del, res) \
502 p = strstr (p, del); \
514 #define SKIP_SPACES(p) \
515 while (*p && g_ascii_isspace (*p)) \
/* Split an rtpmap attribute value into its parts.
 *
 * The input has the form shown on the next line.  The visible code reads the
 * payload number up to the first space into *payload and the encoding name
 * up to the first '/' into *name; the "no rate" debug line handles input
 * without a clock rate.
 * NOTE(review): how *rate and *params are produced, and the return value, are
 * not visible in this view. */
520 * <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
523 gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
524 gint * rate, gchar ** params)
528 t = p = (gchar *) rtpmap;
530 PARSE_INT (p, " ", *payload);
538 PARSE_STRING (p, "/", *name);
540 GST_DEBUG ("no rate, name %s", p);
541 /* no rate, assume -1 then */
/* Translate one SDP media description into "application/x-rtp" caps for
 * payload type @pt.
 *
 * The visible code reads the "rtpmap" attribute (ignoring it, with a warning,
 * when it names another payload type or cannot be parsed), looks up a default
 * clock rate from the RTP payload tables when none was given, and then builds
 * caps with "media" (lower-cased), "payload", "clock-rate", "encoding-name"
 * (upper-cased) and "encoding-params" (lower-cased).  Every key/value pair of
 * a matching "fmtp" attribute is added as a string field with a lower-cased
 * key.
 *
 * NOTE(review): the return statements and failure labels are not visible in
 * this view; the two g_warning () calls at the end belong to failure paths. */
566 * Mapping of caps to and from SDP fields:
568 * m=<media> <UDP port> RTP/AVP <payload>
569 * a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
570 * a=fmtp:<payload> <param>[=<value>];...
573 gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media)
580 gchar *params = NULL;
586 /* get and parse rtpmap */
587 if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
/* NOTE(review): the last argument below reads as a mis-encoded "&params"
 * (the "&para" prefix was turned into a pilcrow) -- confirm against the
 * real file. */
588 ret = gst_sdp_demux_parse_rtpmap (rtpmap, &payload, &name, &rate, ¶ms);
591 /* we ignore the rtpmap if the payload type is different. */
592 g_warning ("rtpmap of wrong payload type, ignoring");
598 /* if we failed to parse the rtpmap for a dynamic payload type, we have an
602 /* else we can ignore */
603 g_warning ("error parsing rtpmap, ignoring");
606 /* dynamic payloads need rtpmap or we fail */
610 /* check if we have a rate, if not, we need to look up the rate from the
611 * default rates based on the payload types. */
613 const GstRTPPayloadInfo *info;
615 if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
616 /* dynamic types, use media and encoding_name */
617 tmp = g_ascii_strdown (media->media, -1);
618 info = gst_rtp_payload_info_for_name (tmp, name);
621 /* static types, use payload type */
622 info = gst_rtp_payload_info_for_pt (pt);
/* a table entry with clock rate 0 counts as "no rate known" */
626 if ((rate = info->clock_rate) == 0)
629 /* we fail if we cannot find one */
634 tmp = g_ascii_strdown (media->media, -1);
635 caps = gst_caps_new_simple ("application/x-rtp",
636 "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
638 s = gst_caps_get_structure (caps, 0);
640 gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);
642 /* encoding name must be upper case */
644 tmp = g_ascii_strup (name, -1);
645 gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
649 /* params must be lower case */
650 if (params != NULL) {
651 tmp = g_ascii_strdown (params, -1);
652 gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
656 /* parse optional fmtp: field */
657 if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
663 /* p is now of the format <payload> <param>[=<value>];... */
664 PARSE_INT (p, " ", payload);
/* the fmtp line only applies when it names the payload type we build for */
665 if (payload != -1 && payload == pt) {
669 /* <param>[=<value>] are separated with ';' */
670 pairs = g_strsplit (p, ";", 0);
671 for (i = 0; pairs[i]; i++) {
675 /* the key may not have a '=', the value can have other '='s */
676 valpos = strstr (pairs[i], "=");
678 /* we have a '=' and thus a value, remove the '=' with \0 */
680 /* value is everything between '=' and ';'. FIXME, strip? */
681 val = g_strstrip (valpos + 1);
683 /* simple <param>;.. is translated into <param>=1;... */
686 /* strip the key of spaces, convert key to lowercase but not the value. */
687 key = g_strstrip (pairs[i]);
/* keys of length 0 or 1 are skipped by this test */
688 if (strlen (key) > 1) {
689 tmp = g_ascii_strdown (key, -1);
690 gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
702 g_warning ("rtpmap type not given for dynamic payload %d", pt);
707 g_warning ("rate unknown for payload type %d", pt);
/* "pad-added" handler connected to the rtpbin session manager in
 * gst_sdp_demux_configure_manager ().
 *
 * Runs under the stream lock: the stream id is parsed out of the new pad's
 * name, the matching stream gets a ghost pad that is activated and added to
 * the element, and once every enabled, non-container stream has its pad,
 * no-more-pads is signalled (after the lock has been dropped).  Pads whose
 * name does not parse are logged as "ignoring unknown stream". */
712 /* this callback is called when the session manager generated a new src pad with
713 * payloaded RTP packets. We simply ghost the pad here. */
715 new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
718 GstPadTemplate *template;
721 GstSDPStream *stream;
724 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
726 GST_SDP_STREAM_LOCK (demux);
/* NOTE(review): the declarations of id, ssrc and pt are not visible here;
 * the %u conversions below need unsigned int storage, while the debug line
 * prints ssrc and pt with %d -- confirm the types agree. */
728 name = gst_object_get_name (GST_OBJECT_CAST (pad));
729 if (sscanf (name, "recv_rtp_src_%u_%u_%u", &id, &ssrc, &pt) != 3)
732 GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %d, PT %d", id, ssrc, pt);
735 find_stream (demux, GINT_TO_POINTER (id), (gpointer) find_stream_by_id);
739 /* no need for a timeout anymore now */
740 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
742 /* create a new pad we will use to stream to */
/* the ghost pad reuses the session pad's own name, not a "stream_%u" name */
743 template = gst_static_pad_template_get (&rtptemplate);
744 stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
745 gst_object_unref (template);
748 stream->added = TRUE;
749 gst_pad_set_active (stream->srcpad, TRUE);
750 gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
752 /* check if we added all streams */
754 for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
755 stream = (GstSDPStream *) lstream->data;
756 /* a container stream only needs one pad added. Also disabled streams don't
758 if (!stream->container && !stream->disabled && !stream->added) {
763 GST_SDP_STREAM_UNLOCK (demux);
766 GST_DEBUG_OBJECT (demux, "We added all streams");
767 /* when we get here, all stream are added and we can fire the no-more-pads
769 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
777 GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
778 GST_SDP_STREAM_UNLOCK (demux);
785 rtsp_session_pad_added (GstElement * session, GstPad * pad, GstSDPDemux * demux)
787 GstPad *srcpad = NULL;
790 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
792 name = gst_pad_get_name (pad);
793 srcpad = gst_ghost_pad_new (name, pad);
796 gst_pad_set_active (srcpad, TRUE);
797 gst_element_add_pad (GST_ELEMENT_CAST (demux), srcpad);
801 rtsp_session_no_more_pads (GstElement * session, GstSDPDemux * demux)
803 GST_DEBUG_OBJECT (demux, "got no-more-pads");
804 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
/* "request-pt-map" handler connected to the rtpbin session manager.
 *
 * Looks up, under the stream lock, the stream whose id equals @session; the
 * lock is released on both the normal and the "unknown stream" path.
 * NOTE(review): the lines that produce the returned caps are not visible in
 * this view -- presumably the stream's caps are handed back with an extra
 * reference; confirm, along with what is returned for an unknown stream. */
808 request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
810 GstSDPStream *stream;
813 GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
816 GST_SDP_STREAM_LOCK (demux);
818 find_stream (demux, GINT_TO_POINTER (session),
819 (gpointer) find_stream_by_id);
826 GST_SDP_STREAM_UNLOCK (demux);
832 GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
833 GST_SDP_STREAM_UNLOCK (demux);
/* Push an EOS event into the stream that belongs to @session.
 *
 * The stream is looked up by id; the two debug lines at the end cover the
 * cases where no such stream exists and where the stream was already EOS.
 * NOTE(review): the checks that select those paths, and any bookkeeping done
 * after the event is pushed, are not visible in this view.  No stream lock is
 * taken in the visible lines either -- confirm whether the callers hold it. */
839 gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session)
841 GstSDPStream *stream;
843 GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);
845 /* get stream for session */
847 find_stream (demux, GINT_TO_POINTER (session),
848 (gpointer) find_stream_by_id);
856 gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
862 GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
867 GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
873 on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
876 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
879 gst_sdp_demux_do_stream_eos (demux, session);
883 on_timeout (GstElement * manager, guint session, guint32 ssrc,
886 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);
888 gst_sdp_demux_do_stream_eos (demux, session);
891 /* try to get and configure a manager */
893 gst_sdp_demux_configure_manager (GstSDPDemux * demux, char *rtsp_sdp)
895 /* configure the session manager */
896 if (rtsp_sdp != NULL) {
897 if (!(demux->session = gst_element_factory_make ("rtspsrc", NULL)))
900 g_object_set (demux->session, "location", rtsp_sdp, NULL);
902 GST_DEBUG_OBJECT (demux, "connect to signals on rtspsrc");
903 demux->session_sig_id =
904 g_signal_connect (demux->session, "pad-added",
905 (GCallback) rtsp_session_pad_added, demux);
906 demux->session_nmp_id =
907 g_signal_connect (demux->session, "no-more-pads",
908 (GCallback) rtsp_session_no_more_pads, demux);
910 if (!(demux->session = gst_element_factory_make ("rtpbin", NULL)))
913 /* connect to signals if we did not already do so */
914 GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
915 demux->session_sig_id =
916 g_signal_connect (demux->session, "pad-added",
917 (GCallback) new_session_pad, demux);
918 demux->session_ptmap_id =
919 g_signal_connect (demux->session, "request-pt-map",
920 (GCallback) request_pt_map, demux);
921 g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
923 g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout,
925 g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout,
929 g_object_set (demux->session, "latency", demux->latency, NULL);
931 /* we manage this element */
932 gst_bin_add (GST_BIN_CAST (demux), demux->session);
939 GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
944 GST_DEBUG_OBJECT (demux, "no manager element rtspsrc found");
950 gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
953 const gchar *destination;
956 GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");
958 /* if the destination is not a multicast address, we just want to listen on
960 if (!stream->multicast)
961 destination = "0.0.0.0";
963 destination = stream->destination;
965 /* creating UDP source */
966 if (stream->rtp_port != -1) {
967 GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
970 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
972 gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
974 if (stream->udpsrc[0] == NULL)
978 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);
980 GST_DEBUG_OBJECT (demux,
981 "setting up UDP source with timeout %" G_GINT64_FORMAT,
984 /* configure a timeout on the UDP port. When the timeout message is
985 * posted, we assume UDP transport is not possible. */
986 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout",
987 demux->udp_timeout * 1000, NULL);
989 /* get output pad of the UDP source. */
990 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
992 name = g_strdup_printf ("recv_rtp_sink_%u", stream->id);
993 stream->channelpad[0] = gst_element_get_request_pad (demux->session, name);
996 GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
997 /* configure for UDP delivery, we need to connect the UDP pads to
998 * the session plugin. */
999 gst_pad_link (pad, stream->channelpad[0]);
1000 gst_object_unref (pad);
1003 gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
1006 /* creating another UDP source */
1007 if (stream->rtcp_port != -1) {
1008 GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
1010 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
1012 gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
1014 if (stream->udpsrc[1] == NULL)
1017 /* take ownership */
1018 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);
1020 GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");
1022 name = g_strdup_printf ("recv_rtcp_sink_%u", stream->id);
1023 stream->channelpad[1] = gst_element_get_request_pad (demux->session, name);
1026 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1027 gst_pad_link (pad, stream->channelpad[1]);
1028 gst_object_unref (pad);
1030 gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
1037 GST_DEBUG_OBJECT (demux, "no UDP source element found");
/* Create the UDP sink that sends RTCP for @stream and connect it to the
 * session manager's send_rtcp_src_%u request pad.
 *
 * The sink targets stream->destination on the stream's RTCP port.  For
 * non-multicast streams its destination list is cleared again right after
 * creation (see the comment at that call).  When an RTCP udpsrc exists, its
 * socket is handed to the sink so both use the same local port.
 *
 * Not getting the RTCP request pad only produces a warning.  The only
 * failure path visible in this view is the one taken when no sink element
 * could be created for the URI.
 * NOTE(review): the return statements are not visible here; the caller tests
 * the result with '!' and treats that as a transport failure. */
1042 /* configure the UDP sink back to the server for status reports */
1044 gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
1045 GstSDPStream * stream)
1047 GstPad *pad, *sinkpad;
1050 gchar *destination, *uri, *name;
1052 /* get destination and port */
1053 port = stream->rtcp_port;
1054 destination = stream->destination;
1056 GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);
1058 uri = g_strdup_printf ("udp://%s:%d", destination, port);
1059 stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL);
1061 if (stream->udpsink == NULL)
1062 goto no_sink_element;
1064 /* we clear all destinations because we don't really know where to send the
1065 * RTCP to and we want to avoid sending it to our own ports.
1066 * FIXME when we get an RTCP packet from the sender, we could look at its
1067 * source port and address and try to send RTCP there. */
1068 if (!stream->multicast)
1069 g_signal_emit_by_name (stream->udpsink, "clear");
1071 g_object_set (G_OBJECT (stream->udpsink), "auto-multicast", FALSE, NULL);
1072 g_object_set (G_OBJECT (stream->udpsink), "loop", FALSE, NULL);
1073 /* no sync needed */
1074 g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
1075 /* no async state changes needed */
1076 g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);
1078 if (stream->udpsrc[1]) {
1079 /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
1080 * because some servers check the port number of where it sends RTCP to identify
1081 * the RTCP packets it receives */
/* NOTE(review): the declaration of socket is not visible here; the
 * g_object_unref () below assumes g_object_get () returned a non-NULL
 * socket -- confirm. */
1082 g_object_get (G_OBJECT (stream->udpsrc[1]), "used_socket", &socket, NULL);
1083 GST_DEBUG_OBJECT (demux, "UDP src has socket %p", socket);
1084 /* configure socket and make sure udpsink does not close it when shutting
1085 * down, it belongs to udpsrc after all. */
1086 g_object_set (G_OBJECT (stream->udpsink), "socket", socket, NULL);
1087 g_object_set (G_OBJECT (stream->udpsink), "close-socket", FALSE, NULL);
1088 g_object_unref (socket);
1091 /* we keep this playing always */
/* the state is locked and set to PLAYING before the sink joins the bin */
1092 gst_element_set_locked_state (stream->udpsink, TRUE);
1093 gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);
1095 gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);
1097 /* get session RTCP pad */
1098 name = g_strdup_printf ("send_rtcp_src_%u", stream->id);
1099 pad = gst_element_get_request_pad (demux->session, name);
/* the result of gst_pad_link () is not checked */
1104 sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
1105 gst_pad_link (pad, sinkpad);
1106 gst_object_unref (sinkpad);
1108 /* not very fatal, we just won't be able to send RTCP */
1109 GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
1118 GST_DEBUG_OBJECT (demux, "no UDP sink element found");
1123 static GstFlowReturn
1124 gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
1129 /* store the value */
1130 stream->last_ret = ret;
1132 /* if it's success we can return the value right away */
1133 if (ret == GST_FLOW_OK)
1136 /* any other error that is not-linked can be returned right
1138 if (ret != GST_FLOW_NOT_LINKED)
1141 /* only return NOT_LINKED if all other pads returned NOT_LINKED */
1142 for (streams = demux->streams; streams; streams = g_list_next (streams)) {
1143 GstSDPStream *ostream = (GstSDPStream *) streams->data;
1145 ret = ostream->last_ret;
1146 /* some other return value (must be SUCCESS but we can return
1147 * other values as well) */
1148 if (ret != GST_FLOW_NOT_LINKED)
1151 /* if we get here, all other pads were unlinked and we return
1152 * NOT_LINKED then */
1158 gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
1161 /* only streams that have a connection to the outside world */
1162 if (stream->srcpad == NULL)
1165 if (stream->channelpad[0]) {
1166 gst_event_ref (event);
1167 gst_pad_send_event (stream->channelpad[0], event);
1170 if (stream->channelpad[1]) {
1171 gst_event_ref (event);
1172 gst_pad_send_event (stream->channelpad[1], event);
1176 gst_event_unref (event);
1180 gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
1184 demux = GST_SDP_DEMUX (bin);
1186 switch (GST_MESSAGE_TYPE (message)) {
1187 case GST_MESSAGE_ELEMENT:
1189 const GstStructure *s = gst_message_get_structure (message);
1191 if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
1192 gboolean ignore_timeout;
1194 GST_DEBUG_OBJECT (bin, "timeout on UDP port");
1196 GST_OBJECT_LOCK (demux);
1197 ignore_timeout = demux->ignore_timeout;
1198 demux->ignore_timeout = TRUE;
1199 GST_OBJECT_UNLOCK (demux);
1201 /* we only act on the first udp timeout message, others are irrelevant
1202 * and can be ignored. */
1204 gst_message_unref (message);
1206 GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
1207 ("Could not receive any UDP packets for %.4f seconds, maybe your "
1208 "firewall is blocking it.",
1209 gst_guint64_to_gdouble (demux->udp_timeout / 1000000.0)));
1213 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1216 case GST_MESSAGE_ERROR:
1219 GstSDPStream *stream;
1222 udpsrc = GST_MESSAGE_SRC (message);
1224 GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));
1226 stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
1227 /* fatal but not our message, forward */
1231 /* we ignore the RTCP udpsrc */
1232 if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
1235 /* if we get error messages from the udp sources, that's not a problem as
1236 * long as not all of them error out. We also don't really know what the
1237 * problem is, the message does not give enough detail... */
1238 ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
1239 GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
1240 if (ret != GST_FLOW_OK)
1244 gst_message_unref (message);
1248 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1253 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* Parse the SDP collected in the adapter and set up reception.  Called from
 * the sink pad's event handler once the whole description has arrived.
 *
 * Steps visible in this view, all under the stream lock:
 *  1. take everything from the adapter and parse it into a local SDP message;
 *  2. look for fully qualified "rtsp://" control attributes, first at session
 *     level, then per media; when found, turn the message into an "rtsp-sdp"
 *     URI and, if the "redirect" property is set, post a "redirect" element
 *     message with that URI as "new-location";
 *  3. otherwise create the session manager, then one stream per media with
 *     its UDP sources and UDP sink;
 *  4. bring the session manager to the target state (via PAUSED when the
 *     target is higher) and then the UDP sources of every stream.
 * Each failure path posts a STREAM/TYPE_NOT_FOUND element error with its own
 * text; the session start failure additionally shuts down and removes the
 * session element.
 *
 * NOTE(review): the return statements, most labels and several branches are
 * not visible here.  In particular no g_free () of the buffer returned by
 * gst_adapter_take () or of the string returned by gst_sdp_message_as_uri ()
 * is visible -- confirm that both are released. */
1260 gst_sdp_demux_start (GstSDPDemux * demux)
1265 GstSDPMessage sdp = { 0 };
1266 GstSDPStream *stream = NULL;
1269 GstStateChangeReturn ret;
1271 /* grab the lock so that no state change can interfere */
1272 GST_SDP_STREAM_LOCK (demux);
1274 GST_DEBUG_OBJECT (demux, "parse SDP...");
1276 size = gst_adapter_available (demux->adapter);
1277 data = gst_adapter_take (demux->adapter, size);
1279 gst_sdp_message_init (&sdp);
1280 if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
1281 goto could_not_parse;
1284 gst_sdp_message_dump (&sdp);
1286 /* maybe this is a plain RTSP DESCRIBE reply and we should redirect */
1287 /* look for rtsp control url */
1289 const gchar *control;
1292 control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i);
1293 if (control == NULL)
1296 /* only take fully qualified urls */
1297 if (g_str_has_prefix (control, "rtsp://"))
1303 /* try to find non-aggregate control */
1304 n_streams = gst_sdp_message_medias_len (&sdp);
1306 for (idx = 0; idx < n_streams; idx++) {
1307 const GstSDPMedia *media;
1309 /* get media, should not return NULL */
1310 media = gst_sdp_message_get_media (&sdp, idx);
1315 control = gst_sdp_media_get_attribute_val_n (media, "control", i);
1316 if (control == NULL)
1319 /* only take fully qualified urls */
1320 if (g_str_has_prefix (control, "rtsp://"))
1323 /* this media has no control, exit */
1330 /* we have RTSP now */
1331 uri = gst_sdp_message_as_uri ("rtsp-sdp", &sdp);
1333 if (demux->redirect) {
1334 GST_INFO_OBJECT (demux, "redirect to %s", uri);
1336 gst_element_post_message (GST_ELEMENT_CAST (demux),
1337 gst_message_new_element (GST_OBJECT_CAST (demux),
1338 gst_structure_new ("redirect",
1339 "new-location", G_TYPE_STRING, uri, NULL)));
1345 /* we get here when we didn't do a redirect */
1347 /* try to get and configure a manager */
/* a non-NULL uri makes the manager an rtspsrc, NULL makes it an rtpbin */
1348 if (!gst_sdp_demux_configure_manager (demux, uri))
1351 /* create streams with UDP sources and sinks */
1352 n_streams = gst_sdp_message_medias_len (&sdp);
1353 for (i = 0; i < n_streams; i++) {
1354 stream = gst_sdp_demux_create_stream (demux, &sdp, i);
1359 GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
1361 if (!gst_sdp_demux_stream_configure_udp (demux, stream))
1362 goto transport_failed;
1363 if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
1364 goto transport_failed;
1367 if (!demux->streams)
1371 /* set target state on session manager */
1372 /* setting rtspsrc to PLAYING may cause it to lose that target state
1373 * along the way due to no-preroll udpsrc elements, so ...
1374 * do it in two stages here (similar to other elements) */
1375 if (demux->target > GST_STATE_PAUSED) {
1376 ret = gst_element_set_state (demux->session, GST_STATE_PAUSED);
1377 if (ret == GST_STATE_CHANGE_FAILURE)
1378 goto start_session_failure;
1380 ret = gst_element_set_state (demux->session, demux->target);
1381 if (ret == GST_STATE_CHANGE_FAILURE)
1382 goto start_session_failure;
1385 /* activate all streams */
1386 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
1387 stream = (GstSDPStream *) walk->data;
1389 /* configure target state on udp sources */
/* NOTE(review): gst_sdp_demux_stream_configure_udp () leaves a udpsrc slot
 * NULL when its port is -1, and no NULL check is visible here -- confirm. */
1390 gst_element_set_state (stream->udpsrc[0], demux->target);
1391 gst_element_set_state (stream->udpsrc[1], demux->target);
1394 GST_SDP_STREAM_UNLOCK (demux);
1395 gst_sdp_message_uninit (&sdp);
1403 GST_SDP_STREAM_UNLOCK (demux);
1404 gst_sdp_message_uninit (&sdp);
1410 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1411 ("Could not create RTP stream transport."));
1416 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1417 ("Could not create RTP session manager."));
1422 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1423 ("Could not parse SDP message."));
1428 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1429 ("No streams in SDP message."));
1434 /* avoid hanging if redirect not handled */
1435 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1436 ("Sent RTSP redirect."));
1439 start_session_failure:
1441 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1442 ("Could not start RTP session manager."));
1443 gst_element_set_state (demux->session, GST_STATE_NULL);
1444 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
1445 demux->session = NULL;
1451 gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
1454 gboolean res = TRUE;
1456 demux = GST_SDP_DEMUX (parent);
1458 switch (GST_EVENT_TYPE (event)) {
1460 /* when we get EOS, start parsing the SDP */
1461 res = gst_sdp_demux_start (demux);
1462 gst_event_unref (event);
1465 gst_event_unref (event);
1472 static GstFlowReturn
1473 gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1477 demux = GST_SDP_DEMUX (parent);
1479 /* push the SDP message in an adapter, we start doing something with it when
1481 gst_adapter_push (demux->adapter, buffer);
1486 static GstStateChangeReturn
1487 gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
1490 GstStateChangeReturn ret;
1492 demux = GST_SDP_DEMUX (element);
1494 GST_SDP_STREAM_LOCK (demux);
1496 switch (transition) {
1497 case GST_STATE_CHANGE_NULL_TO_READY:
1499 case GST_STATE_CHANGE_READY_TO_PAUSED:
1500 /* first attempt, don't ignore timeouts */
1501 gst_adapter_clear (demux->adapter);
1502 demux->ignore_timeout = FALSE;
1503 demux->target = GST_STATE_PAUSED;
1505 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1506 demux->target = GST_STATE_PLAYING;
1512 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1513 if (ret == GST_STATE_CHANGE_FAILURE)
1516 switch (transition) {
1517 case GST_STATE_CHANGE_READY_TO_PAUSED:
1518 ret = GST_STATE_CHANGE_NO_PREROLL;
1520 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1521 ret = GST_STATE_CHANGE_NO_PREROLL;
1522 demux->target = GST_STATE_PAUSED;
1524 case GST_STATE_CHANGE_PAUSED_TO_READY:
1525 gst_sdp_demux_cleanup (demux);
1527 case GST_STATE_CHANGE_READY_TO_NULL:
1534 GST_SDP_STREAM_UNLOCK (demux);