2 * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 * Boston, MA 02111-1307, USA.
20 * SECTION:element-sdpdemux
22 * sdpdemux currently understands SDP as the input format of the session description.
23 * For each stream listed in the SDP a new rtp_stream%d pad will be created
24 * with caps derived from the SDP media description. This is a caps of mime type
25 * "application/x-rtp" that can be connected to any available RTP depayloader
28 * sdpdemux will internally instantiate an RTP session manager element
29 * that will handle the RTCP messages to and from the server, jitter removal,
30 * packet reordering along with providing a clock for the pipeline.
32 * sdpdemux acts like a live element and will therefore only generate data in the
36 * <title>Example launch line</title>
38 * gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
39 * ]| Establish a connection to an HTTP server that contains an SDP session description
40 * that gets parsed by sdpdemux, which then sends the raw RTP packets to a fakesink.
43 * Last reviewed on 2007-10-01 (0.10.6)
54 /* include GLIB for G_OS_WIN32 */
61 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
62 * * mingw32 headers check WINVER before allowing the use of these */
68 #include <sys/socket.h>
70 #include <netinet/in.h>
79 #include <gst/rtp/gstrtppayloads.h>
80 #include <gst/sdp/gstsdpmessage.h>
82 #include "gstsdpdemux.h"
/* Debug category that the GST_* logging macros in this file log to. */
84 GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
85 #define GST_CAT_DEFAULT (sdpdemux_debug)
/* Template for the "sink" pad; its caps are "application/sdp". */
87 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
90 GST_STATIC_CAPS ("application/sdp"));
/* Template for the "stream%d" pads; their caps are "application/x-rtp". */
92 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream%d",
95 GST_STATIC_CAPS ("application/x-rtp"));
/* Property defaults. DEFAULT_TIMEOUT (microseconds) and DEFAULT_LATENCY_MS
 * (milliseconds) are the defaults of the "timeout" and "latency" properties
 * installed in gst_sdp_demux_class_init(); DEFAULT_DEBUG and DEFAULT_REDIRECT
 * are presumably the defaults of "debug" and "redirect" -- TODO confirm. */
103 #define DEFAULT_DEBUG FALSE
104 #define DEFAULT_TIMEOUT 10000000
105 #define DEFAULT_LATENCY_MS 200
106 #define DEFAULT_REDIRECT TRUE
/* Forward declarations of the GObject, GstElement, GstBin and pad callbacks
 * that are defined further down in this file. */
118 static void gst_sdp_demux_finalize (GObject * object);
120 static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
121 const GValue * value, GParamSpec * pspec);
122 static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
123 GValue * value, GParamSpec * pspec);
125 static GstCaps *gst_sdp_demux_media_to_caps (gint pt,
126 const GstSDPMedia * media);
128 static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
129 GstStateChange transition);
130 static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
132 static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
133 GstSDPStream * stream, GstEvent * event);
135 static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event);
136 static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad,
139 /*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
/* Initialises the debug category of this element under the name "sdpdemux"
 * with the description "SDP demux". The sdp_demux_type argument is not used
 * by the visible code. */
142 _do_init (GType sdp_demux_type)
144 GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
/* Type boilerplate for GstSDPDemux (function prefix gst_sdp_demux) with
 * GstBin / GST_TYPE_BIN as the parent type. */
147 GST_BOILERPLATE_FULL (GstSDPDemux, gst_sdp_demux, GstBin, GST_TYPE_BIN,
/* Base-init hook: registers the sink and stream pad templates on the element
 * class and sets the element details (long name, classification, description
 * and author). */
151 gst_sdp_demux_base_init (gpointer g_class)
153 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
155 gst_element_class_add_static_pad_template (element_class, &sinktemplate);
156 gst_element_class_add_static_pad_template (element_class, &rtptemplate);
158 gst_element_class_set_details_simple (element_class, "SDP session setup",
159 "Codec/Demuxer/Network/RTP",
160 "Receive data over the network via SDP",
161 "Wim Taymans <wim.taymans@gmail.com>");
/* Class initialiser: installs the GObject property accessors and finalize
 * handler, registers the "debug", "timeout", "latency" and "redirect"
 * properties, and overrides the GstElement change_state and GstBin
 * handle_message virtual methods. */
165 gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
167 GObjectClass *gobject_class;
168 GstElementClass *gstelement_class;
169 GstBinClass *gstbin_class;
/* the same class structure viewed as each of its parent class types */
171 gobject_class = (GObjectClass *) klass;
172 gstelement_class = (GstElementClass *) klass;
173 gstbin_class = (GstBinClass *) klass;
175 gobject_class->set_property = gst_sdp_demux_set_property;
176 gobject_class->get_property = gst_sdp_demux_get_property;
178 gobject_class->finalize = gst_sdp_demux_finalize;
/* all four properties are read/write and are also set at construction time
 * (G_PARAM_CONSTRUCT) */
180 g_object_class_install_property (gobject_class, PROP_DEBUG,
181 g_param_spec_boolean ("debug", "Debug",
182 "Dump request and response messages to stdout",
184 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
/* UDP timeout in microseconds, 0 disables it */
186 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
187 g_param_spec_uint64 ("timeout", "Timeout",
188 "Fail transport after UDP timeout microseconds (0 = disabled)",
189 0, G_MAXUINT64, DEFAULT_TIMEOUT,
190 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
/* buffering latency in milliseconds */
192 g_object_class_install_property (gobject_class, PROP_LATENCY,
193 g_param_spec_uint ("latency", "Buffer latency in ms",
194 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
195 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
197 g_object_class_install_property (gobject_class, PROP_REDIRECT,
198 g_param_spec_boolean ("redirect", "Redirect",
199 "Sends a redirection message instead of using a custom session element",
201 G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
203 gstelement_class->change_state = gst_sdp_demux_change_state;
205 gstbin_class->handle_message = gst_sdp_demux_handle_message;
/* Instance initialiser: creates the "sink" pad from its static template,
 * installs the event and chain functions on it and adds it to the element,
 * then allocates the recursive stream mutex and the adapter that collects
 * the incoming SDP data. */
209 gst_sdp_demux_init (GstSDPDemux * demux, GstSDPDemuxClass * g_class)
211 demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
212 gst_pad_set_event_function (demux->sinkpad,
213 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
214 gst_pad_set_chain_function (demux->sinkpad,
215 GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
216 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
218 /* protects the streaming thread in interleaved mode or the polling
219 * thread in UDP mode. */
220 demux->stream_rec_lock = g_new (GStaticRecMutex, 1);
221 g_static_rec_mutex_init (demux->stream_rec_lock);
223 demux->adapter = gst_adapter_new ();
/* GObject finalize handler: releases the recursive stream mutex allocated in
 * gst_sdp_demux_init(), drops the reference on the adapter and chains up to
 * the parent class. */
227 gst_sdp_demux_finalize (GObject * object)
231 demux = GST_SDP_DEMUX (object);
234 g_static_rec_mutex_free (demux->stream_rec_lock);
235 g_free (demux->stream_rec_lock);
237 g_object_unref (demux->adapter);
239 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject property setter. Copies the incoming GValue into the matching
 * instance field: debug and redirect (boolean), udp_timeout (uint64) and
 * latency (uint). An unknown prop_id is reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
243 gst_sdp_demux_set_property (GObject * object, guint prop_id,
244 const GValue * value, GParamSpec * pspec)
248 demux = GST_SDP_DEMUX (object);
252 demux->debug = g_value_get_boolean (value);
255 demux->udp_timeout = g_value_get_uint64 (value);
258 demux->latency = g_value_get_uint (value);
261 demux->redirect = g_value_get_boolean (value);
264 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter. Stores the current value of debug, udp_timeout,
 * latency or redirect into the supplied GValue. An unknown prop_id is
 * reported through G_OBJECT_WARN_INVALID_PROPERTY_ID. */
270 gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
275 demux = GST_SDP_DEMUX (object);
279 g_value_set_boolean (value, demux->debug);
282 g_value_set_uint64 (value, demux->udp_timeout);
285 g_value_set_uint (value, demux->latency);
288 g_value_set_boolean (value, demux->redirect);
291 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Comparator for find_stream(): a carries the wanted stream id packed with
 * GINT_TO_POINTER and is compared against stream->id.
 * NOTE(review): the return statements are not visible here;
 * g_list_find_custom() expects 0 for a match -- confirm. */
297 find_stream_by_id (GstSDPStream * stream, gconstpointer a)
299 gint id = GPOINTER_TO_INT (a);
301 if (stream->id == id)
/* Comparator for find_stream(): a carries the wanted payload type packed
 * with GINT_TO_POINTER and is compared against stream->pt.
 * NOTE(review): the return statements are not visible here;
 * g_list_find_custom() expects 0 for a match -- confirm. */
308 find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
310 gint pt = GPOINTER_TO_INT (a);
312 if (stream->pt == pt)
/* Comparator for find_stream(): a is a GstElement pointer that is compared
 * against both UDP sources of the stream (udpsrc[0] and udpsrc[1]).
 * NOTE(review): the return statements are not visible here;
 * g_list_find_custom() expects 0 for a match -- confirm. */
319 find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
321 GstElement *src = (GstElement *) a;
323 if (stream->udpsrc[0] == src)
325 if (stream->udpsrc[1] == src)
/* Searches demux->streams with g_list_find_custom(). data is the search key
 * and func is one of the find_stream_by_* comparators above; it is passed as
 * a gconstpointer and cast to GCompareFunc here. Returns the GstSDPStream
 * stored in the matching list node.
 * NOTE(review): the not-found return is not visible here, presumably NULL --
 * callers test the result for truth. */
331 static GstSDPStream *
332 find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
336 /* find and get stream */
338 g_list_find_custom (demux->streams, data, (GCompareFunc) func)))
339 return (GstSDPStream *) lstream->data;
/* Tears down one stream: drops its caps, sets both UDP sources and the UDP
 * sink to GST_STATE_NULL and removes them from the bin, then deactivates the
 * source pad, removes it from the element and clears the added flag.
 * Every element and pad pointer is reset to NULL after release. */
345 gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
349 GST_DEBUG_OBJECT (demux, "free stream %p", stream);
352 gst_caps_unref (stream->caps);
/* index 0 and 1 of udpsrc[] are both released the same way */
354 for (i = 0; i < 2; i++) {
355 GstElement *udpsrc = stream->udpsrc[i];
358 gst_element_set_state (udpsrc, GST_STATE_NULL);
359 gst_bin_remove (GST_BIN_CAST (demux), udpsrc);
360 stream->udpsrc[i] = NULL;
363 if (stream->udpsink) {
364 gst_element_set_state (stream->udpsink, GST_STATE_NULL);
365 gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink);
366 stream->udpsink = NULL;
368 if (stream->srcpad) {
369 gst_pad_set_active (stream->srcpad, FALSE);
371 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
372 stream->added = FALSE;
374 stream->srcpad = NULL;
/* Resolves host_name with getaddrinfo() (datagram sockets) and reports
 * whether one of the resulting addresses is a multicast address: IPv4
 * results are tested with IN_MULTICAST on the host-order address, all other
 * results with IN6_IS_ADDR_MULTICAST. The walk over the results stops at the
 * first multicast hit. A NULL host_name fails the g_return_val_if_fail()
 * precondition with FALSE. */
380 is_multicast_address (const gchar * host_name)
382 struct addrinfo hints;
384 struct addrinfo *res;
385 gboolean ret = FALSE;
387 memset (&hints, 0, sizeof (hints));
388 hints.ai_socktype = SOCK_DGRAM;
390 g_return_val_if_fail (host_name, FALSE);
/* getaddrinfo() returns 0 on success and a non-zero EAI_* code on failure.
 * The sign of those codes is platform specific (negative on glibc, positive
 * on BSD, macOS and Windows), so test against 0 rather than for a negative
 * value; otherwise a failed lookup would fall through with res unset. */
392 if (getaddrinfo (host_name, NULL, &hints, &res) != 0)
395 for (ai = res; !ret && ai; ai = ai->ai_next) {
396 if (ai->ai_family == AF_INET)
398 IN_MULTICAST (ntohl (((struct sockaddr_in *) ai->ai_addr)->
402 IN6_IS_ADDR_MULTICAST (&((struct sockaddr_in6 *) ai->
403 ai_addr)->sin6_addr);
/* Builds a GstSDPStream for media number idx of sdp and appends it to
 * demux->streams.
 *
 * - The stream id is taken from demux->numstreams, which is incremented.
 * - The payload type is the first format of the media (converted with
 *   atoi()) and the caps come from gst_sdp_demux_media_to_caps().
 * - For payload types >= 96, container is set when another stream with the
 *   same payload type is already known.
 * - The connection is taken from the media and, failing that, from the
 *   session level of the SDP; it supplies destination and ttl, and
 *   is_multicast_address() decides the multicast flag.
 * - The RTP port comes from the media; the RTCP port is always RTP port + 1
 *   (the "rtcp" attribute is looked up but not parsed yet, see the FIXME).
 *
 * The last visible statement releases the stream again with
 * gst_sdp_demux_stream_free(); presumably that is the error path -- the
 * labels and return statements are not visible here. */
411 static GstSDPStream *
412 gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
414 GstSDPStream *stream;
415 const gchar *payload;
416 const GstSDPMedia *media;
417 const GstSDPConnection *conn;
419 /* get media, should not return NULL */
420 media = gst_sdp_message_get_media (sdp, idx);
424 stream = g_new0 (GstSDPStream, 1);
425 stream->parent = demux;
426 /* we mark the pad as not linked, we will mark it as OK when we add the pad to
428 stream->last_ret = GST_FLOW_OK;
429 stream->added = FALSE;
430 stream->disabled = FALSE;
431 stream->id = demux->numstreams++;
434 /* we must have a payload. No payload means we cannot create caps */
435 /* FIXME, handle multiple formats. */
436 if ((payload = gst_sdp_media_get_format (media, 0))) {
437 stream->pt = atoi (payload);
439 stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media);
441 if (stream->pt >= 96) {
442 /* If we have a dynamic payload type, see if we have a stream with the
443 * same payload number. If there is one, they are part of the same
444 * container and we only need to add one pad. */
445 if (find_stream (demux, GINT_TO_POINTER (stream->pt),
446 (gpointer) find_stream_by_pt)) {
447 stream->container = TRUE;
451 if (!(conn = gst_sdp_media_get_connection (media, 0))) {
452 if (!(conn = gst_sdp_message_get_connection (sdp)))
459 stream->destination = conn->address;
460 stream->ttl = conn->ttl;
461 stream->multicast = is_multicast_address (stream->destination);
463 stream->rtp_port = gst_sdp_media_get_port (media);
464 if (gst_sdp_media_get_attribute_val (media, "rtcp")) {
465 /* FIXME, RFC 3605 */
466 stream->rtcp_port = stream->rtp_port + 1;
468 stream->rtcp_port = stream->rtp_port + 1;
471 GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
472 GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
473 GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
474 GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);
476 /* we keep track of all streams */
477 demux->streams = g_list_append (demux->streams, stream);
484 gst_sdp_demux_stream_free (demux, stream);
/* Releases everything that was set up for the current session: every stream
 * in demux->streams is freed with gst_sdp_demux_stream_free() and the list
 * itself is released; if a session element exists, its stored signal
 * handlers are disconnected, it is set to GST_STATE_NULL and removed from
 * the bin. Finally the stream counter is reset to 0. */
490 gst_sdp_demux_cleanup (GstSDPDemux * demux)
494 GST_DEBUG_OBJECT (demux, "cleanup");
496 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
497 GstSDPStream *stream = (GstSDPStream *) walk->data;
499 gst_sdp_demux_stream_free (demux, stream);
501 g_list_free (demux->streams);
502 demux->streams = NULL;
503 if (demux->session) {
/* only handler ids that are non-zero were connected; each one is cleared
 * after it has been disconnected */
504 if (demux->session_sig_id) {
505 g_signal_handler_disconnect (demux->session, demux->session_sig_id);
506 demux->session_sig_id = 0;
508 if (demux->session_nmp_id) {
509 g_signal_handler_disconnect (demux->session, demux->session_nmp_id);
510 demux->session_nmp_id = 0;
512 if (demux->session_ptmap_id) {
513 g_signal_handler_disconnect (demux->session, demux->session_ptmap_id);
514 demux->session_ptmap_id = 0;
516 gst_element_set_state (demux->session, GST_STATE_NULL);
517 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
518 demux->session = NULL;
520 demux->numstreams = 0;
/* Parsing helpers that work on a cursor p into a string.
 * PARSE_INT and PARSE_STRING both search p for the delimiter del with
 * strstr() and produce their result in res; SKIP_SPACES loops while *p is a
 * non-NUL ASCII whitespace character.
 * NOTE(review): only fragments of the macro bodies are visible here, so how
 * res is filled and how p is advanced must be confirmed. */
523 #define PARSE_INT(p, del, res) \
526 p = strstr (p, del); \
536 #define PARSE_STRING(p, del, res) \
539 p = strstr (p, del); \
551 #define SKIP_SPACES(p) \
552 while (*p && g_ascii_isspace (*p)) \
557 * <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
/* Parses the value of an rtpmap attribute into its parts. The payload number
 * is read up to the first space into *payload and the encoding name up to
 * the first '/' into *name; *rate and *params are the remaining outputs.
 * When no rate is present this is logged and, per the comment below, a rate
 * of -1 is assumed.
 * NOTE(review): the return statements are not visible here; the caller
 * stores the result in a variable named ret -- confirm its meaning. */
560 gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
561 gint * rate, gchar ** params)
565 t = p = (gchar *) rtpmap;
567 PARSE_INT (p, " ", *payload);
575 PARSE_STRING (p, "/", *name);
577 GST_DEBUG ("no rate, name %s", p);
578 /* no rate, assume -1 then */
603 * Mapping of caps to and from SDP fields:
605 * m=<media> <UDP port> RTP/AVP <payload>
606 * a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
607 * a=fmtp:<payload> <param>[=<value>];...
/* Converts the SDP description of one media into "application/x-rtp" caps
 * for payload type pt.
 *
 * - The "rtpmap" attribute, when present, is parsed with
 *   gst_sdp_demux_parse_rtpmap(); an rtpmap for a different payload type or
 *   one that fails to parse is ignored with a warning.
 * - When no clock rate is known it is looked up in the RTP payload info
 *   tables: by media and encoding name for dynamic payload types, by payload
 *   number for static ones.
 * - The caps always carry "media" (lower case) and "payload"; "clock-rate",
 *   "encoding-name" (upper case) and "encoding-params" (lower case) are
 *   added from the rtpmap data.
 * - The "fmtp" attribute, when it is for the same payload type, is split on
 *   ';' and every <param>[=<value>] pair is added as a string field with a
 *   lower-cased key.
 *
 * The two trailing warnings cover a dynamic payload type without rtpmap and
 * an unknown clock rate. */
610 gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media)
617 gchar *params = NULL;
623 /* get and parse rtpmap */
624 if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
/* params is an out parameter (gchar **), so its address is passed */
625 ret = gst_sdp_demux_parse_rtpmap (rtpmap, &payload, &name, &rate, &params);
628 /* we ignore the rtpmap if the payload type is different. */
629 g_warning ("rtpmap of wrong payload type, ignoring");
635 /* if we failed to parse the rtpmap for a dynamic payload type, we have an
639 /* else we can ignore */
640 g_warning ("error parsing rtpmap, ignoring");
643 /* dynamic payloads need rtpmap or we fail */
647 /* check if we have a rate, if not, we need to look up the rate from the
648 * default rates based on the payload types. */
650 const GstRTPPayloadInfo *info;
652 if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
653 /* dynamic types, use media and encoding_name */
654 tmp = g_ascii_strdown (media->media, -1);
655 info = gst_rtp_payload_info_for_name (tmp, name);
658 /* static types, use payload type */
659 info = gst_rtp_payload_info_for_pt (pt);
663 if ((rate = info->clock_rate) == 0)
666 /* we fail if we cannot find one */
671 tmp = g_ascii_strdown (media->media, -1);
672 caps = gst_caps_new_simple ("application/x-rtp",
673 "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
675 s = gst_caps_get_structure (caps, 0);
677 gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);
679 /* encoding name must be upper case */
681 tmp = g_ascii_strup (name, -1);
682 gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
686 /* params must be lower case */
687 if (params != NULL) {
688 tmp = g_ascii_strdown (params, -1);
689 gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
693 /* parse optional fmtp: field */
694 if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
700 /* p is now of the format <payload> <param>[=<value>];... */
701 PARSE_INT (p, " ", payload);
702 if (payload != -1 && payload == pt) {
706 /* <param>[=<value>] are separated with ';' */
707 pairs = g_strsplit (p, ";", 0);
708 for (i = 0; pairs[i]; i++) {
712 /* the key may not have a '=', the value can have other '='s */
713 valpos = strstr (pairs[i], "=");
715 /* we have a '=' and thus a value, remove the '=' with \0 */
717 /* value is everything between '=' and ';'. FIXME, strip? */
718 val = g_strstrip (valpos + 1);
720 /* simple <param>;.. is translated into <param>=1;... */
723 /* strip the key of spaces, convert key to lowercase but not the value. */
724 key = g_strstrip (pairs[i]);
725 if (strlen (key) > 1) {
726 tmp = g_ascii_strdown (key, -1);
727 gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
739 g_warning ("rtpmap type not given for dynamic payload %d", pt);
744 g_warning ("rate unknown for payload type %d", pt);
/* Handler for the "pad-added" signal of the session manager (connected in
 * gst_sdp_demux_configure_manager()). The pad name is parsed as
 * "recv_rtp_src_<id>_<ssrc>_<pt>" and the stream with that id is looked up
 * under the stream lock. For a known stream the timeout on its first UDP
 * source is disabled and a ghost pad for the session pad is created from the
 * stream%d template, activated and added to the element. Afterwards all
 * streams are checked; when none is left that is neither a container, nor
 * disabled, nor added, no-more-pads is signalled. Unknown streams are logged
 * and ignored. */
749 /* this callback is called when the session manager generated a new src pad with
750 * payloaded RTP packets. We simply ghost the pad here. */
752 new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
755 GstPadTemplate *template;
758 GstSDPStream *stream;
761 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
763 GST_SDP_STREAM_LOCK (demux);
765 name = gst_object_get_name (GST_OBJECT_CAST (pad));
766 if (sscanf (name, "recv_rtp_src_%d_%d_%d", &id, &ssrc, &pt) != 3)
769 GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %d, PT %d", id, ssrc, pt);
772 find_stream (demux, GINT_TO_POINTER (id), (gpointer) find_stream_by_id);
776 /* no need for a timeout anymore now */
777 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
779 /* create a new pad we will use to stream to */
780 template = gst_static_pad_template_get (&rtptemplate);
781 stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
782 gst_object_unref (template);
785 stream->added = TRUE;
786 gst_pad_set_active (stream->srcpad, TRUE);
787 gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
789 /* check if we added all streams */
791 for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
792 stream = (GstSDPStream *) lstream->data;
793 /* a container stream only needs one pad added. Also disabled streams don't
795 if (!stream->container && !stream->disabled && !stream->added) {
800 GST_SDP_STREAM_UNLOCK (demux);
803 GST_DEBUG_OBJECT (demux, "We added all streams");
804 /* when we get here, all streams are added and we can fire the no-more-pads
806 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
814 GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
815 GST_SDP_STREAM_UNLOCK (demux);
/* Handler for the "pad-added" signal of the rtspsrc session element
 * (connected in gst_sdp_demux_configure_manager()): exposes the new pad on
 * this element through an active ghost pad that has the same name. */
822 rtsp_session_pad_added (GstElement * session, GstPad * pad, GstSDPDemux * demux)
824 GstPad *srcpad = NULL;
827 GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
829 name = gst_pad_get_name (pad);
830 srcpad = gst_ghost_pad_new (name, pad);
833 gst_pad_set_active (srcpad, TRUE);
834 gst_element_add_pad (GST_ELEMENT_CAST (demux), srcpad);
/* Handler for the "no-more-pads" signal of the rtspsrc session element:
 * forwards the notification by signalling no-more-pads on this element. */
838 rtsp_session_no_more_pads (GstElement * session, GstSDPDemux * demux)
840 GST_DEBUG_OBJECT (demux, "got no-more-pads");
841 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
/* Handler for the "request-pt-map" signal of the session manager (connected
 * in gst_sdp_demux_configure_manager()). Looks up, under the stream lock,
 * the stream whose id equals session; an unknown session is logged.
 * NOTE(review): the value handed back is not visible here -- presumably the
 * caps of the stream; confirm. */
845 request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
847 GstSDPStream *stream;
850 GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
853 GST_SDP_STREAM_LOCK (demux);
855 find_stream (demux, GINT_TO_POINTER (session),
856 (gpointer) find_stream_by_id);
863 GST_SDP_STREAM_UNLOCK (demux);
869 GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
870 GST_SDP_STREAM_UNLOCK (demux);
/* Marks the stream that belongs to session as finished by pushing an EOS
 * event to it with gst_sdp_demux_stream_push_event(). A session without a
 * matching stream, and a stream that was already EOS, are only logged.
 * NOTE(review): how the "already EOS" state is tracked is not visible
 * here. */
876 gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session)
878 GstSDPStream *stream;
880 GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);
882 /* get stream for session */
884 find_stream (demux, GINT_TO_POINTER (session),
885 (gpointer) find_stream_by_id);
893 gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
899 GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
904 GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
/* Handler for the "on-bye-ssrc" signal of the session manager: logs the
 * SSRC that sent BYE and sets the stream of that session to EOS. */
910 on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
913 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
916 gst_sdp_demux_do_stream_eos (demux, session);
/* Handler for both the "on-bye-timeout" and the "on-timeout" signal of the
 * session manager: logs the SSRC that timed out and sets the stream of that
 * session to EOS. */
920 on_timeout (GstElement * manager, guint session, guint32 ssrc,
923 GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);
925 gst_sdp_demux_do_stream_eos (demux, session);
/* Creates and configures the session element and adds it to the bin.
 *
 * With a non-NULL rtsp_sdp an "rtspsrc" element is created, its "location"
 * is set to rtsp_sdp and its "pad-added" and "no-more-pads" signals are
 * connected. Otherwise a "gstrtpbin" element is created and its
 * "pad-added", "request-pt-map", "on-bye-ssrc", "on-bye-timeout" and
 * "on-timeout" signals are connected. The ids of the pad-added,
 * no-more-pads and request-pt-map handlers are stored for
 * gst_sdp_demux_cleanup(). The "latency" property is set from
 * demux->latency.
 *
 * When an element factory is missing this is logged.
 * NOTE(review): the return statements are not visible here; the caller
 * treats a false result as failure. */
928 /* try to get and configure a manager */
930 gst_sdp_demux_configure_manager (GstSDPDemux * demux, char *rtsp_sdp)
932 /* configure the session manager */
933 if (rtsp_sdp != NULL) {
934 if (!(demux->session = gst_element_factory_make ("rtspsrc", NULL)))
937 g_object_set (demux->session, "location", rtsp_sdp, NULL);
939 GST_DEBUG_OBJECT (demux, "connect to signals on rtspsrc");
940 demux->session_sig_id =
941 g_signal_connect (demux->session, "pad-added",
942 (GCallback) rtsp_session_pad_added, demux);
943 demux->session_nmp_id =
944 g_signal_connect (demux->session, "no-more-pads",
945 (GCallback) rtsp_session_no_more_pads, demux);
947 if (!(demux->session = gst_element_factory_make ("gstrtpbin", NULL)))
950 /* connect to signals if we did not already do so */
951 GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
952 demux->session_sig_id =
953 g_signal_connect (demux->session, "pad-added",
954 (GCallback) new_session_pad, demux);
955 demux->session_ptmap_id =
956 g_signal_connect (demux->session, "request-pt-map",
957 (GCallback) request_pt_map, demux);
958 g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
960 g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout,
962 g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout,
966 g_object_set (demux->session, "latency", demux->latency, NULL);
968 /* we manage this element */
969 gst_bin_add (GST_BIN_CAST (demux), demux->session);
976 GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
981 GST_DEBUG_OBJECT (demux, "no manager element rtspsrc found");
/* Creates the UDP sources that receive RTP and RTCP for stream and links
 * them to the session manager.
 *
 * The listen address is the stream destination for multicast streams and
 * "0.0.0.0" otherwise. For each port that is not -1 a source element is
 * made from a "udp://<address>:<port>" URI, added to the bin, linked to a
 * requested "recv_rtp_sink_<id>" or "recv_rtcp_sink_<id>" pad of the
 * session (kept in channelpad[0] and channelpad[1]) and set to
 * GST_STATE_PAUSED. The RTP source additionally gets the configured UDP
 * timeout.
 *
 * A missing UDP source element is logged.
 * NOTE(review): the return statements are not visible here; the caller
 * treats a false result as a transport failure. */
987 gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
990 const gchar *destination;
993 GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");
995 /* if the destination is not a multicast address, we just want to listen on
997 if (!stream->multicast)
998 destination = "0.0.0.0";
1000 destination = stream->destination;
1002 /* creating UDP source */
1003 if (stream->rtp_port != -1) {
1004 GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
1007 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
1008 stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
1010 if (stream->udpsrc[0] == NULL)
1013 /* take ownership */
1014 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);
/* udp_timeout is an unsigned 64-bit value (it backs the uint64 "timeout"
 * property), so it is printed with the unsigned format macro */
1016 GST_DEBUG_OBJECT (demux,
1017 "setting up UDP source with timeout %" G_GUINT64_FORMAT,
1018 demux->udp_timeout);
1020 /* configure a timeout on the UDP port. When the timeout message is
1021 * posted, we assume UDP transport is not possible. */
1022 g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", demux->udp_timeout,
1025 /* get output pad of the UDP source. */
1026 pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1028 name = g_strdup_printf ("recv_rtp_sink_%d", stream->id);
1029 stream->channelpad[0] = gst_element_get_request_pad (demux->session, name);
1032 GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
1033 /* configure for UDP delivery, we need to connect the UDP pads to
1034 * the session plugin. */
1035 gst_pad_link (pad, stream->channelpad[0]);
1036 gst_object_unref (pad);
1039 gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
1042 /* creating another UDP source */
1043 if (stream->rtcp_port != -1) {
1044 GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
1046 uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
1047 stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
1049 if (stream->udpsrc[1] == NULL)
1052 /* take ownership */
1053 gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);
1055 GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");
1057 name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id);
1058 stream->channelpad[1] = gst_element_get_request_pad (demux->session, name);
1061 pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1062 gst_pad_link (pad, stream->channelpad[1]);
1063 gst_object_unref (pad);
1065 gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
1072 GST_DEBUG_OBJECT (demux, "no UDP source element found");
/* Creates the UDP sink that sends RTCP for stream and links it to the
 * session manager.
 *
 * The sink is made from a "udp://<destination>:<rtcp_port>" URI. For
 * non-multicast streams all of its destinations are cleared.
 * "auto-multicast", "loop", "sync" and "async" are switched off. When an
 * RTCP UDP source exists, its socket is handed to the sink ("sockfd") and
 * the sink is told not to close it ("closefd" FALSE). The sink is locked in
 * GST_STATE_PLAYING, added to the bin and linked to the requested
 * "send_rtcp_src_<id>" pad of the session; failing to get that pad only
 * produces a warning.
 *
 * A missing UDP sink element is logged.
 * NOTE(review): the return statements are not visible here; the caller
 * treats a false result as a transport failure. */
1077 /* configure the UDP sink back to the server for status reports */
1079 gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
1080 GstSDPStream * stream)
1082 GstPad *pad, *sinkpad;
1083 gint port, sockfd = -1;
1084 gchar *destination, *uri, *name;
1086 /* get destination and port */
1087 port = stream->rtcp_port;
1088 destination = stream->destination;
1090 GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);
1092 uri = g_strdup_printf ("udp://%s:%d", destination, port);
1093 stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL);
1095 if (stream->udpsink == NULL)
1096 goto no_sink_element;
1098 /* we clear all destinations because we don't really know where to send the
1099 * RTCP to and we want to avoid sending it to our own ports.
1100 * FIXME when we get an RTCP packet from the sender, we could look at its
1101 * source port and address and try to send RTCP there. */
1102 if (!stream->multicast)
1103 g_signal_emit_by_name (stream->udpsink, "clear");
1105 g_object_set (G_OBJECT (stream->udpsink), "auto-multicast", FALSE, NULL);
1106 g_object_set (G_OBJECT (stream->udpsink), "loop", FALSE, NULL);
1107 /* no sync needed */
1108 g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
1109 /* no async state changes needed */
1110 g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);
1112 if (stream->udpsrc[1]) {
1113 /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
1114 * because some servers check the port number of where it sends RTCP to identify
1115 * the RTCP packets it receives */
1116 g_object_get (G_OBJECT (stream->udpsrc[1]), "sock", &sockfd, NULL);
1117 GST_DEBUG_OBJECT (demux, "UDP src has sock %d", sockfd);
1118 /* configure socket and make sure udpsink does not close it when shutting
1119 * down, it belongs to udpsrc after all. */
1120 g_object_set (G_OBJECT (stream->udpsink), "sockfd", sockfd, NULL);
1121 g_object_set (G_OBJECT (stream->udpsink), "closefd", FALSE, NULL);
1124 /* we keep this playing always */
1125 gst_element_set_locked_state (stream->udpsink, TRUE);
1126 gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);
1128 gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);
1130 /* get session RTCP pad */
1131 name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
1132 pad = gst_element_get_request_pad (demux->session, name);
1137 sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
1138 gst_pad_link (pad, sinkpad);
1139 gst_object_unref (sinkpad);
1141 /* not very fatal, we just won't be able to send RTCP */
1142 GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
1151 GST_DEBUG_OBJECT (demux, "no UDP sink element found");
/* Records ret as the last flow return of stream and combines it with the
 * last flow returns of all streams of demux. GST_FLOW_OK and every value
 * other than GST_FLOW_NOT_LINKED are handled before the loop; for
 * GST_FLOW_NOT_LINKED the streams are walked looking for one whose last
 * return is something else, so that NOT_LINKED is only the result when
 * every stream reported it.
 * NOTE(review): the return statements are not visible here. */
1156 static GstFlowReturn
1157 gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
1162 /* store the value */
1163 stream->last_ret = ret;
1165 /* if it's success we can return the value right away */
1166 if (ret == GST_FLOW_OK)
1169 /* any other error that is not-linked can be returned right
1171 if (ret != GST_FLOW_NOT_LINKED)
1174 /* only return NOT_LINKED if all other pads returned NOT_LINKED */
1175 for (streams = demux->streams; streams; streams = g_list_next (streams)) {
1176 GstSDPStream *ostream = (GstSDPStream *) streams->data;
1178 ret = ostream->last_ret;
1179 /* some other return value (must be SUCCESS but we can return
1180 * other values as well) */
1181 if (ret != GST_FLOW_NOT_LINKED)
1184 /* if we get here, all other pads were unlinked and we return
1185 * NOT_LINKED then */
/* Sends event to each channel pad of stream that is set (channelpad[0] and
 * channelpad[1]). An extra reference is taken before every send and the
 * reference passed in by the caller is dropped at the end. Streams without
 * a source pad are checked for first; presumably they are skipped -- the
 * statement after that test is not visible here. */
1191 gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
1194 /* only streams that have a connection to the outside world */
1195 if (stream->srcpad == NULL)
1198 if (stream->channelpad[0]) {
1199 gst_event_ref (event);
1200 gst_pad_send_event (stream->channelpad[0], event);
1203 if (stream->channelpad[1]) {
1204 gst_event_ref (event);
1205 gst_pad_send_event (stream->channelpad[1], event);
1209 gst_event_unref (event);
/* GstBin handle_message override that filters messages coming from the
 * child elements.
 *
 * - Element messages named "GstUDPSrcTimeout": the ignore_timeout flag is
 *   read and set under the object lock so that only the first timeout is
 *   acted on; it is turned into a RESOURCE/READ element error that reports
 *   the configured timeout in seconds. Other element messages go to the
 *   parent class.
 * - Error messages: the stream owning the posting UDP source is looked up.
 *   Errors from the RTCP source (udpsrc[1]) are ignored; for the RTP source
 *   the stream is recorded as GST_FLOW_NOT_LINKED through
 *   gst_sdp_demux_combine_flows() and the combined result decides whether
 *   the message is dropped or forwarded to the parent class.
 * - Everything else is forwarded to the parent class. */
1213 gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
1217 demux = GST_SDP_DEMUX (bin);
1219 switch (GST_MESSAGE_TYPE (message)) {
1220 case GST_MESSAGE_ELEMENT:
1222 const GstStructure *s = gst_message_get_structure (message);
1224 if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
1225 gboolean ignore_timeout;
1227 GST_DEBUG_OBJECT (bin, "timeout on UDP port");
1229 GST_OBJECT_LOCK (demux);
1230 ignore_timeout = demux->ignore_timeout;
1231 demux->ignore_timeout = TRUE;
1232 GST_OBJECT_UNLOCK (demux);
1234 /* we only act on the first udp timeout message, others are irrelevant
1235 * and can be ignored. */
1237 gst_message_unref (message);
/* convert the microsecond count to double first and divide afterwards, so
 * the fractional seconds survive when gst_guint64_to_gdouble() is a real
 * function taking a guint64 argument */
1239 GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
1240 ("Could not receive any UDP packets for %.4f seconds, maybe your "
1241 "firewall is blocking it.",
1242 gst_guint64_to_gdouble (demux->udp_timeout) / 1000000.0));
1246 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1249 case GST_MESSAGE_ERROR:
1252 GstSDPStream *stream;
1255 udpsrc = GST_MESSAGE_SRC (message);
1257 GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));
1259 stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
1260 /* fatal but not our message, forward */
1264 /* we ignore the RTCP udpsrc */
1265 if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
1268 /* if we get error messages from the udp sources, that's not a problem as
1269 * long as not all of them error out. We also don't really know what the
1270 * problem is, the message does not give enough detail... */
1271 ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
1272 GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
1273 if (ret != GST_FLOW_OK)
1277 gst_message_unref (message);
1281 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1286 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* Parses the SDP collected in the adapter and brings up the session. Runs
 * with the stream lock held.
 *
 * 1. All adapter data is taken and parsed into a GstSDPMessage.
 * 2. The session-level "control" attributes, and then those of every media,
 *    are searched for a fully qualified "rtsp://" URL. If RTSP control is
 *    found, the SDP is turned into an "rtsp-sdp" URI; with the redirect
 *    property set, a "redirect" element message carrying that URI as
 *    "new-location" is posted.
 * 3. Otherwise the session manager is configured, a stream with UDP source
 *    and sink is created for each media, the session is moved to the target
 *    state (through PAUSED first when the target is higher) and the UDP
 *    sources of every stream are set to the target state.
 *
 * Each failure posts a STREAM/TYPE_NOT_FOUND element error with a specific
 * text; a failure to start the session additionally shuts the session
 * element down and removes it from the bin.
 * NOTE(review): the labels and return statements are not visible here; the
 * caller stores the result as a gboolean. */
1293 gst_sdp_demux_start (GstSDPDemux * demux)
1298 GstSDPMessage sdp = { 0 };
1299 GstSDPStream *stream = NULL;
1302 GstStateChangeReturn ret;
1304 /* grab the lock so that no state change can interfere */
1305 GST_SDP_STREAM_LOCK (demux);
1307 GST_DEBUG_OBJECT (demux, "parse SDP...");
1309 size = gst_adapter_available (demux->adapter);
1310 data = gst_adapter_take (demux->adapter, size);
1312 gst_sdp_message_init (&sdp);
1313 if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
1314 goto could_not_parse;
1317 gst_sdp_message_dump (&sdp);
1319 /* maybe this is plain RTSP DESCRIBE rtsp and we should redirect */
1320 /* look for rtsp control url */
1322 const gchar *control;
1325 control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i);
1326 if (control == NULL)
1329 /* only take fully qualified urls */
1330 if (g_str_has_prefix (control, "rtsp://"))
1336 /* try to find non-aggregate control */
1337 n_streams = gst_sdp_message_medias_len (&sdp);
1339 for (idx = 0; idx < n_streams; idx++) {
1340 const GstSDPMedia *media;
1342 /* get media, should not return NULL */
1343 media = gst_sdp_message_get_media (&sdp, idx);
1348 control = gst_sdp_media_get_attribute_val_n (media, "control", i);
1349 if (control == NULL)
1352 /* only take fully qualified urls */
1353 if (g_str_has_prefix (control, "rtsp://"))
1356 /* this media has no control, exit */
1363 /* we have RTSP now */
1364 uri = gst_sdp_message_as_uri ("rtsp-sdp", &sdp);
1366 if (demux->redirect) {
1367 GST_INFO_OBJECT (demux, "redirect to %s", uri);
1369 gst_element_post_message (GST_ELEMENT_CAST (demux),
1370 gst_message_new_element (GST_OBJECT_CAST (demux),
1371 gst_structure_new ("redirect",
1372 "new-location", G_TYPE_STRING, uri, NULL)));
1378 /* we get here when we didn't do a redirect */
1380 /* try to get and configure a manager */
1381 if (!gst_sdp_demux_configure_manager (demux, uri))
1384 /* create streams with UDP sources and sinks */
1385 n_streams = gst_sdp_message_medias_len (&sdp);
1386 for (i = 0; i < n_streams; i++) {
1387 stream = gst_sdp_demux_create_stream (demux, &sdp, i);
1392 GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
1394 if (!gst_sdp_demux_stream_configure_udp (demux, stream))
1395 goto transport_failed;
1396 if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
1397 goto transport_failed;
1400 if (!demux->streams)
1404 /* set target state on session manager */
1405 /* setting rtspsrc to PLAYING may cause it to lose that target state
1406 * along the way due to no-preroll udpsrc elements, so ...
1407 * do it in two stages here (similar to other elements) */
1408 if (demux->target > GST_STATE_PAUSED) {
1409 ret = gst_element_set_state (demux->session, GST_STATE_PAUSED);
1410 if (ret == GST_STATE_CHANGE_FAILURE)
1411 goto start_session_failure;
1413 ret = gst_element_set_state (demux->session, demux->target);
1414 if (ret == GST_STATE_CHANGE_FAILURE)
1415 goto start_session_failure;
1418 /* activate all streams */
1419 for (walk = demux->streams; walk; walk = g_list_next (walk)) {
1420 stream = (GstSDPStream *) walk->data;
1422 /* configure target state on udp sources */
1423 gst_element_set_state (stream->udpsrc[0], demux->target);
1424 gst_element_set_state (stream->udpsrc[1], demux->target);
1427 GST_SDP_STREAM_UNLOCK (demux);
1428 gst_sdp_message_uninit (&sdp);
1435 GST_SDP_STREAM_UNLOCK (demux);
1436 gst_sdp_message_uninit (&sdp);
1441 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1442 ("Could not create RTP stream transport."));
1447 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1448 ("Could not create RTP session manager."));
1453 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1454 ("Could not parse SDP message."));
1459 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1460 ("No streams in SDP message."));
1465 /* avoid hanging if redirect not handled */
1466 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1467 ("Sent RTSP redirect."));
1470 start_session_failure:
1472 GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1473 ("Could not start RTP session manager."));
1474 gst_element_set_state (demux->session, GST_STATE_NULL);
1475 gst_bin_remove (GST_BIN_CAST (demux), demux->session);
1476 demux->session = NULL;
/* Event function of the sink pad. Per the comment below, EOS is the trigger
 * for parsing the collected SDP with gst_sdp_demux_start(), whose result
 * becomes the return value; the event is unreffed in both visible branches.
 * The reference on the parent element obtained from gst_pad_get_parent() is
 * dropped before leaving. */
1482 gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event)
1485 gboolean res = TRUE;
1487 demux = GST_SDP_DEMUX (gst_pad_get_parent (pad));
1489 switch (GST_EVENT_TYPE (event)) {
1491 /* when we get EOS, start parsing the SDP */
1492 res = gst_sdp_demux_start (demux);
1493 gst_event_unref (event);
1496 gst_event_unref (event);
1499 gst_object_unref (demux);
/* Chain function of the sink pad: appends every incoming buffer to the
 * adapter so that the complete SDP is available later (it is consumed in
 * gst_sdp_demux_start()). The reference on the parent element obtained from
 * gst_pad_get_parent() is dropped before leaving.
 * NOTE(review): the returned flow value is not visible here. */
1504 static GstFlowReturn
1505 gst_sdp_demux_sink_chain (GstPad * pad, GstBuffer * buffer)
1509 demux = GST_SDP_DEMUX (gst_pad_get_parent (pad));
1511 /* push the SDP message in an adapter, we start doing something with it when
1513 gst_adapter_push (demux->adapter, buffer);
1515 gst_object_unref (demux);
/* GstElement change_state override, executed with the stream lock held.
 *
 * Before chaining up: READY->PAUSED clears the adapter, re-enables UDP
 * timeout handling and sets the target state to PAUSED; PAUSED->PLAYING
 * sets the target state to PLAYING.
 *
 * After the parent class handled the transition: READY->PAUSED and
 * PLAYING->PAUSED report GST_STATE_CHANGE_NO_PREROLL (the latter also
 * resets the target state to PAUSED) and PAUSED->READY releases all streams
 * and the session through gst_sdp_demux_cleanup().
 * NOTE(review): the handling of a failed parent transition and the final
 * return are not visible here. */
1520 static GstStateChangeReturn
1521 gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
1524 GstStateChangeReturn ret;
1526 demux = GST_SDP_DEMUX (element);
1528 GST_SDP_STREAM_LOCK (demux);
1530 switch (transition) {
1531 case GST_STATE_CHANGE_NULL_TO_READY:
1533 case GST_STATE_CHANGE_READY_TO_PAUSED:
1534 /* first attempt, don't ignore timeouts */
1535 gst_adapter_clear (demux->adapter);
1536 demux->ignore_timeout = FALSE;
1537 demux->target = GST_STATE_PAUSED;
1539 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1540 demux->target = GST_STATE_PLAYING;
1546 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1547 if (ret == GST_STATE_CHANGE_FAILURE)
1550 switch (transition) {
1551 case GST_STATE_CHANGE_READY_TO_PAUSED:
1552 ret = GST_STATE_CHANGE_NO_PREROLL;
1554 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1555 ret = GST_STATE_CHANGE_NO_PREROLL;
1556 demux->target = GST_STATE_PAUSED;
1558 case GST_STATE_CHANGE_PAUSED_TO_READY:
1559 gst_sdp_demux_cleanup (demux);
1561 case GST_STATE_CHANGE_READY_TO_NULL:
1568 GST_SDP_STREAM_UNLOCK (demux);