Tizen 2.0 Release
[framework/multimedia/gst-plugins-bad0.10.git] / gst / sdp / gstsdpdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17  * Boston, MA 02111-1307, USA.
18  */
19 /**
20  * SECTION:element-sdpdemux
21  *
22  * sdpdemux currently understands SDP as the input format of the session description.
23  * For each stream listed in the SDP a new rtp_stream%d pad will be created
24  * with caps derived from the SDP media description. This is a caps of mime type
25  * "application/x-rtp" that can be connected to any available RTP depayloader
26  * element. 
27  * 
28  * sdpdemux will internally instantiate an RTP session manager element
29  * that will handle the RTCP messages to and from the server, jitter removal,
30  * packet reordering along with providing a clock for the pipeline. 
31  * 
32  * sdpdemux acts like a live element and will therefore only generate data in the 
33  * PLAYING state.
34  * 
35  * <refsect2>
36  * <title>Example launch line</title>
37  * |[
38  * gst-launch gnomevfssrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
39  * ]| Establish a connection to an HTTP server that contains an SDP session description
40  * that gets parsed by sdpdemux and send the raw RTP packets to a fakesink.
41  * </refsect2>
42  *
43  * Last reviewed on 2007-10-01 (0.10.6)
44  */
45
46 #ifdef HAVE_CONFIG_H
47 #include "config.h"
48 #endif
49
50 #ifdef HAVE_UNISTD_H
51 #include <unistd.h>
52 #endif
53
54 /* include GLIB for G_OS_WIN32 */
55 #include <glib.h>
56
57 #ifdef G_OS_WIN32
58 #ifdef _MSC_VER
59 #include <Winsock2.h>
60 #endif
61 /* ws2_32.dll has getaddrinfo and freeaddrinfo on Windows XP and later.
62  *  * minwg32 headers check WINVER before allowing the use of these */
63 #ifndef WINVER
64 #define WINVER 0x0501
65 #endif
66 #include <ws2tcpip.h>
67 #else
68 #include <sys/socket.h>
69 #include <netdb.h>
70 #include <netinet/in.h>
71 #endif
72
73 #include <stdlib.h>
74 #include <string.h>
75 #include <locale.h>
76 #include <stdio.h>
77 #include <stdarg.h>
78
79 #include <gst/rtp/gstrtppayloads.h>
80 #include <gst/sdp/gstsdpmessage.h>
81
82 #include "gstsdpdemux.h"
83
84 GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
85 #define GST_CAT_DEFAULT (sdpdemux_debug)
86
87 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
88     GST_PAD_SINK,
89     GST_PAD_ALWAYS,
90     GST_STATIC_CAPS ("application/sdp"));
91
92 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream%d",
93     GST_PAD_SRC,
94     GST_PAD_SOMETIMES,
95     GST_STATIC_CAPS ("application/x-rtp"));
96
97 enum
98 {
99   /* FILL ME */
100   LAST_SIGNAL
101 };
102
103 #define DEFAULT_DEBUG            FALSE
104 #define DEFAULT_TIMEOUT          10000000
105 #define DEFAULT_LATENCY_MS       200
106 #define DEFAULT_REDIRECT         TRUE
107
108 enum
109 {
110   PROP_0,
111   PROP_DEBUG,
112   PROP_TIMEOUT,
113   PROP_LATENCY,
114   PROP_REDIRECT,
115   PROP_LAST
116 };
117
118 static void gst_sdp_demux_finalize (GObject * object);
119
120 static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
121     const GValue * value, GParamSpec * pspec);
122 static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
123     GValue * value, GParamSpec * pspec);
124
125 static GstCaps *gst_sdp_demux_media_to_caps (gint pt,
126     const GstSDPMedia * media);
127
128 static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
129     GstStateChange transition);
130 static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
131
132 static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
133     GstSDPStream * stream, GstEvent * event);
134
135 static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event);
136 static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad,
137     GstBuffer * buffer);
138
139 /*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
140
141 static void
142 _do_init (GType sdp_demux_type)
143 {
144   GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
145 }
146
147 GST_BOILERPLATE_FULL (GstSDPDemux, gst_sdp_demux, GstBin, GST_TYPE_BIN,
148     _do_init);
149
150 static void
151 gst_sdp_demux_base_init (gpointer g_class)
152 {
153   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
154
155   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
156   gst_element_class_add_static_pad_template (element_class, &rtptemplate);
157
158   gst_element_class_set_details_simple (element_class, "SDP session setup",
159       "Codec/Demuxer/Network/RTP",
160       "Receive data over the network via SDP",
161       "Wim Taymans <wim.taymans@gmail.com>");
162 }
163
164 static void
165 gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
166 {
167   GObjectClass *gobject_class;
168   GstElementClass *gstelement_class;
169   GstBinClass *gstbin_class;
170
171   gobject_class = (GObjectClass *) klass;
172   gstelement_class = (GstElementClass *) klass;
173   gstbin_class = (GstBinClass *) klass;
174
175   gobject_class->set_property = gst_sdp_demux_set_property;
176   gobject_class->get_property = gst_sdp_demux_get_property;
177
178   gobject_class->finalize = gst_sdp_demux_finalize;
179
180   g_object_class_install_property (gobject_class, PROP_DEBUG,
181       g_param_spec_boolean ("debug", "Debug",
182           "Dump request and response messages to stdout",
183           DEFAULT_DEBUG,
184           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
185
186   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
187       g_param_spec_uint64 ("timeout", "Timeout",
188           "Fail transport after UDP timeout microseconds (0 = disabled)",
189           0, G_MAXUINT64, DEFAULT_TIMEOUT,
190           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
191
192   g_object_class_install_property (gobject_class, PROP_LATENCY,
193       g_param_spec_uint ("latency", "Buffer latency in ms",
194           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
195           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
196
197   g_object_class_install_property (gobject_class, PROP_REDIRECT,
198       g_param_spec_boolean ("redirect", "Redirect",
199           "Sends a redirection message instead of using a custom session element",
200           DEFAULT_REDIRECT,
201           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
202
203   gstelement_class->change_state = gst_sdp_demux_change_state;
204
205   gstbin_class->handle_message = gst_sdp_demux_handle_message;
206 }
207
208 static void
209 gst_sdp_demux_init (GstSDPDemux * demux, GstSDPDemuxClass * g_class)
210 {
211   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
212   gst_pad_set_event_function (demux->sinkpad,
213       GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
214   gst_pad_set_chain_function (demux->sinkpad,
215       GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
216   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
217
218   /* protects the streaming thread in interleaved mode or the polling
219    * thread in UDP mode. */
220   demux->stream_rec_lock = g_new (GStaticRecMutex, 1);
221   g_static_rec_mutex_init (demux->stream_rec_lock);
222
223   demux->adapter = gst_adapter_new ();
224 }
225
226 static void
227 gst_sdp_demux_finalize (GObject * object)
228 {
229   GstSDPDemux *demux;
230
231   demux = GST_SDP_DEMUX (object);
232
233   /* free locks */
234   g_static_rec_mutex_free (demux->stream_rec_lock);
235   g_free (demux->stream_rec_lock);
236
237   g_object_unref (demux->adapter);
238
239   G_OBJECT_CLASS (parent_class)->finalize (object);
240 }
241
242 static void
243 gst_sdp_demux_set_property (GObject * object, guint prop_id,
244     const GValue * value, GParamSpec * pspec)
245 {
246   GstSDPDemux *demux;
247
248   demux = GST_SDP_DEMUX (object);
249
250   switch (prop_id) {
251     case PROP_DEBUG:
252       demux->debug = g_value_get_boolean (value);
253       break;
254     case PROP_TIMEOUT:
255       demux->udp_timeout = g_value_get_uint64 (value);
256       break;
257     case PROP_LATENCY:
258       demux->latency = g_value_get_uint (value);
259       break;
260     case PROP_REDIRECT:
261       demux->redirect = g_value_get_boolean (value);
262       break;
263     default:
264       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
265       break;
266   }
267 }
268
269 static void
270 gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
271     GParamSpec * pspec)
272 {
273   GstSDPDemux *demux;
274
275   demux = GST_SDP_DEMUX (object);
276
277   switch (prop_id) {
278     case PROP_DEBUG:
279       g_value_set_boolean (value, demux->debug);
280       break;
281     case PROP_TIMEOUT:
282       g_value_set_uint64 (value, demux->udp_timeout);
283       break;
284     case PROP_LATENCY:
285       g_value_set_uint (value, demux->latency);
286       break;
287     case PROP_REDIRECT:
288       g_value_set_boolean (value, demux->redirect);
289       break;
290     default:
291       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
292       break;
293   }
294 }
295
296 static gint
297 find_stream_by_id (GstSDPStream * stream, gconstpointer a)
298 {
299   gint id = GPOINTER_TO_INT (a);
300
301   if (stream->id == id)
302     return 0;
303
304   return -1;
305 }
306
307 static gint
308 find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
309 {
310   gint pt = GPOINTER_TO_INT (a);
311
312   if (stream->pt == pt)
313     return 0;
314
315   return -1;
316 }
317
318 static gint
319 find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
320 {
321   GstElement *src = (GstElement *) a;
322
323   if (stream->udpsrc[0] == src)
324     return 0;
325   if (stream->udpsrc[1] == src)
326     return 0;
327
328   return -1;
329 }
330
331 static GstSDPStream *
332 find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
333 {
334   GList *lstream;
335
336   /* find and get stream */
337   if ((lstream =
338           g_list_find_custom (demux->streams, data, (GCompareFunc) func)))
339     return (GstSDPStream *) lstream->data;
340
341   return NULL;
342 }
343
344 static void
345 gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
346 {
347   gint i;
348
349   GST_DEBUG_OBJECT (demux, "free stream %p", stream);
350
351   if (stream->caps)
352     gst_caps_unref (stream->caps);
353
354   for (i = 0; i < 2; i++) {
355     GstElement *udpsrc = stream->udpsrc[i];
356
357     if (udpsrc) {
358       gst_element_set_state (udpsrc, GST_STATE_NULL);
359       gst_bin_remove (GST_BIN_CAST (demux), udpsrc);
360       stream->udpsrc[i] = NULL;
361     }
362   }
363   if (stream->udpsink) {
364     gst_element_set_state (stream->udpsink, GST_STATE_NULL);
365     gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink);
366     stream->udpsink = NULL;
367   }
368   if (stream->srcpad) {
369     gst_pad_set_active (stream->srcpad, FALSE);
370     if (stream->added) {
371       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
372       stream->added = FALSE;
373     }
374     stream->srcpad = NULL;
375   }
376   g_free (stream);
377 }
378
379 static gboolean
380 is_multicast_address (const gchar * host_name)
381 {
382   struct addrinfo hints;
383   struct addrinfo *ai;
384   struct addrinfo *res;
385   gboolean ret = FALSE;
386
387   memset (&hints, 0, sizeof (hints));
388   hints.ai_socktype = SOCK_DGRAM;
389
390   g_return_val_if_fail (host_name, FALSE);
391
392   if (getaddrinfo (host_name, NULL, &hints, &res) < 0)
393     return FALSE;
394
395   for (ai = res; !ret && ai; ai = ai->ai_next) {
396     if (ai->ai_family == AF_INET)
397       ret =
398           IN_MULTICAST (ntohl (((struct sockaddr_in *) ai->ai_addr)->
399               sin_addr.s_addr));
400     else
401       ret =
402           IN6_IS_ADDR_MULTICAST (&((struct sockaddr_in6 *) ai->
403               ai_addr)->sin6_addr);
404   }
405
406   freeaddrinfo (res);
407
408   return ret;
409 }
410
411 static GstSDPStream *
412 gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
413 {
414   GstSDPStream *stream;
415   const gchar *payload;
416   const GstSDPMedia *media;
417   const GstSDPConnection *conn;
418
419   /* get media, should not return NULL */
420   media = gst_sdp_message_get_media (sdp, idx);
421   if (media == NULL)
422     return NULL;
423
424   stream = g_new0 (GstSDPStream, 1);
425   stream->parent = demux;
426   /* we mark the pad as not linked, we will mark it as OK when we add the pad to
427    * the element. */
428   stream->last_ret = GST_FLOW_OK;
429   stream->added = FALSE;
430   stream->disabled = FALSE;
431   stream->id = demux->numstreams++;
432   stream->eos = FALSE;
433
434   /* we must have a payload. No payload means we cannot create caps */
435   /* FIXME, handle multiple formats. */
436   if ((payload = gst_sdp_media_get_format (media, 0))) {
437     stream->pt = atoi (payload);
438     /* convert caps */
439     stream->caps = gst_sdp_demux_media_to_caps (stream->pt, media);
440
441     if (stream->pt >= 96) {
442       /* If we have a dynamic payload type, see if we have a stream with the
443        * same payload number. If there is one, they are part of the same
444        * container and we only need to add one pad. */
445       if (find_stream (demux, GINT_TO_POINTER (stream->pt),
446               (gpointer) find_stream_by_pt)) {
447         stream->container = TRUE;
448       }
449     }
450   }
451   if (!(conn = gst_sdp_media_get_connection (media, 0))) {
452     if (!(conn = gst_sdp_message_get_connection (sdp)))
453       goto no_connection;
454   }
455
456   if (!conn->address)
457     goto no_connection;
458
459   stream->destination = conn->address;
460   stream->ttl = conn->ttl;
461   stream->multicast = is_multicast_address (stream->destination);
462
463   stream->rtp_port = gst_sdp_media_get_port (media);
464   if (gst_sdp_media_get_attribute_val (media, "rtcp")) {
465     /* FIXME, RFC 3605 */
466     stream->rtcp_port = stream->rtp_port + 1;
467   } else {
468     stream->rtcp_port = stream->rtp_port + 1;
469   }
470
471   GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
472   GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
473   GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
474   GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);
475
476   /* we keep track of all streams */
477   demux->streams = g_list_append (demux->streams, stream);
478
479   return stream;
480
481   /* ERRORS */
482 no_connection:
483   {
484     gst_sdp_demux_stream_free (demux, stream);
485     return NULL;
486   }
487 }
488
489 static void
490 gst_sdp_demux_cleanup (GstSDPDemux * demux)
491 {
492   GList *walk;
493
494   GST_DEBUG_OBJECT (demux, "cleanup");
495
496   for (walk = demux->streams; walk; walk = g_list_next (walk)) {
497     GstSDPStream *stream = (GstSDPStream *) walk->data;
498
499     gst_sdp_demux_stream_free (demux, stream);
500   }
501   g_list_free (demux->streams);
502   demux->streams = NULL;
503   if (demux->session) {
504     if (demux->session_sig_id) {
505       g_signal_handler_disconnect (demux->session, demux->session_sig_id);
506       demux->session_sig_id = 0;
507     }
508     if (demux->session_nmp_id) {
509       g_signal_handler_disconnect (demux->session, demux->session_nmp_id);
510       demux->session_nmp_id = 0;
511     }
512     if (demux->session_ptmap_id) {
513       g_signal_handler_disconnect (demux->session, demux->session_ptmap_id);
514       demux->session_ptmap_id = 0;
515     }
516     gst_element_set_state (demux->session, GST_STATE_NULL);
517     gst_bin_remove (GST_BIN_CAST (demux), demux->session);
518     demux->session = NULL;
519   }
520   demux->numstreams = 0;
521 }
522
523 #define PARSE_INT(p, del, res)          \
524 G_STMT_START {                          \
525   gchar *t = p;                         \
526   p = strstr (p, del);                  \
527   if (p == NULL)                        \
528     res = -1;                           \
529   else {                                \
530     *p = '\0';                          \
531     p++;                                \
532     res = atoi (t);                     \
533   }                                     \
534 } G_STMT_END
535
536 #define PARSE_STRING(p, del, res)       \
537 G_STMT_START {                          \
538   gchar *t = p;                         \
539   p = strstr (p, del);                  \
540   if (p == NULL) {                      \
541     res = NULL;                         \
542     p = t;                              \
543   }                                     \
544   else {                                \
545     *p = '\0';                          \
546     p++;                                \
547     res = t;                            \
548   }                                     \
549 } G_STMT_END
550
551 #define SKIP_SPACES(p)                  \
552   while (*p && g_ascii_isspace (*p))    \
553     p++;
554
555 /* rtpmap contains:
556  *
557  *  <payload> <encoding_name>/<clock_rate>[/<encoding_params>]
558  */
559 static gboolean
560 gst_sdp_demux_parse_rtpmap (const gchar * rtpmap, gint * payload, gchar ** name,
561     gint * rate, gchar ** params)
562 {
563   gchar *p, *t;
564
565   t = p = (gchar *) rtpmap;
566
567   PARSE_INT (p, " ", *payload);
568   if (*payload == -1)
569     return FALSE;
570
571   SKIP_SPACES (p);
572   if (*p == '\0')
573     return FALSE;
574
575   PARSE_STRING (p, "/", *name);
576   if (*name == NULL) {
577     GST_DEBUG ("no rate, name %s", p);
578     /* no rate, assume -1 then */
579     *name = p;
580     *rate = -1;
581     return TRUE;
582   }
583
584   t = p;
585   p = strstr (p, "/");
586   if (p == NULL) {
587     *rate = atoi (t);
588     return TRUE;
589   }
590   *p = '\0';
591   p++;
592   *rate = atoi (t);
593
594   t = p;
595   if (*p == '\0')
596     return TRUE;
597   *params = t;
598
599   return TRUE;
600 }
601
602 /*
603  *  Mapping of caps to and from SDP fields:
604  *
605  *   m=<media> <UDP port> RTP/AVP <payload> 
606  *   a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>]
607  *   a=fmtp:<payload> <param>[=<value>];...
608  */
609 static GstCaps *
610 gst_sdp_demux_media_to_caps (gint pt, const GstSDPMedia * media)
611 {
612   GstCaps *caps;
613   const gchar *rtpmap;
614   const gchar *fmtp;
615   gchar *name = NULL;
616   gint rate = -1;
617   gchar *params = NULL;
618   gchar *tmp;
619   GstStructure *s;
620   gint payload = 0;
621   gboolean ret;
622
623   /* get and parse rtpmap */
624   if ((rtpmap = gst_sdp_media_get_attribute_val (media, "rtpmap"))) {
625     ret = gst_sdp_demux_parse_rtpmap (rtpmap, &payload, &name, &rate, &params);
626     if (ret) {
627       if (payload != pt) {
628         /* we ignore the rtpmap if the payload type is different. */
629         g_warning ("rtpmap of wrong payload type, ignoring");
630         name = NULL;
631         rate = -1;
632         params = NULL;
633       }
634     } else {
635       /* if we failed to parse the rtpmap for a dynamic payload type, we have an
636        * error */
637       if (pt >= 96)
638         goto no_rtpmap;
639       /* else we can ignore */
640       g_warning ("error parsing rtpmap, ignoring");
641     }
642   } else {
643     /* dynamic payloads need rtpmap or we fail */
644     if (pt >= 96)
645       goto no_rtpmap;
646   }
647   /* check if we have a rate, if not, we need to look up the rate from the
648    * default rates based on the payload types. */
649   if (rate == -1) {
650     const GstRTPPayloadInfo *info;
651
652     if (GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
653       /* dynamic types, use media and encoding_name */
654       tmp = g_ascii_strdown (media->media, -1);
655       info = gst_rtp_payload_info_for_name (tmp, name);
656       g_free (tmp);
657     } else {
658       /* static types, use payload type */
659       info = gst_rtp_payload_info_for_pt (pt);
660     }
661
662     if (info) {
663       if ((rate = info->clock_rate) == 0)
664         rate = -1;
665     }
666     /* we fail if we cannot find one */
667     if (rate == -1)
668       goto no_rate;
669   }
670
671   tmp = g_ascii_strdown (media->media, -1);
672   caps = gst_caps_new_simple ("application/x-rtp",
673       "media", G_TYPE_STRING, tmp, "payload", G_TYPE_INT, pt, NULL);
674   g_free (tmp);
675   s = gst_caps_get_structure (caps, 0);
676
677   gst_structure_set (s, "clock-rate", G_TYPE_INT, rate, NULL);
678
679   /* encoding name must be upper case */
680   if (name != NULL) {
681     tmp = g_ascii_strup (name, -1);
682     gst_structure_set (s, "encoding-name", G_TYPE_STRING, tmp, NULL);
683     g_free (tmp);
684   }
685
686   /* params must be lower case */
687   if (params != NULL) {
688     tmp = g_ascii_strdown (params, -1);
689     gst_structure_set (s, "encoding-params", G_TYPE_STRING, tmp, NULL);
690     g_free (tmp);
691   }
692
693   /* parse optional fmtp: field */
694   if ((fmtp = gst_sdp_media_get_attribute_val (media, "fmtp"))) {
695     gchar *p;
696     gint payload = 0;
697
698     p = (gchar *) fmtp;
699
700     /* p is now of the format <payload> <param>[=<value>];... */
701     PARSE_INT (p, " ", payload);
702     if (payload != -1 && payload == pt) {
703       gchar **pairs;
704       gint i;
705
706       /* <param>[=<value>] are separated with ';' */
707       pairs = g_strsplit (p, ";", 0);
708       for (i = 0; pairs[i]; i++) {
709         gchar *valpos, *key;
710         const gchar *val;
711
712         /* the key may not have a '=', the value can have other '='s */
713         valpos = strstr (pairs[i], "=");
714         if (valpos) {
715           /* we have a '=' and thus a value, remove the '=' with \0 */
716           *valpos = '\0';
717           /* value is everything between '=' and ';'. FIXME, strip? */
718           val = g_strstrip (valpos + 1);
719         } else {
720           /* simple <param>;.. is translated into <param>=1;... */
721           val = "1";
722         }
723         /* strip the key of spaces, convert key to lowercase but not the value. */
724         key = g_strstrip (pairs[i]);
725         if (strlen (key) > 1) {
726           tmp = g_ascii_strdown (key, -1);
727           gst_structure_set (s, tmp, G_TYPE_STRING, val, NULL);
728           g_free (tmp);
729         }
730       }
731       g_strfreev (pairs);
732     }
733   }
734   return caps;
735
736   /* ERRORS */
737 no_rtpmap:
738   {
739     g_warning ("rtpmap type not given for dynamic payload %d", pt);
740     return NULL;
741   }
742 no_rate:
743   {
744     g_warning ("rate unknown for payload type %d", pt);
745     return NULL;
746   }
747 }
748
749 /* this callback is called when the session manager generated a new src pad with
750  * payloaded RTP packets. We simply ghost the pad here. */
751 static void
752 new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
753 {
754   gchar *name;
755   GstPadTemplate *template;
756   gint id, ssrc, pt;
757   GList *lstream;
758   GstSDPStream *stream;
759   gboolean all_added;
760
761   GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
762
763   GST_SDP_STREAM_LOCK (demux);
764   /* find stream */
765   name = gst_object_get_name (GST_OBJECT_CAST (pad));
766   if (sscanf (name, "recv_rtp_src_%d_%d_%d", &id, &ssrc, &pt) != 3)
767     goto unknown_stream;
768
769   GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %d, PT %d", id, ssrc, pt);
770
771   stream =
772       find_stream (demux, GINT_TO_POINTER (id), (gpointer) find_stream_by_id);
773   if (stream == NULL)
774     goto unknown_stream;
775
776   /* no need for a timeout anymore now */
777   g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
778
779   /* create a new pad we will use to stream to */
780   template = gst_static_pad_template_get (&rtptemplate);
781   stream->srcpad = gst_ghost_pad_new_from_template (name, pad, template);
782   gst_object_unref (template);
783   g_free (name);
784
785   stream->added = TRUE;
786   gst_pad_set_active (stream->srcpad, TRUE);
787   gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
788
789   /* check if we added all streams */
790   all_added = TRUE;
791   for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
792     stream = (GstSDPStream *) lstream->data;
793     /* a container stream only needs one pad added. Also disabled streams don't
794      * count */
795     if (!stream->container && !stream->disabled && !stream->added) {
796       all_added = FALSE;
797       break;
798     }
799   }
800   GST_SDP_STREAM_UNLOCK (demux);
801
802   if (all_added) {
803     GST_DEBUG_OBJECT (demux, "We added all streams");
804     /* when we get here, all stream are added and we can fire the no-more-pads
805      * signal. */
806     gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
807   }
808
809   return;
810
811   /* ERRORS */
812 unknown_stream:
813   {
814     GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
815     GST_SDP_STREAM_UNLOCK (demux);
816     g_free (name);
817     return;
818   }
819 }
820
821 static void
822 rtsp_session_pad_added (GstElement * session, GstPad * pad, GstSDPDemux * demux)
823 {
824   GstPad *srcpad = NULL;
825   gchar *name;
826
827   GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
828
829   name = gst_pad_get_name (pad);
830   srcpad = gst_ghost_pad_new (name, pad);
831   g_free (name);
832
833   gst_pad_set_active (srcpad, TRUE);
834   gst_element_add_pad (GST_ELEMENT_CAST (demux), srcpad);
835 }
836
837 static void
838 rtsp_session_no_more_pads (GstElement * session, GstSDPDemux * demux)
839 {
840   GST_DEBUG_OBJECT (demux, "got no-more-pads");
841   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
842 }
843
844 static GstCaps *
845 request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
846 {
847   GstSDPStream *stream;
848   GstCaps *caps;
849
850   GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
851       session);
852
853   GST_SDP_STREAM_LOCK (demux);
854   stream =
855       find_stream (demux, GINT_TO_POINTER (session),
856       (gpointer) find_stream_by_id);
857   if (!stream)
858     goto unknown_stream;
859
860   caps = stream->caps;
861   if (caps)
862     gst_caps_ref (caps);
863   GST_SDP_STREAM_UNLOCK (demux);
864
865   return caps;
866
867 unknown_stream:
868   {
869     GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
870     GST_SDP_STREAM_UNLOCK (demux);
871     return NULL;
872   }
873 }
874
875 static void
876 gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session)
877 {
878   GstSDPStream *stream;
879
880   GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);
881
882   /* get stream for session */
883   stream =
884       find_stream (demux, GINT_TO_POINTER (session),
885       (gpointer) find_stream_by_id);
886   if (!stream)
887     goto unknown_stream;
888
889   if (stream->eos)
890     goto was_eos;
891
892   stream->eos = TRUE;
893   gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
894   return;
895
896   /* ERRORS */
897 unknown_stream:
898   {
899     GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
900     return;
901   }
902 was_eos:
903   {
904     GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
905     return;
906   }
907 }
908
909 static void
910 on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
911     GstSDPDemux * demux)
912 {
913   GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
914       session);
915
916   gst_sdp_demux_do_stream_eos (demux, session);
917 }
918
919 static void
920 on_timeout (GstElement * manager, guint session, guint32 ssrc,
921     GstSDPDemux * demux)
922 {
923   GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);
924
925   gst_sdp_demux_do_stream_eos (demux, session);
926 }
927
928 /* try to get and configure a manager */
929 static gboolean
930 gst_sdp_demux_configure_manager (GstSDPDemux * demux, char *rtsp_sdp)
931 {
932   /* configure the session manager */
933   if (rtsp_sdp != NULL) {
934     if (!(demux->session = gst_element_factory_make ("rtspsrc", NULL)))
935       goto rtspsrc_failed;
936
937     g_object_set (demux->session, "location", rtsp_sdp, NULL);
938
939     GST_DEBUG_OBJECT (demux, "connect to signals on rtspsrc");
940     demux->session_sig_id =
941         g_signal_connect (demux->session, "pad-added",
942         (GCallback) rtsp_session_pad_added, demux);
943     demux->session_nmp_id =
944         g_signal_connect (demux->session, "no-more-pads",
945         (GCallback) rtsp_session_no_more_pads, demux);
946   } else {
947     if (!(demux->session = gst_element_factory_make ("gstrtpbin", NULL)))
948       goto manager_failed;
949
950     /* connect to signals if we did not already do so */
951     GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
952     demux->session_sig_id =
953         g_signal_connect (demux->session, "pad-added",
954         (GCallback) new_session_pad, demux);
955     demux->session_ptmap_id =
956         g_signal_connect (demux->session, "request-pt-map",
957         (GCallback) request_pt_map, demux);
958     g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
959         demux);
960     g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout,
961         demux);
962     g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout,
963         demux);
964   }
965
966   g_object_set (demux->session, "latency", demux->latency, NULL);
967
968   /* we manage this element */
969   gst_bin_add (GST_BIN_CAST (demux), demux->session);
970
971   return TRUE;
972
973   /* ERRORS */
974 manager_failed:
975   {
976     GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
977     return FALSE;
978   }
979 rtspsrc_failed:
980   {
981     GST_DEBUG_OBJECT (demux, "no manager element rtspsrc found");
982     return FALSE;
983   }
984 }
985
986 static gboolean
987 gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
988 {
989   gchar *uri, *name;
990   const gchar *destination;
991   GstPad *pad;
992
993   GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");
994
995   /* if the destination is not a multicast address, we just want to listen on
996    * our local ports */
997   if (!stream->multicast)
998     destination = "0.0.0.0";
999   else
1000     destination = stream->destination;
1001
1002   /* creating UDP source */
1003   if (stream->rtp_port != -1) {
1004     GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
1005         stream->rtp_port);
1006
1007     uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
1008     stream->udpsrc[0] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
1009     g_free (uri);
1010     if (stream->udpsrc[0] == NULL)
1011       goto no_element;
1012
1013     /* take ownership */
1014     gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);
1015
1016     GST_DEBUG_OBJECT (demux,
1017         "setting up UDP source with timeout %" G_GINT64_FORMAT,
1018         demux->udp_timeout);
1019
1020     /* configure a timeout on the UDP port. When the timeout message is
1021      * posted, we assume UDP transport is not possible. */
1022     g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", demux->udp_timeout,
1023         NULL);
1024
1025     /* get output pad of the UDP source. */
1026     pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
1027
1028     name = g_strdup_printf ("recv_rtp_sink_%d", stream->id);
1029     stream->channelpad[0] = gst_element_get_request_pad (demux->session, name);
1030     g_free (name);
1031
1032     GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
1033     /* configure for UDP delivery, we need to connect the UDP pads to
1034      * the session plugin. */
1035     gst_pad_link (pad, stream->channelpad[0]);
1036     gst_object_unref (pad);
1037
1038     /* change state */
1039     gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
1040   }
1041
1042   /* creating another UDP source */
1043   if (stream->rtcp_port != -1) {
1044     GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
1045         stream->rtcp_port);
1046     uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
1047     stream->udpsrc[1] = gst_element_make_from_uri (GST_URI_SRC, uri, NULL);
1048     g_free (uri);
1049     if (stream->udpsrc[1] == NULL)
1050       goto no_element;
1051
1052     /* take ownership */
1053     gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);
1054
1055     GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");
1056
1057     name = g_strdup_printf ("recv_rtcp_sink_%d", stream->id);
1058     stream->channelpad[1] = gst_element_get_request_pad (demux->session, name);
1059     g_free (name);
1060
1061     pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
1062     gst_pad_link (pad, stream->channelpad[1]);
1063     gst_object_unref (pad);
1064
1065     gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
1066   }
1067   return TRUE;
1068
1069   /* ERRORS */
1070 no_element:
1071   {
1072     GST_DEBUG_OBJECT (demux, "no UDP source element found");
1073     return FALSE;
1074   }
1075 }
1076
1077 /* configure the UDP sink back to the server for status reports */
1078 static gboolean
1079 gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
1080     GstSDPStream * stream)
1081 {
1082   GstPad *pad, *sinkpad;
1083   gint port, sockfd = -1;
1084   gchar *destination, *uri, *name;
1085
1086   /* get destination and port */
1087   port = stream->rtcp_port;
1088   destination = stream->destination;
1089
1090   GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);
1091
1092   uri = g_strdup_printf ("udp://%s:%d", destination, port);
1093   stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL);
1094   g_free (uri);
1095   if (stream->udpsink == NULL)
1096     goto no_sink_element;
1097
1098   /* we clear all destinations because we don't really know where to send the
1099    * RTCP to and we want to avoid sending it to our own ports.
1100    * FIXME when we get an RTCP packet from the sender, we could look at its
1101    * source port and address and try to send RTCP there. */
1102   if (!stream->multicast)
1103     g_signal_emit_by_name (stream->udpsink, "clear");
1104
1105   g_object_set (G_OBJECT (stream->udpsink), "auto-multicast", FALSE, NULL);
1106   g_object_set (G_OBJECT (stream->udpsink), "loop", FALSE, NULL);
1107   /* no sync needed */
1108   g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
1109   /* no async state changes needed */
1110   g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);
1111
1112   if (stream->udpsrc[1]) {
1113     /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
1114      * because some servers check the port number of where it sends RTCP to identify
1115      * the RTCP packets it receives */
1116     g_object_get (G_OBJECT (stream->udpsrc[1]), "sock", &sockfd, NULL);
1117     GST_DEBUG_OBJECT (demux, "UDP src has sock %d", sockfd);
1118     /* configure socket and make sure udpsink does not close it when shutting
1119      * down, it belongs to udpsrc after all. */
1120     g_object_set (G_OBJECT (stream->udpsink), "sockfd", sockfd, NULL);
1121     g_object_set (G_OBJECT (stream->udpsink), "closefd", FALSE, NULL);
1122   }
1123
1124   /* we keep this playing always */
1125   gst_element_set_locked_state (stream->udpsink, TRUE);
1126   gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);
1127
1128   gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);
1129
1130   /* get session RTCP pad */
1131   name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
1132   pad = gst_element_get_request_pad (demux->session, name);
1133   g_free (name);
1134
1135   /* and link */
1136   if (pad) {
1137     sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
1138     gst_pad_link (pad, sinkpad);
1139     gst_object_unref (sinkpad);
1140   } else {
1141     /* not very fatal, we just won't be able to send RTCP */
1142     GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
1143   }
1144
1145
1146   return TRUE;
1147
1148   /* ERRORS */
1149 no_sink_element:
1150   {
1151     GST_DEBUG_OBJECT (demux, "no UDP sink element found");
1152     return FALSE;
1153   }
1154 }
1155
1156 static GstFlowReturn
1157 gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
1158     GstFlowReturn ret)
1159 {
1160   GList *streams;
1161
1162   /* store the value */
1163   stream->last_ret = ret;
1164
1165   /* if it's success we can return the value right away */
1166   if (ret == GST_FLOW_OK)
1167     goto done;
1168
1169   /* any other error that is not-linked can be returned right
1170    * away */
1171   if (ret != GST_FLOW_NOT_LINKED)
1172     goto done;
1173
1174   /* only return NOT_LINKED if all other pads returned NOT_LINKED */
1175   for (streams = demux->streams; streams; streams = g_list_next (streams)) {
1176     GstSDPStream *ostream = (GstSDPStream *) streams->data;
1177
1178     ret = ostream->last_ret;
1179     /* some other return value (must be SUCCESS but we can return
1180      * other values as well) */
1181     if (ret != GST_FLOW_NOT_LINKED)
1182       goto done;
1183   }
1184   /* if we get here, all other pads were unlinked and we return
1185    * NOT_LINKED then */
1186 done:
1187   return ret;
1188 }
1189
1190 static void
1191 gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
1192     GstEvent * event)
1193 {
1194   /* only streams that have a connection to the outside world */
1195   if (stream->srcpad == NULL)
1196     goto done;
1197
1198   if (stream->channelpad[0]) {
1199     gst_event_ref (event);
1200     gst_pad_send_event (stream->channelpad[0], event);
1201   }
1202
1203   if (stream->channelpad[1]) {
1204     gst_event_ref (event);
1205     gst_pad_send_event (stream->channelpad[1], event);
1206   }
1207
1208 done:
1209   gst_event_unref (event);
1210 }
1211
1212 static void
1213 gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
1214 {
1215   GstSDPDemux *demux;
1216
1217   demux = GST_SDP_DEMUX (bin);
1218
1219   switch (GST_MESSAGE_TYPE (message)) {
1220     case GST_MESSAGE_ELEMENT:
1221     {
1222       const GstStructure *s = gst_message_get_structure (message);
1223
1224       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
1225         gboolean ignore_timeout;
1226
1227         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
1228
1229         GST_OBJECT_LOCK (demux);
1230         ignore_timeout = demux->ignore_timeout;
1231         demux->ignore_timeout = TRUE;
1232         GST_OBJECT_UNLOCK (demux);
1233
1234         /* we only act on the first udp timeout message, others are irrelevant
1235          * and can be ignored. */
1236         if (ignore_timeout)
1237           gst_message_unref (message);
1238         else {
1239           GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
1240               ("Could not receive any UDP packets for %.4f seconds, maybe your "
1241                   "firewall is blocking it.",
1242                   gst_guint64_to_gdouble (demux->udp_timeout / 1000000.0)));
1243         }
1244         return;
1245       }
1246       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1247       break;
1248     }
1249     case GST_MESSAGE_ERROR:
1250     {
1251       GstObject *udpsrc;
1252       GstSDPStream *stream;
1253       GstFlowReturn ret;
1254
1255       udpsrc = GST_MESSAGE_SRC (message);
1256
1257       GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));
1258
1259       stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
1260       /* fatal but not our message, forward */
1261       if (!stream)
1262         goto forward;
1263
1264       /* we ignore the RTCP udpsrc */
1265       if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
1266         goto done;
1267
1268       /* if we get error messages from the udp sources, that's not a problem as
1269        * long as not all of them error out. We also don't really know what the
1270        * problem is, the message does not give enough detail... */
1271       ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
1272       GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
1273       if (ret != GST_FLOW_OK)
1274         goto forward;
1275
1276     done:
1277       gst_message_unref (message);
1278       break;
1279
1280     forward:
1281       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1282       break;
1283     }
1284     default:
1285     {
1286       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1287       break;
1288     }
1289   }
1290 }
1291
1292 static gboolean
1293 gst_sdp_demux_start (GstSDPDemux * demux)
1294 {
1295   guint8 *data;
1296   guint size;
1297   gint i, n_streams;
1298   GstSDPMessage sdp = { 0 };
1299   GstSDPStream *stream = NULL;
1300   GList *walk;
1301   gchar *uri = NULL;
1302   GstStateChangeReturn ret;
1303
1304   /* grab the lock so that no state change can interfere */
1305   GST_SDP_STREAM_LOCK (demux);
1306
1307   GST_DEBUG_OBJECT (demux, "parse SDP...");
1308
1309   size = gst_adapter_available (demux->adapter);
1310   data = gst_adapter_take (demux->adapter, size);
1311
1312   gst_sdp_message_init (&sdp);
1313   if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
1314     goto could_not_parse;
1315
1316   if (demux->debug)
1317     gst_sdp_message_dump (&sdp);
1318
1319   /* maybe this is plain RTSP DESCRIBE rtsp and we should redirect */
1320   /* look for rtsp control url */
1321   {
1322     const gchar *control;
1323
1324     for (i = 0;; i++) {
1325       control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i);
1326       if (control == NULL)
1327         break;
1328
1329       /* only take fully qualified urls */
1330       if (g_str_has_prefix (control, "rtsp://"))
1331         break;
1332     }
1333     if (!control) {
1334       gint idx;
1335
1336       /* try to find non-aggragate control */
1337       n_streams = gst_sdp_message_medias_len (&sdp);
1338
1339       for (idx = 0; idx < n_streams; idx++) {
1340         const GstSDPMedia *media;
1341
1342         /* get media, should not return NULL */
1343         media = gst_sdp_message_get_media (&sdp, idx);
1344         if (media == NULL)
1345           break;
1346
1347         for (i = 0;; i++) {
1348           control = gst_sdp_media_get_attribute_val_n (media, "control", i);
1349           if (control == NULL)
1350             break;
1351
1352           /* only take fully qualified urls */
1353           if (g_str_has_prefix (control, "rtsp://"))
1354             break;
1355         }
1356         /* this media has no control, exit */
1357         if (!control)
1358           break;
1359       }
1360     }
1361
1362     if (control) {
1363       /* we have RTSP now */
1364       uri = gst_sdp_message_as_uri ("rtsp-sdp", &sdp);
1365
1366       if (demux->redirect) {
1367         GST_INFO_OBJECT (demux, "redirect to %s", uri);
1368
1369         gst_element_post_message (GST_ELEMENT_CAST (demux),
1370             gst_message_new_element (GST_OBJECT_CAST (demux),
1371                 gst_structure_new ("redirect",
1372                     "new-location", G_TYPE_STRING, uri, NULL)));
1373         goto sent_redirect;
1374       }
1375     }
1376   }
1377
1378   /* we get here when we didn't do a redirect */
1379
1380   /* try to get and configure a manager */
1381   if (!gst_sdp_demux_configure_manager (demux, uri))
1382     goto no_manager;
1383   if (!uri) {
1384     /* create streams with UDP sources and sinks */
1385     n_streams = gst_sdp_message_medias_len (&sdp);
1386     for (i = 0; i < n_streams; i++) {
1387       stream = gst_sdp_demux_create_stream (demux, &sdp, i);
1388
1389       if (!stream)
1390         continue;
1391
1392       GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
1393
1394       if (!gst_sdp_demux_stream_configure_udp (demux, stream))
1395         goto transport_failed;
1396       if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
1397         goto transport_failed;
1398     }
1399
1400     if (!demux->streams)
1401       goto no_streams;
1402   }
1403
1404   /* set target state on session manager */
1405   /* setting rtspsrc to PLAYING may cause it to loose it that target state
1406    * along the way due to no-preroll udpsrc elements, so ...
1407    * do it in two stages here (similar to other elements) */
1408   if (demux->target > GST_STATE_PAUSED) {
1409     ret = gst_element_set_state (demux->session, GST_STATE_PAUSED);
1410     if (ret == GST_STATE_CHANGE_FAILURE)
1411       goto start_session_failure;
1412   }
1413   ret = gst_element_set_state (demux->session, demux->target);
1414   if (ret == GST_STATE_CHANGE_FAILURE)
1415     goto start_session_failure;
1416
1417   if (!uri) {
1418     /* activate all streams */
1419     for (walk = demux->streams; walk; walk = g_list_next (walk)) {
1420       stream = (GstSDPStream *) walk->data;
1421
1422       /* configure target state on udp sources */
1423       gst_element_set_state (stream->udpsrc[0], demux->target);
1424       gst_element_set_state (stream->udpsrc[1], demux->target);
1425     }
1426   }
1427   GST_SDP_STREAM_UNLOCK (demux);
1428   gst_sdp_message_uninit (&sdp);
1429
1430   return TRUE;
1431
1432   /* ERRORS */
1433 done:
1434   {
1435     GST_SDP_STREAM_UNLOCK (demux);
1436     gst_sdp_message_uninit (&sdp);
1437     return FALSE;
1438   }
1439 transport_failed:
1440   {
1441     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1442         ("Could not create RTP stream transport."));
1443     goto done;
1444   }
1445 no_manager:
1446   {
1447     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1448         ("Could not create RTP session manager."));
1449     goto done;
1450   }
1451 could_not_parse:
1452   {
1453     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1454         ("Could not parse SDP message."));
1455     goto done;
1456   }
1457 no_streams:
1458   {
1459     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1460         ("No streams in SDP message."));
1461     goto done;
1462   }
1463 sent_redirect:
1464   {
1465     /* avoid hanging if redirect not handled */
1466     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1467         ("Sent RTSP redirect."));
1468     goto done;
1469   }
1470 start_session_failure:
1471   {
1472     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1473         ("Could not start RTP session manager."));
1474     gst_element_set_state (demux->session, GST_STATE_NULL);
1475     gst_bin_remove (GST_BIN_CAST (demux), demux->session);
1476     demux->session = NULL;
1477     goto done;
1478   }
1479 }
1480
1481 static gboolean
1482 gst_sdp_demux_sink_event (GstPad * pad, GstEvent * event)
1483 {
1484   GstSDPDemux *demux;
1485   gboolean res = TRUE;
1486
1487   demux = GST_SDP_DEMUX (gst_pad_get_parent (pad));
1488
1489   switch (GST_EVENT_TYPE (event)) {
1490     case GST_EVENT_EOS:
1491       /* when we get EOS, start parsing the SDP */
1492       res = gst_sdp_demux_start (demux);
1493       gst_event_unref (event);
1494       break;
1495     default:
1496       gst_event_unref (event);
1497       break;
1498   }
1499   gst_object_unref (demux);
1500
1501   return res;
1502 }
1503
1504 static GstFlowReturn
1505 gst_sdp_demux_sink_chain (GstPad * pad, GstBuffer * buffer)
1506 {
1507   GstSDPDemux *demux;
1508
1509   demux = GST_SDP_DEMUX (gst_pad_get_parent (pad));
1510
1511   /* push the SDP message in an adapter, we start doing something with it when
1512    * we receive EOS */
1513   gst_adapter_push (demux->adapter, buffer);
1514
1515   gst_object_unref (demux);
1516
1517   return GST_FLOW_OK;
1518 }
1519
1520 static GstStateChangeReturn
1521 gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
1522 {
1523   GstSDPDemux *demux;
1524   GstStateChangeReturn ret;
1525
1526   demux = GST_SDP_DEMUX (element);
1527
1528   GST_SDP_STREAM_LOCK (demux);
1529
1530   switch (transition) {
1531     case GST_STATE_CHANGE_NULL_TO_READY:
1532       break;
1533     case GST_STATE_CHANGE_READY_TO_PAUSED:
1534       /* first attempt, don't ignore timeouts */
1535       gst_adapter_clear (demux->adapter);
1536       demux->ignore_timeout = FALSE;
1537       demux->target = GST_STATE_PAUSED;
1538       break;
1539     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1540       demux->target = GST_STATE_PLAYING;
1541       break;
1542     default:
1543       break;
1544   }
1545
1546   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1547   if (ret == GST_STATE_CHANGE_FAILURE)
1548     goto done;
1549
1550   switch (transition) {
1551     case GST_STATE_CHANGE_READY_TO_PAUSED:
1552       ret = GST_STATE_CHANGE_NO_PREROLL;
1553       break;
1554     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1555       ret = GST_STATE_CHANGE_NO_PREROLL;
1556       demux->target = GST_STATE_PAUSED;
1557       break;
1558     case GST_STATE_CHANGE_PAUSED_TO_READY:
1559       gst_sdp_demux_cleanup (demux);
1560       break;
1561     case GST_STATE_CHANGE_READY_TO_NULL:
1562       break;
1563     default:
1564       break;
1565   }
1566
1567 done:
1568   GST_SDP_STREAM_UNLOCK (demux);
1569
1570   return ret;
1571 }