Merging gst-plugins-bad
[platform/upstream/gstreamer.git] / gst / sdp / gstsdpdemux.c
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 /**
20  * SECTION:element-sdpdemux
21  * @title: sdpdemux
22  *
23  * sdpdemux currently understands SDP as the input format of the session description.
24  * For each stream listed in the SDP a new stream_\%u pad will be created
25  * with caps derived from the SDP media description. This is a caps of mime type
26  * "application/x-rtp" that can be connected to any available RTP depayloader
27  * element.
28  *
29  * sdpdemux will internally instantiate an RTP session manager element
30  * that will handle the RTCP messages to and from the server, jitter removal,
31  * packet reordering along with providing a clock for the pipeline.
32  *
33  * sdpdemux acts like a live element and will therefore only generate data in the
34  * PLAYING state.
35  *
36  * ## Example launch line
37  * |[
38  * gst-launch-1.0 souphttpsrc location=http://some.server/session.sdp ! sdpdemux ! fakesink
39  * ]| Establish a connection to an HTTP server that contains an SDP session description
40  * that gets parsed by sdpdemux and send the raw RTP packets to a fakesink.
41  *
42  */
43
44 #ifdef HAVE_CONFIG_H
45 #include "config.h"
46 #endif
47
48 #include "gstsdpdemux.h"
49
50 #include <gst/rtp/gstrtppayloads.h>
51 #include <gst/sdp/gstsdpmessage.h>
52
53 #include <stdio.h>
54 #include <stdlib.h>
55 #include <string.h>
56
57 GST_DEBUG_CATEGORY_STATIC (sdpdemux_debug);
58 #define GST_CAT_DEFAULT (sdpdemux_debug)
59
60 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
61     GST_PAD_SINK,
62     GST_PAD_ALWAYS,
63     GST_STATIC_CAPS ("application/sdp"));
64
65 static GstStaticPadTemplate rtptemplate = GST_STATIC_PAD_TEMPLATE ("stream_%u",
66     GST_PAD_SRC,
67     GST_PAD_SOMETIMES,
68     GST_STATIC_CAPS ("application/x-rtp"));
69
70 enum
71 {
72   /* FILL ME */
73   LAST_SIGNAL
74 };
75
76 #define DEFAULT_DEBUG            FALSE
77 #define DEFAULT_TIMEOUT          10000000
78 #define DEFAULT_LATENCY_MS       200
79 #define DEFAULT_REDIRECT         TRUE
80
81 enum
82 {
83   PROP_0,
84   PROP_DEBUG,
85   PROP_TIMEOUT,
86   PROP_LATENCY,
87   PROP_REDIRECT
88 };
89
90 static void gst_sdp_demux_finalize (GObject * object);
91
92 static void gst_sdp_demux_set_property (GObject * object, guint prop_id,
93     const GValue * value, GParamSpec * pspec);
94 static void gst_sdp_demux_get_property (GObject * object, guint prop_id,
95     GValue * value, GParamSpec * pspec);
96
97 static GstStateChangeReturn gst_sdp_demux_change_state (GstElement * element,
98     GstStateChange transition);
99 static void gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message);
100
101 static void gst_sdp_demux_stream_push_event (GstSDPDemux * demux,
102     GstSDPStream * stream, GstEvent * event);
103
104 static gboolean gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent,
105     GstEvent * event);
106 static GstFlowReturn gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent,
107     GstBuffer * buffer);
108
109 /*static guint gst_sdp_demux_signals[LAST_SIGNAL] = { 0 }; */
110
111 #define gst_sdp_demux_parent_class parent_class
112 G_DEFINE_TYPE (GstSDPDemux, gst_sdp_demux, GST_TYPE_BIN);
113 GST_ELEMENT_REGISTER_DEFINE (sdpdemux, "sdpdemux", GST_RANK_NONE,
114     GST_TYPE_SDP_DEMUX);
115
116 static void
117 gst_sdp_demux_class_init (GstSDPDemuxClass * klass)
118 {
119   GObjectClass *gobject_class;
120   GstElementClass *gstelement_class;
121   GstBinClass *gstbin_class;
122
123   gobject_class = (GObjectClass *) klass;
124   gstelement_class = (GstElementClass *) klass;
125   gstbin_class = (GstBinClass *) klass;
126
127   gobject_class->set_property = gst_sdp_demux_set_property;
128   gobject_class->get_property = gst_sdp_demux_get_property;
129
130   gobject_class->finalize = gst_sdp_demux_finalize;
131
132   g_object_class_install_property (gobject_class, PROP_DEBUG,
133       g_param_spec_boolean ("debug", "Debug",
134           "Dump request and response messages to stdout",
135           DEFAULT_DEBUG,
136           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
137
138   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
139       g_param_spec_uint64 ("timeout", "Timeout",
140           "Fail transport after UDP timeout microseconds (0 = disabled)",
141           0, G_MAXUINT64, DEFAULT_TIMEOUT,
142           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
143
144   g_object_class_install_property (gobject_class, PROP_LATENCY,
145       g_param_spec_uint ("latency", "Buffer latency in ms",
146           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
147           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
148
149   g_object_class_install_property (gobject_class, PROP_REDIRECT,
150       g_param_spec_boolean ("redirect", "Redirect",
151           "Sends a redirection message instead of using a custom session element",
152           DEFAULT_REDIRECT,
153           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
154
155   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
156   gst_element_class_add_static_pad_template (gstelement_class, &rtptemplate);
157
158   gst_element_class_set_static_metadata (gstelement_class, "SDP session setup",
159       "Codec/Demuxer/Network/RTP",
160       "Receive data over the network via SDP",
161       "Wim Taymans <wim.taymans@gmail.com>");
162
163   gstelement_class->change_state = gst_sdp_demux_change_state;
164
165   gstbin_class->handle_message = gst_sdp_demux_handle_message;
166
167   GST_DEBUG_CATEGORY_INIT (sdpdemux_debug, "sdpdemux", 0, "SDP demux");
168 }
169
170 static void
171 gst_sdp_demux_init (GstSDPDemux * demux)
172 {
173   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
174   gst_pad_set_event_function (demux->sinkpad,
175       GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_event));
176   gst_pad_set_chain_function (demux->sinkpad,
177       GST_DEBUG_FUNCPTR (gst_sdp_demux_sink_chain));
178   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
179
180   /* protects the streaming thread in interleaved mode or the polling
181    * thread in UDP mode. */
182   g_rec_mutex_init (&demux->stream_rec_lock);
183
184   demux->adapter = gst_adapter_new ();
185 }
186
187 static void
188 gst_sdp_demux_finalize (GObject * object)
189 {
190   GstSDPDemux *demux;
191
192   demux = GST_SDP_DEMUX (object);
193
194   /* free locks */
195   g_rec_mutex_clear (&demux->stream_rec_lock);
196
197   g_object_unref (demux->adapter);
198
199   G_OBJECT_CLASS (parent_class)->finalize (object);
200 }
201
202 static void
203 gst_sdp_demux_set_property (GObject * object, guint prop_id,
204     const GValue * value, GParamSpec * pspec)
205 {
206   GstSDPDemux *demux;
207
208   demux = GST_SDP_DEMUX (object);
209
210   switch (prop_id) {
211     case PROP_DEBUG:
212       demux->debug = g_value_get_boolean (value);
213       break;
214     case PROP_TIMEOUT:
215       demux->udp_timeout = g_value_get_uint64 (value);
216       break;
217     case PROP_LATENCY:
218       demux->latency = g_value_get_uint (value);
219       break;
220     case PROP_REDIRECT:
221       demux->redirect = g_value_get_boolean (value);
222       break;
223     default:
224       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
225       break;
226   }
227 }
228
229 static void
230 gst_sdp_demux_get_property (GObject * object, guint prop_id, GValue * value,
231     GParamSpec * pspec)
232 {
233   GstSDPDemux *demux;
234
235   demux = GST_SDP_DEMUX (object);
236
237   switch (prop_id) {
238     case PROP_DEBUG:
239       g_value_set_boolean (value, demux->debug);
240       break;
241     case PROP_TIMEOUT:
242       g_value_set_uint64 (value, demux->udp_timeout);
243       break;
244     case PROP_LATENCY:
245       g_value_set_uint (value, demux->latency);
246       break;
247     case PROP_REDIRECT:
248       g_value_set_boolean (value, demux->redirect);
249       break;
250     default:
251       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
252       break;
253   }
254 }
255
256 static gint
257 find_stream_by_id (GstSDPStream * stream, gconstpointer a)
258 {
259   gint id = GPOINTER_TO_INT (a);
260
261   if (stream->id == id)
262     return 0;
263
264   return -1;
265 }
266
267 static gint
268 find_stream_by_pt (GstSDPStream * stream, gconstpointer a)
269 {
270   gint pt = GPOINTER_TO_INT (a);
271
272   if (stream->pt == pt)
273     return 0;
274
275   return -1;
276 }
277
278 static gint
279 find_stream_by_udpsrc (GstSDPStream * stream, gconstpointer a)
280 {
281   GstElement *src = (GstElement *) a;
282
283   if (stream->udpsrc[0] == src)
284     return 0;
285   if (stream->udpsrc[1] == src)
286     return 0;
287
288   return -1;
289 }
290
291 static GstSDPStream *
292 find_stream (GstSDPDemux * demux, gconstpointer data, gconstpointer func)
293 {
294   GList *lstream;
295
296   /* find and get stream */
297   if ((lstream =
298           g_list_find_custom (demux->streams, data, (GCompareFunc) func)))
299     return (GstSDPStream *) lstream->data;
300
301   return NULL;
302 }
303
304 static void
305 gst_sdp_demux_stream_free (GstSDPDemux * demux, GstSDPStream * stream)
306 {
307   gint i;
308
309   GST_DEBUG_OBJECT (demux, "free stream %p", stream);
310
311   if (stream->caps)
312     gst_caps_unref (stream->caps);
313
314   for (i = 0; i < 2; i++) {
315     GstElement *udpsrc = stream->udpsrc[i];
316
317     if (udpsrc) {
318       gst_element_set_state (udpsrc, GST_STATE_NULL);
319       gst_bin_remove (GST_BIN_CAST (demux), udpsrc);
320       stream->udpsrc[i] = NULL;
321     }
322   }
323   if (stream->udpsink) {
324     gst_element_set_state (stream->udpsink, GST_STATE_NULL);
325     gst_bin_remove (GST_BIN_CAST (demux), stream->udpsink);
326     stream->udpsink = NULL;
327   }
328   if (stream->srcpad) {
329     gst_pad_set_active (stream->srcpad, FALSE);
330     if (stream->added) {
331       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
332       stream->added = FALSE;
333     }
334     stream->srcpad = NULL;
335   }
336   g_free (stream);
337 }
338
339 static gboolean
340 is_multicast_address (const gchar * host_name)
341 {
342   GInetAddress *addr;
343   GResolver *resolver = NULL;
344   gboolean ret = FALSE;
345
346   addr = g_inet_address_new_from_string (host_name);
347   if (!addr) {
348     GList *results;
349
350     resolver = g_resolver_get_default ();
351     results = g_resolver_lookup_by_name (resolver, host_name, NULL, NULL);
352     if (!results)
353       goto out;
354     addr = G_INET_ADDRESS (g_object_ref (results->data));
355
356     g_resolver_free_addresses (results);
357   }
358   g_assert (addr != NULL);
359
360   ret = g_inet_address_get_is_multicast (addr);
361
362 out:
363   if (resolver)
364     g_object_unref (resolver);
365   if (addr)
366     g_object_unref (addr);
367   return ret;
368 }
369
370 static GstSDPStream *
371 gst_sdp_demux_create_stream (GstSDPDemux * demux, GstSDPMessage * sdp, gint idx)
372 {
373   GstSDPStream *stream;
374   const gchar *payload;
375   const GstSDPMedia *media;
376   const GstSDPConnection *conn;
377
378   /* get media, should not return NULL */
379   media = gst_sdp_message_get_media (sdp, idx);
380   if (media == NULL)
381     return NULL;
382
383   stream = g_new0 (GstSDPStream, 1);
384   stream->parent = demux;
385   /* we mark the pad as not linked, we will mark it as OK when we add the pad to
386    * the element. */
387   stream->last_ret = GST_FLOW_OK;
388   stream->added = FALSE;
389   stream->disabled = FALSE;
390   stream->id = demux->numstreams++;
391   stream->eos = FALSE;
392
393   /* we must have a payload. No payload means we cannot create caps */
394   /* FIXME, handle multiple formats. */
395   if ((payload = gst_sdp_media_get_format (media, 0))) {
396     GstStructure *s;
397
398     stream->pt = atoi (payload);
399     /* convert caps */
400     stream->caps = gst_sdp_media_get_caps_from_media (media, stream->pt);
401
402     s = gst_caps_get_structure (stream->caps, 0);
403     gst_structure_set_name (s, "application/x-rtp");
404
405     if (stream->pt >= 96) {
406       /* If we have a dynamic payload type, see if we have a stream with the
407        * same payload number. If there is one, they are part of the same
408        * container and we only need to add one pad. */
409       if (find_stream (demux, GINT_TO_POINTER (stream->pt),
410               (gpointer) find_stream_by_pt)) {
411         stream->container = TRUE;
412       }
413     }
414   }
415
416   if (gst_sdp_media_connections_len (media) > 0) {
417     if (!(conn = gst_sdp_media_get_connection (media, 0))) {
418       /* We should not reach this based on the check above */
419       goto no_connection;
420     }
421   } else {
422     if (!(conn = gst_sdp_message_get_connection (sdp))) {
423       goto no_connection;
424     }
425   }
426
427   if (!conn->address)
428     goto no_connection;
429
430   stream->destination = conn->address;
431   stream->ttl = conn->ttl;
432   stream->multicast = is_multicast_address (stream->destination);
433
434   stream->rtp_port = gst_sdp_media_get_port (media);
435   if (gst_sdp_media_get_attribute_val (media, "rtcp")) {
436     /* FIXME, RFC 3605 */
437     stream->rtcp_port = stream->rtp_port + 1;
438   } else {
439     stream->rtcp_port = stream->rtp_port + 1;
440   }
441
442   GST_DEBUG_OBJECT (demux, "stream %d, (%p)", stream->id, stream);
443   GST_DEBUG_OBJECT (demux, " pt: %d", stream->pt);
444   GST_DEBUG_OBJECT (demux, " container: %d", stream->container);
445   GST_DEBUG_OBJECT (demux, " caps: %" GST_PTR_FORMAT, stream->caps);
446
447   /* we keep track of all streams */
448   demux->streams = g_list_append (demux->streams, stream);
449
450   return stream;
451
452   /* ERRORS */
453 no_connection:
454   {
455     gst_sdp_demux_stream_free (demux, stream);
456     return NULL;
457   }
458 }
459
460 static void
461 gst_sdp_demux_cleanup (GstSDPDemux * demux)
462 {
463   GList *walk;
464
465   GST_DEBUG_OBJECT (demux, "cleanup");
466
467   for (walk = demux->streams; walk; walk = g_list_next (walk)) {
468     GstSDPStream *stream = (GstSDPStream *) walk->data;
469
470     gst_sdp_demux_stream_free (demux, stream);
471   }
472   g_list_free (demux->streams);
473   demux->streams = NULL;
474   if (demux->session) {
475     if (demux->session_sig_id) {
476       g_signal_handler_disconnect (demux->session, demux->session_sig_id);
477       demux->session_sig_id = 0;
478     }
479     if (demux->session_nmp_id) {
480       g_signal_handler_disconnect (demux->session, demux->session_nmp_id);
481       demux->session_nmp_id = 0;
482     }
483     if (demux->session_ptmap_id) {
484       g_signal_handler_disconnect (demux->session, demux->session_ptmap_id);
485       demux->session_ptmap_id = 0;
486     }
487     gst_element_set_state (demux->session, GST_STATE_NULL);
488     gst_bin_remove (GST_BIN_CAST (demux), demux->session);
489     demux->session = NULL;
490   }
491   demux->numstreams = 0;
492 }
493
494 /* this callback is called when the session manager generated a new src pad with
495  * payloaded RTP packets. We simply ghost the pad here. */
496 static void
497 new_session_pad (GstElement * session, GstPad * pad, GstSDPDemux * demux)
498 {
499   gchar *name, *pad_name;
500   GstPadTemplate *template;
501   guint id, ssrc, pt;
502   GList *lstream;
503   GstSDPStream *stream;
504   gboolean all_added;
505
506   GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
507
508   GST_SDP_STREAM_LOCK (demux);
509   /* find stream */
510   name = gst_object_get_name (GST_OBJECT_CAST (pad));
511   if (sscanf (name, "recv_rtp_src_%u_%u_%u", &id, &ssrc, &pt) != 3)
512     goto unknown_stream;
513
514   GST_DEBUG_OBJECT (demux, "stream: %u, SSRC %u, PT %u", id, ssrc, pt);
515
516   stream =
517       find_stream (demux, GUINT_TO_POINTER (id), (gpointer) find_stream_by_id);
518   if (stream == NULL)
519     goto unknown_stream;
520
521   stream->ssrc = ssrc;
522
523   /* no need for a timeout anymore now */
524   g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout", (guint64) 0, NULL);
525
526   pad_name = g_strdup_printf ("stream_%u", stream->id);
527   /* create a new pad we will use to stream to */
528   template = gst_static_pad_template_get (&rtptemplate);
529   stream->srcpad = gst_ghost_pad_new_from_template (pad_name, pad, template);
530   gst_object_unref (template);
531   g_free (name);
532   g_free (pad_name);
533
534   stream->added = TRUE;
535   gst_pad_set_active (stream->srcpad, TRUE);
536   gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->srcpad);
537
538   /* check if we added all streams */
539   all_added = TRUE;
540   for (lstream = demux->streams; lstream; lstream = g_list_next (lstream)) {
541     stream = (GstSDPStream *) lstream->data;
542     /* a container stream only needs one pad added. Also disabled streams don't
543      * count */
544     if (!stream->container && !stream->disabled && !stream->added) {
545       all_added = FALSE;
546       break;
547     }
548   }
549   GST_SDP_STREAM_UNLOCK (demux);
550
551   if (all_added) {
552     GST_DEBUG_OBJECT (demux, "We added all streams");
553     /* when we get here, all stream are added and we can fire the no-more-pads
554      * signal. */
555     gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
556   }
557
558   return;
559
560   /* ERRORS */
561 unknown_stream:
562   {
563     GST_DEBUG_OBJECT (demux, "ignoring unknown stream");
564     GST_SDP_STREAM_UNLOCK (demux);
565     g_free (name);
566     return;
567   }
568 }
569
570 static void
571 rtsp_session_pad_added (GstElement * session, GstPad * pad, GstSDPDemux * demux)
572 {
573   GstPad *srcpad = NULL;
574   gchar *name;
575
576   GST_DEBUG_OBJECT (demux, "got new session pad %" GST_PTR_FORMAT, pad);
577
578   name = gst_pad_get_name (pad);
579   srcpad = gst_ghost_pad_new (name, pad);
580   g_free (name);
581
582   gst_pad_set_active (srcpad, TRUE);
583   gst_element_add_pad (GST_ELEMENT_CAST (demux), srcpad);
584 }
585
586 static void
587 rtsp_session_no_more_pads (GstElement * session, GstSDPDemux * demux)
588 {
589   GST_DEBUG_OBJECT (demux, "got no-more-pads");
590   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
591 }
592
593 static GstCaps *
594 request_pt_map (GstElement * sess, guint session, guint pt, GstSDPDemux * demux)
595 {
596   GstSDPStream *stream;
597   GstCaps *caps;
598
599   GST_DEBUG_OBJECT (demux, "getting pt map for pt %d in session %d", pt,
600       session);
601
602   GST_SDP_STREAM_LOCK (demux);
603   stream =
604       find_stream (demux, GINT_TO_POINTER (session),
605       (gpointer) find_stream_by_id);
606   if (!stream)
607     goto unknown_stream;
608
609   caps = stream->caps;
610   if (caps)
611     gst_caps_ref (caps);
612   GST_SDP_STREAM_UNLOCK (demux);
613
614   return caps;
615
616 unknown_stream:
617   {
618     GST_DEBUG_OBJECT (demux, "unknown stream %d", session);
619     GST_SDP_STREAM_UNLOCK (demux);
620     return NULL;
621   }
622 }
623
624 static void
625 gst_sdp_demux_do_stream_eos (GstSDPDemux * demux, guint session, guint32 ssrc)
626 {
627   GstSDPStream *stream;
628
629   GST_DEBUG_OBJECT (demux, "setting stream for session %u to EOS", session);
630
631   /* get stream for session */
632   stream =
633       find_stream (demux, GINT_TO_POINTER (session),
634       (gpointer) find_stream_by_id);
635   if (!stream)
636     goto unknown_stream;
637
638   if (stream->eos)
639     goto was_eos;
640
641   if (stream->ssrc != ssrc)
642     goto wrong_ssrc;
643
644   stream->eos = TRUE;
645   gst_sdp_demux_stream_push_event (demux, stream, gst_event_new_eos ());
646   return;
647
648   /* ERRORS */
649 unknown_stream:
650   {
651     GST_DEBUG_OBJECT (demux, "unknown stream for session %u", session);
652     return;
653   }
654 was_eos:
655   {
656     GST_DEBUG_OBJECT (demux, "stream for session %u was already EOS", session);
657     return;
658   }
659 wrong_ssrc:
660   {
661     GST_DEBUG_OBJECT (demux, "unkown SSRC %08x for session %u", ssrc, session);
662     return;
663   }
664 }
665
666 static void
667 on_bye_ssrc (GstElement * manager, guint session, guint32 ssrc,
668     GstSDPDemux * demux)
669 {
670   GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u received BYE", ssrc,
671       session);
672
673   gst_sdp_demux_do_stream_eos (demux, session, ssrc);
674 }
675
676 static void
677 on_timeout (GstElement * manager, guint session, guint32 ssrc,
678     GstSDPDemux * demux)
679 {
680   GST_DEBUG_OBJECT (demux, "SSRC %08x in session %u timed out", ssrc, session);
681
682   gst_sdp_demux_do_stream_eos (demux, session, ssrc);
683 }
684
685 /* try to get and configure a manager */
686 static gboolean
687 gst_sdp_demux_configure_manager (GstSDPDemux * demux, char *rtsp_sdp)
688 {
689   /* configure the session manager */
690   if (rtsp_sdp != NULL) {
691     if (!(demux->session = gst_element_factory_make ("rtspsrc", NULL)))
692       goto rtspsrc_failed;
693
694     g_object_set (demux->session, "location", rtsp_sdp, NULL);
695
696     GST_DEBUG_OBJECT (demux, "connect to signals on rtspsrc");
697     demux->session_sig_id =
698         g_signal_connect (demux->session, "pad-added",
699         (GCallback) rtsp_session_pad_added, demux);
700     demux->session_nmp_id =
701         g_signal_connect (demux->session, "no-more-pads",
702         (GCallback) rtsp_session_no_more_pads, demux);
703   } else {
704     if (!(demux->session = gst_element_factory_make ("rtpbin", NULL)))
705       goto manager_failed;
706
707     /* connect to signals if we did not already do so */
708     GST_DEBUG_OBJECT (demux, "connect to signals on session manager");
709     demux->session_sig_id =
710         g_signal_connect (demux->session, "pad-added",
711         (GCallback) new_session_pad, demux);
712     demux->session_ptmap_id =
713         g_signal_connect (demux->session, "request-pt-map",
714         (GCallback) request_pt_map, demux);
715     g_signal_connect (demux->session, "on-bye-ssrc", (GCallback) on_bye_ssrc,
716         demux);
717     g_signal_connect (demux->session, "on-bye-timeout", (GCallback) on_timeout,
718         demux);
719     g_signal_connect (demux->session, "on-timeout", (GCallback) on_timeout,
720         demux);
721   }
722
723   g_object_set (demux->session, "latency", demux->latency, NULL);
724
725   /* we manage this element */
726   gst_bin_add (GST_BIN_CAST (demux), demux->session);
727
728   return TRUE;
729
730   /* ERRORS */
731 manager_failed:
732   {
733     GST_DEBUG_OBJECT (demux, "no session manager element gstrtpbin found");
734     return FALSE;
735   }
736 rtspsrc_failed:
737   {
738     GST_DEBUG_OBJECT (demux, "no manager element rtspsrc found");
739     return FALSE;
740   }
741 }
742
743 static gboolean
744 gst_sdp_demux_stream_configure_udp (GstSDPDemux * demux, GstSDPStream * stream)
745 {
746   gchar *uri, *name;
747   const gchar *destination;
748   GstPad *pad;
749
750   GST_DEBUG_OBJECT (demux, "creating UDP sources for multicast");
751
752   /* if the destination is not a multicast address, we just want to listen on
753    * our local ports */
754   if (!stream->multicast)
755     destination = "0.0.0.0";
756   else
757     destination = stream->destination;
758
759   /* creating UDP source */
760   if (stream->rtp_port != -1) {
761     GST_DEBUG_OBJECT (demux, "receiving RTP from %s:%d", destination,
762         stream->rtp_port);
763
764     uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtp_port);
765     stream->udpsrc[0] =
766         gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
767     g_free (uri);
768     if (stream->udpsrc[0] == NULL)
769       goto no_element;
770
771     /* take ownership */
772     gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[0]);
773
774     GST_DEBUG_OBJECT (demux,
775         "setting up UDP source with timeout %" G_GINT64_FORMAT,
776         demux->udp_timeout);
777
778     /* configure a timeout on the UDP port. When the timeout message is
779      * posted, we assume UDP transport is not possible. */
780     g_object_set (G_OBJECT (stream->udpsrc[0]), "timeout",
781         demux->udp_timeout * 1000, NULL);
782
783     /* get output pad of the UDP source. */
784     pad = gst_element_get_static_pad (stream->udpsrc[0], "src");
785
786     name = g_strdup_printf ("recv_rtp_sink_%u", stream->id);
787     stream->channelpad[0] =
788         gst_element_request_pad_simple (demux->session, name);
789     g_free (name);
790
791     GST_DEBUG_OBJECT (demux, "connecting RTP source 0 to manager");
792     /* configure for UDP delivery, we need to connect the UDP pads to
793      * the session plugin. */
794     gst_pad_link (pad, stream->channelpad[0]);
795     gst_object_unref (pad);
796
797     /* change state */
798     gst_element_set_state (stream->udpsrc[0], GST_STATE_PAUSED);
799   }
800
801   /* creating another UDP source */
802   if (stream->rtcp_port != -1) {
803     GST_DEBUG_OBJECT (demux, "receiving RTCP from %s:%d", destination,
804         stream->rtcp_port);
805     uri = g_strdup_printf ("udp://%s:%d", destination, stream->rtcp_port);
806     stream->udpsrc[1] =
807         gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
808     g_free (uri);
809     if (stream->udpsrc[1] == NULL)
810       goto no_element;
811
812     /* take ownership */
813     gst_bin_add (GST_BIN_CAST (demux), stream->udpsrc[1]);
814
815     GST_DEBUG_OBJECT (demux, "connecting RTCP source to manager");
816
817     name = g_strdup_printf ("recv_rtcp_sink_%u", stream->id);
818     stream->channelpad[1] =
819         gst_element_request_pad_simple (demux->session, name);
820     g_free (name);
821
822     pad = gst_element_get_static_pad (stream->udpsrc[1], "src");
823     gst_pad_link (pad, stream->channelpad[1]);
824     gst_object_unref (pad);
825
826     gst_element_set_state (stream->udpsrc[1], GST_STATE_PAUSED);
827   }
828   return TRUE;
829
830   /* ERRORS */
831 no_element:
832   {
833     GST_DEBUG_OBJECT (demux, "no UDP source element found");
834     return FALSE;
835   }
836 }
837
838 /* configure the UDP sink back to the server for status reports */
839 static gboolean
840 gst_sdp_demux_stream_configure_udp_sink (GstSDPDemux * demux,
841     GstSDPStream * stream)
842 {
843   GstPad *pad, *sinkpad;
844   gint port;
845   GSocket *socket;
846   gchar *destination, *uri, *name;
847
848   /* get destination and port */
849   port = stream->rtcp_port;
850   destination = stream->destination;
851
852   GST_DEBUG_OBJECT (demux, "configure UDP sink for %s:%d", destination, port);
853
854   uri = g_strdup_printf ("udp://%s:%d", destination, port);
855   stream->udpsink = gst_element_make_from_uri (GST_URI_SINK, uri, NULL, NULL);
856   g_free (uri);
857   if (stream->udpsink == NULL)
858     goto no_sink_element;
859
860   /* we clear all destinations because we don't really know where to send the
861    * RTCP to and we want to avoid sending it to our own ports.
862    * FIXME when we get an RTCP packet from the sender, we could look at its
863    * source port and address and try to send RTCP there. */
864   if (!stream->multicast)
865     g_signal_emit_by_name (stream->udpsink, "clear");
866
867   g_object_set (G_OBJECT (stream->udpsink), "auto-multicast", FALSE, NULL);
868   g_object_set (G_OBJECT (stream->udpsink), "loop", FALSE, NULL);
869   /* no sync needed */
870   g_object_set (G_OBJECT (stream->udpsink), "sync", FALSE, NULL);
871   /* no async state changes needed */
872   g_object_set (G_OBJECT (stream->udpsink), "async", FALSE, NULL);
873
874   if (stream->udpsrc[1]) {
875     /* configure socket, we give it the same UDP socket as the udpsrc for RTCP
876      * because some servers check the port number of where it sends RTCP to identify
877      * the RTCP packets it receives */
878     g_object_get (G_OBJECT (stream->udpsrc[1]), "used_socket", &socket, NULL);
879     GST_DEBUG_OBJECT (demux, "UDP src has socket %p", socket);
880     /* configure socket and make sure udpsink does not close it when shutting
881      * down, it belongs to udpsrc after all. */
882     g_object_set (G_OBJECT (stream->udpsink), "socket", socket, NULL);
883     g_object_set (G_OBJECT (stream->udpsink), "close-socket", FALSE, NULL);
884     g_object_unref (socket);
885   }
886
887   /* we keep this playing always */
888   gst_element_set_locked_state (stream->udpsink, TRUE);
889   gst_element_set_state (stream->udpsink, GST_STATE_PLAYING);
890
891   gst_bin_add (GST_BIN_CAST (demux), stream->udpsink);
892
893   /* get session RTCP pad */
894   name = g_strdup_printf ("send_rtcp_src_%u", stream->id);
895   pad = gst_element_request_pad_simple (demux->session, name);
896   g_free (name);
897
898   /* and link */
899   if (pad) {
900     sinkpad = gst_element_get_static_pad (stream->udpsink, "sink");
901     gst_pad_link (pad, sinkpad);
902     gst_object_unref (pad);
903     gst_object_unref (sinkpad);
904   } else {
905     /* not very fatal, we just won't be able to send RTCP */
906     GST_WARNING_OBJECT (demux, "could not get session RTCP pad");
907   }
908
909
910   return TRUE;
911
912   /* ERRORS */
913 no_sink_element:
914   {
915     GST_DEBUG_OBJECT (demux, "no UDP sink element found");
916     return FALSE;
917   }
918 }
919
920 static GstFlowReturn
921 gst_sdp_demux_combine_flows (GstSDPDemux * demux, GstSDPStream * stream,
922     GstFlowReturn ret)
923 {
924   GList *streams;
925
926   /* store the value */
927   stream->last_ret = ret;
928
929   /* if it's success we can return the value right away */
930   if (ret == GST_FLOW_OK)
931     goto done;
932
933   /* any other error that is not-linked can be returned right
934    * away */
935   if (ret != GST_FLOW_NOT_LINKED)
936     goto done;
937
938   /* only return NOT_LINKED if all other pads returned NOT_LINKED */
939   for (streams = demux->streams; streams; streams = g_list_next (streams)) {
940     GstSDPStream *ostream = (GstSDPStream *) streams->data;
941
942     ret = ostream->last_ret;
943     /* some other return value (must be SUCCESS but we can return
944      * other values as well) */
945     if (ret != GST_FLOW_NOT_LINKED)
946       goto done;
947   }
948   /* if we get here, all other pads were unlinked and we return
949    * NOT_LINKED then */
950 done:
951   return ret;
952 }
953
954 static void
955 gst_sdp_demux_stream_push_event (GstSDPDemux * demux, GstSDPStream * stream,
956     GstEvent * event)
957 {
958   /* only streams that have a connection to the outside world */
959   if (stream->srcpad == NULL)
960     goto done;
961
962   if (stream->channelpad[0]) {
963     gst_event_ref (event);
964     gst_pad_send_event (stream->channelpad[0], event);
965   }
966
967   if (stream->channelpad[1]) {
968     gst_event_ref (event);
969     gst_pad_send_event (stream->channelpad[1], event);
970   }
971
972 done:
973   gst_event_unref (event);
974 }
975
976 static void
977 gst_sdp_demux_handle_message (GstBin * bin, GstMessage * message)
978 {
979   GstSDPDemux *demux;
980
981   demux = GST_SDP_DEMUX (bin);
982
983   switch (GST_MESSAGE_TYPE (message)) {
984     case GST_MESSAGE_ELEMENT:
985     {
986       const GstStructure *s = gst_message_get_structure (message);
987
988       if (gst_structure_has_name (s, "GstUDPSrcTimeout")) {
989         gboolean ignore_timeout;
990
991         GST_DEBUG_OBJECT (bin, "timeout on UDP port");
992
993         GST_OBJECT_LOCK (demux);
994         ignore_timeout = demux->ignore_timeout;
995         demux->ignore_timeout = TRUE;
996         GST_OBJECT_UNLOCK (demux);
997
998         /* we only act on the first udp timeout message, others are irrelevant
999          * and can be ignored. */
1000         if (ignore_timeout)
1001           gst_message_unref (message);
1002         else {
1003           GST_ELEMENT_ERROR (demux, RESOURCE, READ, (NULL),
1004               ("Could not receive any UDP packets for %.4f seconds, maybe your "
1005                   "firewall is blocking it.",
1006                   gst_guint64_to_gdouble (demux->udp_timeout / 1000000.0)));
1007         }
1008         return;
1009       }
1010       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1011       break;
1012     }
1013     case GST_MESSAGE_ERROR:
1014     {
1015       GstObject *udpsrc;
1016       GstSDPStream *stream;
1017       GstFlowReturn ret;
1018
1019       udpsrc = GST_MESSAGE_SRC (message);
1020
1021       GST_DEBUG_OBJECT (demux, "got error from %s", GST_ELEMENT_NAME (udpsrc));
1022
1023       stream = find_stream (demux, udpsrc, (gpointer) find_stream_by_udpsrc);
1024       /* fatal but not our message, forward */
1025       if (!stream)
1026         goto forward;
1027
1028       /* we ignore the RTCP udpsrc */
1029       if (stream->udpsrc[1] == GST_ELEMENT_CAST (udpsrc))
1030         goto done;
1031
1032       /* if we get error messages from the udp sources, that's not a problem as
1033        * long as not all of them error out. We also don't really know what the
1034        * problem is, the message does not give enough detail... */
1035       ret = gst_sdp_demux_combine_flows (demux, stream, GST_FLOW_NOT_LINKED);
1036       GST_DEBUG_OBJECT (demux, "combined flows: %s", gst_flow_get_name (ret));
1037       if (ret != GST_FLOW_OK)
1038         goto forward;
1039
1040     done:
1041       gst_message_unref (message);
1042       break;
1043
1044     forward:
1045       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1046       break;
1047     }
1048     default:
1049     {
1050       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1051       break;
1052     }
1053   }
1054 }
1055
1056 static gboolean
1057 gst_sdp_demux_start (GstSDPDemux * demux)
1058 {
1059   guint8 *data = NULL;
1060   guint size;
1061   gint i, n_streams;
1062   GstSDPMessage sdp = { 0 };
1063   GstSDPStream *stream = NULL;
1064   GList *walk;
1065   gchar *uri = NULL;
1066   GstStateChangeReturn ret;
1067
1068   /* grab the lock so that no state change can interfere */
1069   GST_SDP_STREAM_LOCK (demux);
1070
1071   GST_DEBUG_OBJECT (demux, "parse SDP...");
1072
1073   size = gst_adapter_available (demux->adapter);
1074   if (size == 0)
1075     goto no_data;
1076
1077   data = gst_adapter_take (demux->adapter, size);
1078
1079   gst_sdp_message_init (&sdp);
1080   if (gst_sdp_message_parse_buffer (data, size, &sdp) != GST_SDP_OK)
1081     goto could_not_parse;
1082
1083   if (demux->debug)
1084     gst_sdp_message_dump (&sdp);
1085
1086   /* maybe this is plain RTSP DESCRIBE rtsp and we should redirect */
1087   /* look for rtsp control url */
1088   {
1089     const gchar *control;
1090
1091     for (i = 0;; i++) {
1092       control = gst_sdp_message_get_attribute_val_n (&sdp, "control", i);
1093       if (control == NULL)
1094         break;
1095
1096       /* only take fully qualified urls */
1097       if (g_str_has_prefix (control, "rtsp://"))
1098         break;
1099     }
1100     if (!control) {
1101       gint idx;
1102
1103       /* try to find non-aggragate control */
1104       n_streams = gst_sdp_message_medias_len (&sdp);
1105
1106       for (idx = 0; idx < n_streams; idx++) {
1107         const GstSDPMedia *media;
1108
1109         /* get media, should not return NULL */
1110         media = gst_sdp_message_get_media (&sdp, idx);
1111         if (media == NULL)
1112           break;
1113
1114         for (i = 0;; i++) {
1115           control = gst_sdp_media_get_attribute_val_n (media, "control", i);
1116           if (control == NULL)
1117             break;
1118
1119           /* only take fully qualified urls */
1120           if (g_str_has_prefix (control, "rtsp://"))
1121             break;
1122         }
1123         /* this media has no control, exit */
1124         if (!control)
1125           break;
1126       }
1127     }
1128
1129     if (control) {
1130       /* we have RTSP now */
1131       uri = gst_sdp_message_as_uri ("rtsp-sdp", &sdp);
1132
1133       if (demux->redirect) {
1134         GST_INFO_OBJECT (demux, "redirect to %s", uri);
1135
1136         gst_element_post_message (GST_ELEMENT_CAST (demux),
1137             gst_message_new_element (GST_OBJECT_CAST (demux),
1138                 gst_structure_new ("redirect",
1139                     "new-location", G_TYPE_STRING, uri, NULL)));
1140         goto sent_redirect;
1141       }
1142     }
1143   }
1144
1145   /* we get here when we didn't do a redirect */
1146
1147   /* try to get and configure a manager */
1148   if (!gst_sdp_demux_configure_manager (demux, uri))
1149     goto no_manager;
1150   if (!uri) {
1151     /* create streams with UDP sources and sinks */
1152     n_streams = gst_sdp_message_medias_len (&sdp);
1153     for (i = 0; i < n_streams; i++) {
1154       stream = gst_sdp_demux_create_stream (demux, &sdp, i);
1155
1156       if (!stream)
1157         continue;
1158
1159       GST_DEBUG_OBJECT (demux, "configuring transport for stream %p", stream);
1160
1161       if (!gst_sdp_demux_stream_configure_udp (demux, stream))
1162         goto transport_failed;
1163       if (!gst_sdp_demux_stream_configure_udp_sink (demux, stream))
1164         goto transport_failed;
1165     }
1166
1167     if (!demux->streams)
1168       goto no_streams;
1169   }
1170
1171   /* set target state on session manager */
1172   /* setting rtspsrc to PLAYING may cause it to loose it that target state
1173    * along the way due to no-preroll udpsrc elements, so ...
1174    * do it in two stages here (similar to other elements) */
1175   if (demux->target > GST_STATE_PAUSED) {
1176     ret = gst_element_set_state (demux->session, GST_STATE_PAUSED);
1177     if (ret == GST_STATE_CHANGE_FAILURE)
1178       goto start_session_failure;
1179   }
1180   ret = gst_element_set_state (demux->session, demux->target);
1181   if (ret == GST_STATE_CHANGE_FAILURE)
1182     goto start_session_failure;
1183
1184   if (!uri) {
1185     /* activate all streams */
1186     for (walk = demux->streams; walk; walk = g_list_next (walk)) {
1187       stream = (GstSDPStream *) walk->data;
1188
1189       /* configure target state on udp sources */
1190       gst_element_set_state (stream->udpsrc[0], demux->target);
1191       gst_element_set_state (stream->udpsrc[1], demux->target);
1192     }
1193   }
1194   GST_SDP_STREAM_UNLOCK (demux);
1195   gst_sdp_message_uninit (&sdp);
1196   g_free (data);
1197
1198   return TRUE;
1199
1200   /* ERRORS */
1201 done:
1202   {
1203     GST_SDP_STREAM_UNLOCK (demux);
1204     gst_sdp_message_uninit (&sdp);
1205     g_free (data);
1206     return FALSE;
1207   }
1208 transport_failed:
1209   {
1210     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1211         ("Could not create RTP stream transport."));
1212     goto done;
1213   }
1214 no_manager:
1215   {
1216     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1217         ("Could not create RTP session manager."));
1218     goto done;
1219   }
1220 no_data:
1221   {
1222     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1223         ("Empty SDP message."));
1224     goto done;
1225   }
1226 could_not_parse:
1227   {
1228     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1229         ("Could not parse SDP message."));
1230     goto done;
1231   }
1232 no_streams:
1233   {
1234     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1235         ("No streams in SDP message."));
1236     goto done;
1237   }
1238 sent_redirect:
1239   {
1240     /* avoid hanging if redirect not handled */
1241     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1242         ("Sent RTSP redirect."));
1243     goto done;
1244   }
1245 start_session_failure:
1246   {
1247     GST_ELEMENT_ERROR (demux, STREAM, TYPE_NOT_FOUND, (NULL),
1248         ("Could not start RTP session manager."));
1249     gst_element_set_state (demux->session, GST_STATE_NULL);
1250     gst_bin_remove (GST_BIN_CAST (demux), demux->session);
1251     demux->session = NULL;
1252     goto done;
1253   }
1254 }
1255
1256 static gboolean
1257 gst_sdp_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
1258 {
1259   GstSDPDemux *demux;
1260   gboolean res = TRUE;
1261
1262   demux = GST_SDP_DEMUX (parent);
1263
1264   switch (GST_EVENT_TYPE (event)) {
1265     case GST_EVENT_EOS:
1266       /* when we get EOS, start parsing the SDP */
1267       res = gst_sdp_demux_start (demux);
1268       gst_event_unref (event);
1269       break;
1270     default:
1271       gst_event_unref (event);
1272       break;
1273   }
1274
1275   return res;
1276 }
1277
1278 static GstFlowReturn
1279 gst_sdp_demux_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1280 {
1281   GstSDPDemux *demux;
1282
1283   demux = GST_SDP_DEMUX (parent);
1284
1285   /* push the SDP message in an adapter, we start doing something with it when
1286    * we receive EOS */
1287   gst_adapter_push (demux->adapter, buffer);
1288
1289   return GST_FLOW_OK;
1290 }
1291
1292 static GstStateChangeReturn
1293 gst_sdp_demux_change_state (GstElement * element, GstStateChange transition)
1294 {
1295   GstSDPDemux *demux;
1296   GstStateChangeReturn ret;
1297
1298   demux = GST_SDP_DEMUX (element);
1299
1300   GST_SDP_STREAM_LOCK (demux);
1301
1302   switch (transition) {
1303     case GST_STATE_CHANGE_NULL_TO_READY:
1304       break;
1305     case GST_STATE_CHANGE_READY_TO_PAUSED:
1306       /* first attempt, don't ignore timeouts */
1307       gst_adapter_clear (demux->adapter);
1308       demux->ignore_timeout = FALSE;
1309       demux->target = GST_STATE_PAUSED;
1310       break;
1311     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1312       demux->target = GST_STATE_PLAYING;
1313       break;
1314     default:
1315       break;
1316   }
1317
1318   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1319   if (ret == GST_STATE_CHANGE_FAILURE)
1320     goto done;
1321
1322   switch (transition) {
1323     case GST_STATE_CHANGE_READY_TO_PAUSED:
1324       ret = GST_STATE_CHANGE_NO_PREROLL;
1325       break;
1326     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1327       ret = GST_STATE_CHANGE_NO_PREROLL;
1328       demux->target = GST_STATE_PAUSED;
1329       break;
1330     case GST_STATE_CHANGE_PAUSED_TO_READY:
1331       gst_sdp_demux_cleanup (demux);
1332       break;
1333     case GST_STATE_CHANGE_READY_TO_NULL:
1334       break;
1335     default:
1336       break;
1337   }
1338
1339 done:
1340   GST_SDP_STREAM_UNLOCK (demux);
1341
1342   return ret;
1343 }