0832e7b660ce64919c2630b6aaf213db5debe3a2
[platform/upstream/gstreamer.git] / ext / dash / gstdashdemux.c
1 /*
2  * DASH demux plugin for GStreamer
3  *
4  * gstdashdemux.c
5  *
6  * Copyright (C) 2012 Orange
7  * 
8  * Authors:
9  *   David Corvoysier <david.corvoysier@orange.com>
10  *   Hamid Zakari <hamid.zakari@gmail.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2.1 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library (COPYING); if not, write to the
24  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
25  * Boston, MA 02111-1307, USA.
26  */
27 /**
28  * SECTION:element-dashdemux
29  *
30  * DASH demuxer element.
31  * <title>Example launch line</title>
32  * |[
33  * gst-launch playbin2 uri="http://www-itec.uni-klu.ac.at/ftp/datasets/mmsys12/RedBullPlayStreets/redbull_4s/RedBullPlayStreets_4s_isoffmain_DIS_23009_1_v_2_1c2_2011_08_30.mpd"
34  * ]|
35  */
36
37 /* Implementation notes:
38  * 
39  * The following section describes how dashdemux works internally.
40  * 
41  * Introduction:
42  * 
43  * dashdemux is a "fake" demux, as unlike traditional demux elements, it
44  * doesn't split data streams contained in an enveloppe to expose them
45  * to downstream decoding elements.
46  * 
47  * Instead, it parses an XML file called a manifest to identify a set of
48  * individual stream fragments it needs to fetch and expose to the actual
49  * demux elements that will handle them (this behavior is sometimes 
50  * referred as the "demux after a demux" scenario).
51  * 
52  * For a given section of content, several representations corresponding
53  * to different bitrates may be available: dashdemux will select the most
54  * appropriate representation based on local conditions (typically the 
55  * available bandwidth and the amount of buffering available, capped by
56  * a maximum allowed bitrate). 
57  * 
58  * The representation selection algorithm can be configured using
59  * specific properties: max bitrate, min/max buffering, bandwidth ratio.
60  * 
61  * 
62  * General Design:
63  * 
64  * dashdemux has a single sink pad that accepts the data corresponding 
65  * to the manifest, typically fetched from an HTTP or file source.
66  * 
67  * dashdemux exposes the streams it recreates based on the fragments it
68  * fetches through dedicated src pads corresponding to the caps of the
69  * fragments container (ISOBMFF/MP4 or MPEG2TS).
70  * 
71  * During playback, new representations will typically be exposed as a
72  * new set of pads (see 'Switching between representations' below).
73  * 
74  * Fragments downloading is performed using a dedicated task that fills
75  * an internal queue. Another task is in charge of popping fragments
76  * from the queue and pushing them downstream.
77  * 
78  * Switching between representations:
79  * 
80  * Decodebin supports scenarios allowing to seamlessly switch from one 
81  * stream to another inside the same "decoding chain".
82  * 
83  * To achieve that, it combines the elements it autoplugged in chains
84  *  and groups, allowing only one decoding group to be active at a given
85  * time for a given chain.
86  *
87  * A chain can signal decodebin that it is complete by sending a 
88  * no-more-pads event, but even after that new pads can be added to
89  * create new subgroups, providing that a new no-more-pads event is sent.
90  *
91  * We take advantage of that to dynamically create a new decoding group
92  * in order to select a different representation during playback.
93  *
94  * Typically, assuming that each fragment contains both audio and video,
95  * the following tree would be created:
96  * 
97  * chain "DASH Demux"
98  * |_ group "Representation set 1"
99  * |   |_ chain "Qt Demux 0"
100  * |       |_ group "Stream 0"
101  * |           |_ chain "H264"
102  * |           |_ chain "AAC"
103  * |_ group "Representation set 2"
104  *     |_ chain "Qt Demux 1"
105  *         |_ group "Stream 1"
106  *             |_ chain "H264"
107  *             |_ chain "AAC"
108  *
109  * Or, if audio and video are contained in separate fragments:
110  *
111  * chain "DASH Demux"
112  * |_ group "Representation set 1"
113  * |   |_ chain "Qt Demux 0"
114  * |   |   |_ group "Stream 0"
115  * |   |       |_ chain "H264"
116  * |   |_ chain "Qt Demux 1"
117  * |       |_ group "Stream 1"
118  * |           |_ chain "AAC" 
119  * |_ group "Representation set 2"
120  *     |_ chain "Qt Demux 3"
121  *     |   |_ group "Stream 2"
122  *     |       |_ chain "H264"
123  *     |_ chain "Qt Demux 4"
124  *         |_ group "Stream 3"
125  *             |_ chain "AAC" 
126  *
127  * In both cases, when switching from Set 1 to Set 2 an EOS is sent on
128  * each end pad corresponding to Rep 0, triggering the "drain" state to
129  * propagate upstream.
130  * Once both EOS have been processed, the "Set 1" group is completely
131  * drained, and decodebin2 will switch to the "Set 2" group.
132  * 
133  * Note: nothing can be pushed to the new decoding group before the 
134  * old one has been drained, which means that in order to be able to 
135  * adapt quickly to bandwidth changes, we will not be able to rely
136  * on downstream buffering, and will instead manage an internal queue.
137  * 
138  */
139
140 #ifdef HAVE_CONFIG_H
141 #  include "config.h"
142 #endif
143
144 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
145  * with newer GLib versions (>= 2.31.0) */
146 #define GLIB_DISABLE_DEPRECATION_WARNINGS
147
148 #include <string.h>
149 #include <gst/base/gsttypefindhelper.h>
150 #include "gstdashdemux.h"
151
152 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
153     GST_PAD_SRC,
154     GST_PAD_SOMETIMES,
155     GST_STATIC_CAPS_ANY);
156
157 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
158     GST_PAD_SINK,
159     GST_PAD_ALWAYS,
160     GST_STATIC_CAPS ("application/dash+xml"));
161
162 GST_DEBUG_CATEGORY_STATIC (gst_dash_demux_debug);
163 #define GST_CAT_DEFAULT gst_dash_demux_debug
164
165 enum
166 {
167   PROP_0,
168
169   PROP_MIN_BUFFERING_TIME,
170   PROP_MAX_BUFFERING_TIME,
171   PROP_BANDWIDTH_USAGE,
172   PROP_MAX_BITRATE,
173   PROP_LAST
174 };
175
176 /* Default values for properties */
177 #define DEFAULT_MIN_BUFFERING_TIME      5       /* in seconds */
178 #define DEFAULT_MAX_BUFFERING_TIME      30      /* in seconds */
179 #define DEFAULT_BANDWIDTH_USAGE         0.8     /* 0 to 1     */
180 #define DEFAULT_MAX_BITRATE     24000000        /* in Mbit/s  */
181
182 #define DEFAULT_FAILED_COUNT 3
183
184
185 /* GObject */
186 static void gst_dash_demux_set_property (GObject * object, guint prop_id,
187     const GValue * value, GParamSpec * pspec);
188 static void gst_dash_demux_get_property (GObject * object, guint prop_id,
189     GValue * value, GParamSpec * pspec);
190 static void gst_dash_demux_dispose (GObject * obj);
191
192 /* GstElement */
193 static GstStateChangeReturn
194 gst_dash_demux_change_state (GstElement * element, GstStateChange transition);
195
196 /* GstDashDemux */
197 static GstFlowReturn gst_dash_demux_pad (GstPad * pad, GstBuffer * buf);
198 static gboolean gst_dash_demux_sink_event (GstPad * pad, GstEvent * event);
199 static gboolean gst_dash_demux_src_event (GstPad * pad, GstEvent * event);
200 static gboolean gst_dash_demux_src_query (GstPad * pad, GstQuery * query);
201 static void gst_dash_demux_stream_loop (GstDashDemux * demux);
202 static void gst_dash_demux_download_loop (GstDashDemux * demux);
203 static void gst_dash_demux_stop (GstDashDemux * demux);
204 static void gst_dash_demux_pause_stream_task (GstDashDemux * demux);
205 static void gst_dash_demux_resume_stream_task (GstDashDemux * demux);
206 static void gst_dash_demux_resume_download_task (GstDashDemux * demux);
207 static gboolean gst_dash_demux_select_representations (GstDashDemux * demux,
208     guint64 current_bitrate);
209 static gboolean gst_dash_demux_get_next_fragment_set (GstDashDemux * demux);
210
211 static void gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose);
212 static GstClockTime gst_dash_demux_get_buffering_time (GstDashDemux * demux);
213 static float gst_dash_demux_get_buffering_ratio (GstDashDemux * demux);
214
215 static void
216 _do_init (GType type)
217 {
218   GST_DEBUG_CATEGORY_INIT (gst_dash_demux_debug, "dashdemux", 0,
219       "dashdemux element");
220 }
221
222 GST_BOILERPLATE_FULL (GstDashDemux, gst_dash_demux, GstElement,
223     GST_TYPE_ELEMENT, _do_init);
224
225 static void
226 gst_dash_demux_base_init (gpointer g_class)
227 {
228   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
229
230   gst_element_class_add_static_pad_template (element_class, &srctemplate);
231
232   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
233
234   gst_element_class_set_details_simple (element_class,
235       "DASH Demuxer",
236       "Codec/Demuxer",
237       "Dynamic Adaptive Streaming over HTTP demuxer",
238       "David Corvoysier <david.corvoysier@orange.com>\n\
239                 Hamid Zakari <hamid.zakari@gmail.com>\n\
240                 Gianluca Gennari <gennarone@gmail.com>");
241 }
242
243 static void
244 gst_dash_demux_dispose (GObject * obj)
245 {
246   GstDashDemux *demux = GST_DASH_DEMUX (obj);
247
248   if (demux->stream_task) {
249     if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
250       GST_DEBUG_OBJECT (demux, "Leaving streaming task");
251       gst_task_stop (demux->stream_task);
252       gst_task_join (demux->stream_task);
253     }
254     gst_object_unref (demux->stream_task);
255     g_static_rec_mutex_free (&demux->stream_lock);
256     demux->stream_task = NULL;
257   }
258
259   if (demux->download_task) {
260     if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
261       GST_DEBUG_OBJECT (demux, "Leaving download task");
262       gst_task_stop (demux->download_task);
263       gst_task_join (demux->download_task);
264     }
265     gst_object_unref (demux->download_task);
266     g_static_rec_mutex_free (&demux->download_lock);
267     demux->download_task = NULL;
268   }
269
270   if (demux->downloader != NULL) {
271     g_object_unref (demux->downloader);
272     demux->downloader = NULL;
273   }
274
275   gst_dash_demux_reset (demux, TRUE);
276
277   g_queue_free (demux->queue);
278
279   G_OBJECT_CLASS (parent_class)->dispose (obj);
280 }
281
282 static void
283 gst_dash_demux_class_init (GstDashDemuxClass * klass)
284 {
285   GObjectClass *gobject_class;
286   GstElementClass *gstelement_class;
287
288   gobject_class = (GObjectClass *) klass;
289   gstelement_class = (GstElementClass *) klass;
290
291   gobject_class->set_property = gst_dash_demux_set_property;
292   gobject_class->get_property = gst_dash_demux_get_property;
293   gobject_class->dispose = gst_dash_demux_dispose;
294
295   g_object_class_install_property (gobject_class, PROP_MIN_BUFFERING_TIME,
296       g_param_spec_uint ("min-buffering-time", "Minimum buffering time",
297           "Minimum number of seconds of buffer accumulated before playback",
298           1, G_MAXUINT, DEFAULT_MIN_BUFFERING_TIME,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300
301   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
302       g_param_spec_uint ("max-buffering-time", "Maximum buffering time",
303           "Maximum number of seconds of buffer accumulated during playback",
304           2, G_MAXUINT, DEFAULT_MAX_BUFFERING_TIME,
305           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
306
307   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_USAGE,
308       g_param_spec_float ("bandwidth-usage",
309           "Bandwidth usage [0..1]",
310           "Percentage of the available bandwidth to use when selecting representations",
311           0, 1, DEFAULT_BANDWIDTH_USAGE,
312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313
314   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
315       g_param_spec_uint ("max-bitrate", "Max bitrate",
316           "Max of bitrate supported by target decoder",
317           1000, G_MAXUINT, DEFAULT_MAX_BITRATE,
318           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
319
320   gstelement_class->change_state =
321       GST_DEBUG_FUNCPTR (gst_dash_demux_change_state);
322 }
323
324 static void
325 gst_dash_demux_init (GstDashDemux * demux, GstDashDemuxClass * klass)
326 {
327   /* sink pad */
328   demux->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
329   gst_pad_set_chain_function (demux->sinkpad,
330       GST_DEBUG_FUNCPTR (gst_dash_demux_pad));
331   gst_pad_set_event_function (demux->sinkpad,
332       GST_DEBUG_FUNCPTR (gst_dash_demux_sink_event));
333   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
334
335   /* Downloader */
336   demux->downloader = gst_uri_downloader_new ();
337
338   /* Properties */
339   demux->min_buffering_time = DEFAULT_MIN_BUFFERING_TIME * GST_SECOND;
340   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME * GST_SECOND;
341   demux->bandwidth_usage = DEFAULT_BANDWIDTH_USAGE;
342   demux->max_bitrate = DEFAULT_MAX_BITRATE;
343
344   demux->queue = g_queue_new ();
345   /* Updates task */
346   g_static_rec_mutex_init (&demux->download_lock);
347   demux->download_task =
348       gst_task_create ((GstTaskFunction) gst_dash_demux_download_loop, demux);
349   gst_task_set_lock (demux->download_task, &demux->download_lock);
350   demux->download_timed_lock = g_mutex_new ();
351
352   /* Streaming task */
353   g_static_rec_mutex_init (&demux->stream_lock);
354   demux->stream_task =
355       gst_task_create ((GstTaskFunction) gst_dash_demux_stream_loop, demux);
356   gst_task_set_lock (demux->stream_task, &demux->stream_lock);
357   demux->stream_timed_lock = g_mutex_new ();
358 }
359
360 static void
361 gst_dash_demux_set_property (GObject * object, guint prop_id,
362     const GValue * value, GParamSpec * pspec)
363 {
364   GstDashDemux *demux = GST_DASH_DEMUX (object);
365
366   switch (prop_id) {
367     case PROP_MIN_BUFFERING_TIME:
368       demux->min_buffering_time = g_value_get_uint (value) * GST_SECOND;
369       break;
370     case PROP_MAX_BUFFERING_TIME:
371       demux->max_buffering_time = g_value_get_uint (value) * GST_SECOND;
372       break;
373     case PROP_BANDWIDTH_USAGE:
374       demux->bandwidth_usage = g_value_get_float (value);
375       break;
376     case PROP_MAX_BITRATE:
377       demux->max_bitrate = g_value_get_uint (value);
378       break;
379     default:
380       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
381       break;
382   }
383 }
384
385 static void
386 gst_dash_demux_get_property (GObject * object, guint prop_id, GValue * value,
387     GParamSpec * pspec)
388 {
389   GstDashDemux *demux = GST_DASH_DEMUX (object);
390
391   switch (prop_id) {
392     case PROP_MIN_BUFFERING_TIME:
393       g_value_set_uint (value, demux->min_buffering_time);
394       demux->min_buffering_time *= GST_SECOND;
395       break;
396     case PROP_MAX_BUFFERING_TIME:
397       g_value_set_uint (value, demux->max_buffering_time);
398       demux->max_buffering_time *= GST_SECOND;
399       break;
400     case PROP_BANDWIDTH_USAGE:
401       g_value_set_float (value, demux->bandwidth_usage);
402       break;
403     case PROP_MAX_BITRATE:
404       g_value_set_uint (value, demux->max_bitrate);
405       break;
406     default:
407       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
408       break;
409   }
410 }
411
412 static GstStateChangeReturn
413 gst_dash_demux_change_state (GstElement * element, GstStateChange transition)
414 {
415   GstStateChangeReturn ret;
416   GstDashDemux *demux = GST_DASH_DEMUX (element);
417
418   switch (transition) {
419     case GST_STATE_CHANGE_READY_TO_PAUSED:
420       gst_dash_demux_reset (demux, FALSE);
421       break;
422     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
423       /* Start the streaming loop in paused only if we already received
424          the manifest. It might have been stopped if we were in PAUSED
425          state and we filled our queue with enough cached fragments
426        */
427       if (gst_mpdparser_get_baseURL (demux->client) != NULL)
428         gst_dash_demux_resume_stream_task (demux);
429       break;
430     default:
431       break;
432   }
433
434   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
435
436   switch (transition) {
437     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
438       gst_dash_demux_pause_stream_task (demux);
439       break;
440     case GST_STATE_CHANGE_PAUSED_TO_READY:
441       demux->cancelled = TRUE;
442       gst_dash_demux_stop (demux);
443       gst_task_join (demux->stream_task);
444       gst_task_join (demux->download_task);
445       break;
446     default:
447       break;
448   }
449   return ret;
450 }
451
452 void
453 gst_dash_demux_clear_queue (GstDashDemux * demux)
454 {
455   while (!g_queue_is_empty (demux->queue)) {
456     GList *listfragment = g_queue_pop_head (demux->queue);
457     guint j = 0;
458     while (j < g_list_length (listfragment)) {
459       GstFragment *fragment = g_list_nth_data (listfragment, j);
460       g_object_unref (fragment);
461       j++;
462     }
463     g_list_free (listfragment);
464   }
465   g_queue_clear (demux->queue);
466 }
467
468 static gboolean
469 gst_dash_demux_src_event (GstPad * pad, GstEvent * event)
470 {
471   GstDashDemux *demux;
472
473   demux = GST_DASH_DEMUX (gst_pad_get_element_private (pad));
474
475   switch (event->type) {
476     case GST_EVENT_SEEK:
477     {
478       gdouble rate;
479       GstFormat format;
480       GstSeekFlags flags;
481       GstSeekType start_type, stop_type;
482       gint64 start, stop;
483       GList *walk;
484       GstClockTime current_pos, target_pos;
485       gint current_sequence;
486       GstActiveStream *stream;
487       GstMediaSegment *chunk;
488       guint nb_active_stream;
489       guint stream_idx;
490
491       if (gst_mpd_client_is_live (demux->client)) {
492         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
493         return FALSE;
494       }
495
496       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
497           &stop_type, &stop);
498
499       if (format != GST_FORMAT_TIME)
500         return FALSE;
501
502       GST_DEBUG_OBJECT (demux,
503           "seek event, rate: %f type: %d start: %" GST_TIME_FORMAT " stop: %"
504           GST_TIME_FORMAT, rate, start_type, GST_TIME_ARGS (start),
505           GST_TIME_ARGS (stop));
506
507       GST_MPD_CLIENT_LOCK (demux->client);
508       stream = gst_mpdparser_get_active_stream_by_index (demux->client, 0);
509
510       current_pos = 0;
511       target_pos = (GstClockTime) start;
512       for (walk = stream->segments; walk; walk = walk->next) {
513         chunk = walk->data;
514         current_sequence = chunk->number;
515         if (current_pos <= target_pos
516             && target_pos < current_pos + chunk->duration) {
517           break;
518         }
519         current_pos += chunk->duration;
520       }
521       GST_MPD_CLIENT_UNLOCK (demux->client);
522
523       if (walk == NULL) {
524         GST_WARNING_OBJECT (demux, "Could not find seeked fragment");
525         return FALSE;
526       }
527
528       /* We can actually perform the seek */
529       nb_active_stream = gst_mpdparser_get_nb_active_stream (demux->client);
530
531       if (flags & GST_SEEK_FLAG_FLUSH) {
532         GST_DEBUG_OBJECT (demux, "sending flush start");
533         stream_idx = 0;
534         while (stream_idx < nb_active_stream) {
535           gst_pad_push_event (demux->srcpad[stream_idx],
536               gst_event_new_flush_start ());
537           stream_idx++;
538         }
539       }
540
541       /* Stop the demux */
542       demux->cancelled = TRUE;
543       gst_dash_demux_stop (demux);
544
545       /* Wait for streaming to finish */
546       g_static_rec_mutex_lock (&demux->stream_lock);
547
548       /* Clear the buffering queue */
549       /* FIXME: allow seeking in the buffering queue */
550       gst_dash_demux_clear_queue (demux);
551
552       GST_MPD_CLIENT_LOCK (demux->client);
553       GST_DEBUG_OBJECT (demux, "Seeking to sequence %d", current_sequence);
554       stream_idx = 0;
555       /* Update the current sequence on all streams */
556       while (stream_idx < nb_active_stream) {
557         stream =
558             gst_mpdparser_get_active_stream_by_index (demux->client,
559             stream_idx);
560         /* FIXME: we should'nt fiddle with stream internals like that */
561         stream->segment_idx = current_sequence;
562         stream_idx++;
563       }
564       /* Calculate offset in the next fragment */
565       gst_mpd_client_get_current_position (demux->client, &demux->position);
566       demux->position_shift = start - demux->position;
567       demux->need_segment = TRUE;
568       GST_MPD_CLIENT_UNLOCK (demux->client);
569
570
571       if (flags & GST_SEEK_FLAG_FLUSH) {
572         GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
573         stream_idx = 0;
574         while (stream_idx < nb_active_stream) {
575           gst_pad_push_event (demux->srcpad[stream_idx],
576               gst_event_new_flush_stop ());
577           stream_idx++;
578         }
579       }
580
581       /* Restart the demux */
582       demux->cancelled = FALSE;
583       gst_dash_demux_resume_download_task (demux);
584       gst_dash_demux_resume_stream_task (demux);
585       g_static_rec_mutex_unlock (&demux->stream_lock);
586
587       return TRUE;
588     }
589     default:
590       break;
591   }
592
593   return gst_pad_event_default (pad, event);
594 }
595
596 static gboolean
597 gst_dash_demux_sink_event (GstPad * pad, GstEvent * event)
598 {
599   GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
600
601   switch (event->type) {
602     case GST_EVENT_EOS:{
603       gchar *manifest;
604       GstQuery *query;
605       gboolean res;
606
607       if (demux->manifest == NULL) {
608         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
609         break;
610       }
611
612       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
613
614       if (demux->client)
615         gst_mpd_client_free (demux->client);
616       demux->client = gst_mpd_client_new ();
617
618       query = gst_query_new_uri ();
619       res = gst_pad_peer_query (pad, query);
620       if (res) {
621         gst_query_parse_uri (query, &demux->client->mpd_uri);
622         GST_DEBUG_OBJECT (demux, "Fetched MPD file at URI: %s",
623             demux->client->mpd_uri);
624       } else {
625         GST_WARNING_OBJECT (demux, "MPD URI query failed.");
626       }
627       gst_query_unref (query);
628
629       manifest = (gchar *) GST_BUFFER_DATA (demux->manifest);
630       if (manifest == NULL) {
631         GST_WARNING_OBJECT (demux, "Error validating the manifest.");
632       } else if (!gst_mpd_parse (demux->client, manifest,
633               GST_BUFFER_SIZE (demux->manifest))) {
634         /* In most cases, this will happen if we set a wrong url in the
635          * source element and we have received the 404 HTML response instead of
636          * the manifest */
637         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
638             (NULL));
639         return FALSE;
640       }
641       gst_buffer_unref (demux->manifest);
642       demux->manifest = NULL;
643
644       if (!gst_mpd_client_setup_streaming (demux->client, GST_STREAM_VIDEO, "")) {
645         GST_ELEMENT_ERROR (demux, STREAM, DECODE,
646             ("Incompatible manifest file."), (NULL));
647         return FALSE;
648       }
649
650       GList *listLang = NULL;
651       guint nb_audio =
652           gst_mpdparser_get_list_and_nb_of_audio_language (&listLang,
653           demux->client->cur_period->AdaptationSets);
654       if (nb_audio == 0)
655         nb_audio = 1;
656       GST_INFO_OBJECT (demux, "Number of language is=%d", nb_audio);
657       guint i = 0;
658       for (i = 0; i < nb_audio; i++) {
659         gchar *lang = (gchar *) g_list_nth_data (listLang, i);
660         if (gst_mpdparser_get_nb_adaptationSet (demux->client) > 1)
661           if (!gst_mpd_client_setup_streaming (demux->client, GST_STREAM_AUDIO,
662                   lang))
663             GST_INFO_OBJECT (demux, "No audio adaptation set found");
664
665         if (gst_mpdparser_get_nb_adaptationSet (demux->client) > nb_audio)
666           if (!gst_mpd_client_setup_streaming (demux->client,
667                   GST_STREAM_APPLICATION, lang)) {
668             GST_INFO_OBJECT (demux, "No application adaptation set found");
669           }
670       }
671
672       /* Send duration message */
673       if (!gst_mpd_client_is_live (demux->client)) {
674         GstClockTime duration = gst_mpd_client_get_duration (demux->client);
675
676         if (duration != GST_CLOCK_TIME_NONE) {
677           GST_DEBUG_OBJECT (demux, "Sending duration message : %" GST_TIME_FORMAT,
678               GST_TIME_ARGS (duration));
679           gst_element_post_message (GST_ELEMENT (demux),
680               gst_message_new_duration (GST_OBJECT (demux),
681                   GST_FORMAT_TIME, duration));
682         } else {
683           GST_DEBUG_OBJECT (demux, "mediaPresentationDuration unknown, can not send the duration message");
684         }
685       }
686       gst_dash_demux_resume_download_task (demux);
687       gst_dash_demux_resume_stream_task (demux);
688       gst_event_unref (event);
689       return TRUE;
690     }
691     case GST_EVENT_NEWSEGMENT:
692       /* Swallow newsegments, we'll push our own */
693       gst_event_unref (event);
694       return TRUE;
695     default:
696       break;
697   }
698
699   return gst_pad_event_default (pad, event);
700 }
701
702 static gboolean
703 gst_dash_demux_src_query (GstPad * pad, GstQuery * query)
704 {
705   GstDashDemux *dashdemux;
706   gboolean ret = FALSE;
707
708   if (query == NULL)
709     return FALSE;
710
711   dashdemux = GST_DASH_DEMUX (gst_pad_get_element_private (pad));
712
713   switch (query->type) {
714     case GST_QUERY_DURATION:{
715       GstClockTime duration = -1;
716       GstFormat fmt;
717
718       gst_query_parse_duration (query, &fmt, NULL);
719       if (fmt == GST_FORMAT_TIME) {
720         duration = gst_mpd_client_get_duration (dashdemux->client);
721         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
722           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
723           ret = TRUE;
724         }
725       }
726       GST_DEBUG_OBJECT (dashdemux,
727           "GST_QUERY_DURATION returns %s with duration %" GST_TIME_FORMAT,
728           ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
729       break;
730     }
731     case GST_QUERY_SEEKING:{
732       GstFormat fmt;
733       gint64 stop = -1;
734
735       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
736       GST_DEBUG_OBJECT (dashdemux, "Received GST_QUERY_SEEKING with format %d",
737           fmt);
738       if (fmt == GST_FORMAT_TIME) {
739         GstClockTime duration;
740
741         duration = gst_mpd_client_get_duration (dashdemux->client);
742         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
743           stop = duration;
744
745         gst_query_set_seeking (query, fmt,
746             !gst_mpd_client_is_live (dashdemux->client), 0, stop);
747         ret = TRUE;
748         GST_DEBUG_OBJECT (dashdemux, "GST_QUERY_SEEKING returning with stop : %"
749             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
750       }
751       break;
752     }
753     default:{
754       GstPad *peer;
755
756       if ((peer = gst_pad_get_peer (dashdemux->sinkpad))) {
757         /* Try to query upstream */
758         ret = gst_pad_query (peer, query);
759         gst_object_unref (peer);
760       } else {
761         /* no peer, we don't know */
762         ret = FALSE;
763       }
764       break;
765     }
766   }
767
768   return ret;
769 }
770
771 static GstFlowReturn
772 gst_dash_demux_pad (GstPad * pad, GstBuffer * buf)
773 {
774   GstDashDemux *demux = GST_DASH_DEMUX (gst_pad_get_parent (pad));
775
776   if (demux->manifest == NULL)
777     demux->manifest = buf;
778   else
779     demux->manifest = gst_buffer_join (demux->manifest, buf);
780
781   gst_object_unref (demux);
782
783   return GST_FLOW_OK;
784 }
785
786 static void
787 gst_dash_demux_stop (GstDashDemux * demux)
788 {
789   gst_uri_downloader_cancel (demux->downloader);
790
791   if (GST_TASK_STATE (demux->download_task) != GST_TASK_STOPPED) {
792     GST_TASK_SIGNAL (demux->download_task);
793     gst_task_stop (demux->download_task);
794   }
795   if (GST_TASK_STATE (demux->stream_task) != GST_TASK_STOPPED) {
796     GST_TASK_SIGNAL (demux->stream_task);
797     gst_task_stop (demux->stream_task);
798   }
799 }
800
801 /* switch_pads:
802  * 
803  * Called when switching from one set of representations to another, but
804  * only if one of the new representations requires different downstream 
805  * elements (see the next function).
806  * 
807  * This function first creates the new pads, then sends a no-more-pads
808  * event (that will tell decodebin to create a new group), then sends
809  * EOS on the old pads to trigger the group switch.
810  * 
811  */
812 static void
813 switch_pads (GstDashDemux * demux, guint nb_adaptation_set)
814 {
815   GstPad *oldpad[MAX_LANGUAGES];
816   guint i = 0;
817   /* Remember old pads */
818   while (i < nb_adaptation_set) {
819     oldpad[i] = demux->srcpad[i];
820     if (oldpad[i]) {
821       GST_DEBUG_OBJECT (demux,
822           "Switching pads (oldpad:%p)" GST_PTR_FORMAT, oldpad[i]);
823     }
824     i++;
825   }
826   /* Create and activate new pads */
827   i = 0;
828   while (i < nb_adaptation_set) {
829     demux->srcpad[i] = gst_pad_new_from_static_template (&srctemplate, NULL);
830     gst_pad_set_event_function (demux->srcpad[i],
831         GST_DEBUG_FUNCPTR (gst_dash_demux_src_event));
832     gst_pad_set_query_function (demux->srcpad[i],
833         GST_DEBUG_FUNCPTR (gst_dash_demux_src_query));
834     gst_pad_set_element_private (demux->srcpad[i], demux);
835     gst_pad_set_active (demux->srcpad[i], TRUE);
836     gst_pad_set_caps (demux->srcpad[i], demux->output_caps[i]);
837     gst_element_add_pad (GST_ELEMENT (demux), demux->srcpad[i]);
838     i++;
839   }
840   /* Send 'no-more-pads' to have decodebin create the new group */
841   gst_element_no_more_pads (GST_ELEMENT (demux));
842   /* Push out EOS on all old pads to switch to the new group */
843   i = 0;
844   while (i < nb_adaptation_set) {
845     if (oldpad[i]) {
846       gst_pad_push_event (oldpad[i], gst_event_new_eos ());
847       gst_pad_set_active (oldpad[i], FALSE);
848       gst_element_remove_pad (GST_ELEMENT (demux), oldpad[i]);
849     }
850     i++;
851   }
852 }
853
854 /* needs_pad_switch:
855  * 
856  * Figure out if the newly selected representations require a new set
857  * of demuxers and decoders or if we can carry on with the existing ones.
858  * 
859  * Basically, we look at the list of fragments we need to push downstream, 
860  * and compare their caps with those of the corresponding src pads.
861  * 
862  * As soon as one fragment requires a new set of caps, we need to switch
863  * all decoding pads to recreate a whole decoding group as we cannot 
864  * move pads between groups (FIXME: or can we ?).
865  * 
866  * FIXME: redundant with need_add_header
867  * 
868  */
869 static gboolean
870 needs_pad_switch (GstDashDemux * demux, GList * fragment)
871 {
872
873   gboolean switch_pad = FALSE;
874   guint i = 0;
875   while (i < g_list_length (fragment)) {
876     GstFragment *newFragment = g_list_nth_data (fragment, i);
877     if (newFragment == NULL) {
878       continue;
879     }
880     GstCaps *srccaps = NULL;
881     demux->output_caps[i] = gst_fragment_get_caps (newFragment);
882     if (G_LIKELY (demux->srcpad[i]))
883       srccaps = gst_pad_get_negotiated_caps (demux->srcpad[i]);
884     if (G_UNLIKELY (!srccaps
885             || (!gst_caps_is_equal_fixed (demux->output_caps[i], srccaps))
886             || demux->need_segment)) {
887       switch_pad = TRUE;
888     }
889     if (G_LIKELY (srccaps))
890       gst_caps_unref (srccaps);
891     i++;
892   }
893   return switch_pad;
894 }
895
896 /* gst_dash_demux_stream_loop:
897  * 
898  * Loop for the "stream' task that pushes fragments to the src pads.
899  * 
900  * Startup: 
901  * The task is started as soon as we have received the manifest and
902  * waits for the first fragment to be downloaded and pushed in the
903  * queue. Once this fragment has been pushed, the task pauses itself
904  * until actual playback begins.
905  * 
906  * During playback:  
907  * The task pushes fragments downstream at regular intervals based on
908  * the fragment duration. If it detects a queue underrun, it sends
909  * a buffering event to tell the main application to pause.
910  * 
911  * Teardown:
912  * The task is stopped when we have reached the end of the manifest
913  * and emptied our queue.
914  * 
915  */
916 static void
917 gst_dash_demux_stream_loop (GstDashDemux * demux)
918 {
919   GList *listfragment;
920   GstFlowReturn ret;
921   GstBufferList *buffer_list;
922   guint nb_adaptation_set = 0;
923   GstActiveStream *stream;
924
925   /* Wait until the next scheduled push downstream */
926   if (g_cond_timed_wait (GST_TASK_GET_COND (demux->stream_task),
927           demux->stream_timed_lock, &demux->next_push)) {
928     goto quit;
929   }
930
931   if (g_queue_is_empty (demux->queue)) {
932     if (demux->end_of_manifest)
933       goto end_of_manifest;
934
935     return;
936   }
937
938   if (GST_STATE (demux) == GST_STATE_PLAYING) {
939     if (!demux->end_of_manifest
940         && gst_dash_demux_get_buffering_time (demux) <
941         demux->min_buffering_time) {
942       /* Warn we are below our threshold: this will eventually pause 
943        * the pipeline */
944       gst_element_post_message (GST_ELEMENT (demux),
945           gst_message_new_buffering (GST_OBJECT (demux),
946               100 * gst_dash_demux_get_buffering_ratio (demux)));
947     }
948   }
949   listfragment = g_queue_pop_head (demux->queue);
950   nb_adaptation_set = g_list_length (listfragment);
951   /* Figure out if we need to create/switch pads */
952   gboolean switch_pad = needs_pad_switch (demux, listfragment);
953   if (switch_pad) {
954     switch_pads (demux, nb_adaptation_set);
955     demux->need_segment = TRUE;
956   }
957   guint i = 0;
958   for (i = 0; i < nb_adaptation_set; i++) {
959     GstFragment *fragment = g_list_nth_data (listfragment, i);
960     stream = gst_mpdparser_get_active_stream_by_index (demux->client, i);
961     if (demux->need_segment) {
962       GstClockTime start = fragment->start_time + demux->position_shift;
963       /* And send a newsegment */
964       GST_DEBUG_OBJECT (demux, "Sending new-segment. segment start:%"
965           GST_TIME_FORMAT, GST_TIME_ARGS (start));
966       gst_pad_push_event (demux->srcpad[i],
967           gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
968               start, GST_CLOCK_TIME_NONE, start));
969       demux->need_segment = FALSE;
970       demux->position_shift = 0;
971     }
972
973     GST_DEBUG_OBJECT (demux, "Pushing fragment #%d", fragment->index);
974     buffer_list = gst_fragment_get_buffer_list (fragment);
975     g_object_unref (fragment);
976     ret = gst_pad_push_list (demux->srcpad[i], buffer_list);
977     if ((ret != GST_FLOW_OK) && (stream->mimeType == GST_STREAM_VIDEO))
978       goto error_pushing;
979   }
980   g_list_free (listfragment);
981   if (GST_STATE (demux) == GST_STATE_PLAYING) {
982     /* Wait for the duration of a fragment before resuming this task */
983     g_get_current_time (&demux->next_push);
984     g_time_val_add (&demux->next_push,
985         gst_mpd_client_get_target_duration (demux->client)
986         / GST_SECOND * G_USEC_PER_SEC);
987     GST_DEBUG_OBJECT (demux, "Next push scheduled at %s",
988         g_time_val_to_iso8601 (&demux->next_push));
989   } else {
990     /* The pipeline is now set up, wait until playback begins */
991     goto pause_streaming;
992   }
993
994 quit:
995   {
996     return;
997   }
998
999 end_of_manifest:
1000   {
1001     GST_INFO_OBJECT (demux, "Reached end of manifest, sending EOS");
1002     guint i = 0;
1003     for (i = 0; i < gst_mpdparser_get_nb_active_stream (demux->client); i++) {
1004       gst_pad_push_event (demux->srcpad[i], gst_event_new_eos ());
1005     }
1006     GST_INFO_OBJECT (demux, "Stopped streaming task");
1007     gst_task_stop (demux->stream_task);
1008     return;
1009   }
1010
1011 error_pushing:
1012   {
1013     /* FIXME: handle error */
1014     GST_ERROR_OBJECT (demux,
1015         "Error pushing buffer: %s... terminating the demux",
1016         gst_flow_get_name (ret));
1017     gst_dash_demux_stop (demux);
1018     return;
1019   }
1020
1021 pause_streaming:
1022   {
1023     GST_INFO_OBJECT (demux, "Pausing streaming task");
1024     gst_task_pause (demux->stream_task);
1025     return;
1026   }
1027 }
1028
1029 static void
1030 gst_dash_demux_reset (GstDashDemux * demux, gboolean dispose)
1031 {
1032   demux->end_of_manifest = FALSE;
1033   demux->cancelled = FALSE;
1034
1035   guint i = 0;
1036   for (i = 0; i < MAX_LANGUAGES; i++)
1037     if (demux->input_caps[i]) {
1038       gst_caps_unref (demux->input_caps[i]);
1039       demux->input_caps[i] = NULL;
1040     }
1041
1042   if (demux->manifest) {
1043     gst_buffer_unref (demux->manifest);
1044     demux->manifest = NULL;
1045   }
1046   if (demux->client) {
1047     gst_mpd_client_free (demux->client);
1048     demux->client = NULL;
1049   }
1050   if (!dispose) {
1051     demux->client = gst_mpd_client_new ();
1052   }
1053
1054   gst_dash_demux_clear_queue (demux);
1055
1056   demux->position = 0;
1057   demux->position_shift = 0;
1058   demux->need_segment = TRUE;
1059 }
1060
1061 static GstClockTime
1062 gst_dash_demux_get_buffering_time (GstDashDemux * demux)
1063 {
1064   return (g_queue_get_length (demux->queue)) *
1065       gst_mpd_client_get_target_duration (demux->client);
1066 }
1067
1068 static float
1069 gst_dash_demux_get_buffering_ratio (GstDashDemux * demux)
1070 {
1071   float buffering_time = gst_dash_demux_get_buffering_time (demux);
1072   if (buffering_time >= demux->min_buffering_time) {
1073     return 1.0;
1074   } else
1075     return buffering_time / demux->min_buffering_time;
1076 }
1077
1078 /* gst_dash_demux_download_loop:
1079  * 
1080  * Loop for the "download' task that fetches fragments based on the 
1081  * selected representations.
1082  * 
1083  * Startup: 
1084  * 
1085  * The task is started from the stream loop.
1086  * 
1087  * During playback:  
1088  * 
1089  * It sequentially fetches fragments corresponding to the current 
1090  * representations and pushes them into a queue.
1091  * 
1092  * It tries to maintain the number of queued items within a predefined 
1093  * range: if the queue is full, it will pause, checking every 100 ms if 
1094  * it needs to restart downloading fragments.
1095  * 
1096  * When a new set of fragments has been downloaded, it evaluates the
1097  * download time to check if we can or should switch to a different 
1098  * set of representations.
1099  *
1100  * Teardown:
1101  * 
1102  * The task will exit when it encounters an error or when the end of the
1103  * manifest has been reached.
1104  * 
1105  */
1106 void
1107 gst_dash_demux_download_loop (GstDashDemux * demux)
1108 {
1109   /* Wait until the next scheduled download */
1110   if (g_cond_timed_wait (GST_TASK_GET_COND (demux->download_task),
1111           demux->download_timed_lock, &demux->next_download)) {
1112     goto quit;
1113   }
1114
1115   /* Target buffering time MUST at least exceeds mimimum buffering time 
1116    * by the duration of a fragment, but SHOULD NOT exceed maximum
1117    * buffering time */
1118   GstClockTime target_buffering_time =
1119       demux->min_buffering_time +
1120       gst_mpd_client_get_target_duration (demux->client);
1121   if (demux->max_buffering_time > target_buffering_time)
1122     target_buffering_time = demux->max_buffering_time;
1123   if (!demux->end_of_manifest
1124       && gst_dash_demux_get_buffering_time (demux) < target_buffering_time) {
1125     if (GST_STATE (demux) != GST_STATE_PLAYING) {
1126       /* Signal our buffering status (this will eventually restart the
1127        * pipeline when we have reached 100 %) */
1128       gst_element_post_message (GST_ELEMENT (demux),
1129           gst_message_new_buffering (GST_OBJECT (demux),
1130               100 * gst_dash_demux_get_buffering_ratio (demux)));
1131     }
1132
1133     /* try to switch to another set of representations if needed */
1134     gst_dash_demux_select_representations (demux,
1135         demux->bandwidth_usage * demux->dnl_rate *
1136         gst_dash_demux_get_buffering_ratio (demux));
1137
1138     /* fetch the next fragment */
1139     if (!gst_dash_demux_get_next_fragment_set (demux)) {
1140       if (demux->end_of_manifest) {
1141         GST_INFO_OBJECT (demux, "Reached the end of the manifest file");
1142         goto end_of_manifest;
1143       } else if (!demux->cancelled) {
1144         demux->client->update_failed_count++;
1145         if (demux->client->update_failed_count < DEFAULT_FAILED_COUNT) {
1146           GST_WARNING_OBJECT (demux, "Could not fetch the next fragment");
1147           return;
1148         } else
1149           goto error_downloading;
1150       }
1151     } else {
1152       GST_INFO_OBJECT (demux, "Internal buffering : %d s",
1153           gst_dash_demux_get_buffering_time (demux) / GST_SECOND);
1154       demux->client->update_failed_count = 0;
1155     }
1156   } else {
1157     /* schedule the next download in 100 ms */
1158     g_get_current_time (&demux->next_download);
1159     g_time_val_add (&demux->next_download, 100000);
1160   }
1161
1162 quit:
1163   {
1164     return;
1165   }
1166
1167 end_of_manifest:
1168   {
1169     GST_INFO_OBJECT (demux, "Stopped download task");
1170     gst_task_stop (demux->download_task);
1171     return;
1172   }
1173
1174 error_downloading:
1175   {
1176     GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
1177         ("Could not fetch the next fragment"), (NULL));
1178     gst_dash_demux_stop (demux);
1179     return;
1180   }
1181 }
1182
1183 static void
1184 gst_dash_demux_pause_stream_task (GstDashDemux * demux)
1185 {
1186   /* Send a signal to the stream task so that it pauses itself */
1187   GST_TASK_SIGNAL (demux->stream_task);
1188   /* Pause it explicitly (if it was not in the COND) */
1189   gst_task_pause (demux->stream_task);
1190 }
1191
1192 static void
1193 gst_dash_demux_resume_stream_task (GstDashDemux * demux)
1194 {
1195   g_get_current_time (&demux->next_push);
1196   gst_task_start (demux->stream_task);
1197 }
1198
1199 static void
1200 gst_dash_demux_resume_download_task (GstDashDemux * demux)
1201 {
1202   g_get_current_time (&demux->next_download);
1203   gst_task_start (demux->download_task);
1204 }
1205
1206 /* gst_dash_demux_select_representations:
1207  *
1208  * Select the most appropriate media representations based on a target 
1209  * bitrate.
1210  * 
1211  * FIXME: all representations are selected against the same bitrate, but
1212  * they will share the same bandwidth. This only works today because the
1213  * audio representations bitrate usage is negligible as compared to the
1214  * video representation one.
1215  * 
1216  * Returns TRUE if a new set of representations has been selected
1217  */
1218 static gboolean
1219 gst_dash_demux_select_representations (GstDashDemux * demux, guint64 bitrate)
1220 {
1221   GstActiveStream *stream = NULL;
1222   GList *rep_list = NULL;
1223   gint new_index;
1224   gboolean ret = FALSE;
1225
1226   guint i = 0;
1227   while (i < gst_mpdparser_get_nb_active_stream (demux->client)) {
1228     if (demux->client->active_streams)
1229       stream = g_list_nth_data (demux->client->active_streams, i);
1230     if (!stream)
1231       return FALSE;
1232
1233     /* retrieve representation list */
1234     if (stream->cur_adapt_set)
1235       rep_list = stream->cur_adapt_set->Representations;
1236     if (!rep_list)
1237       return FALSE;
1238
1239     /* get representation index with current max_bandwidth */
1240     new_index =
1241         gst_mpdparser_get_rep_idx_with_max_bandwidth (rep_list, bitrate);
1242
1243     /* if no representation has the required bandwidth, take the lowest one */
1244     if (new_index == -1)
1245       new_index = 0;
1246
1247     if (new_index != stream->representation_idx) {
1248       GST_MPD_CLIENT_LOCK (demux->client);
1249       ret =
1250           gst_mpd_client_setup_representation (demux->client, stream,
1251           g_list_nth_data (rep_list, new_index));
1252       GST_MPD_CLIENT_UNLOCK (demux->client);
1253       if (ret) {
1254         GST_INFO_OBJECT (demux, "Switching bitrate to %d",
1255             stream->cur_representation->bandwidth);
1256       } else {
1257         GST_WARNING_OBJECT (demux,
1258             "Can not switch representation, aborting...");
1259       }
1260     }
1261     i++;
1262   }
1263   return ret;
1264 }
1265
1266 static GstFragment *
1267 gst_dash_demux_get_next_header (GstDashDemux * demux, guint stream_idx)
1268 {
1269   const gchar *initializationURL;
1270   gchar *next_header_uri;
1271   GstFragment *fragment;
1272
1273   if (!gst_mpd_client_get_next_header (demux->client, &initializationURL,
1274           stream_idx))
1275     return NULL;
1276
1277   if (strncmp (initializationURL, "http://", 7) != 0) {
1278     next_header_uri =
1279         g_strconcat (gst_mpdparser_get_baseURL (demux->client),
1280         initializationURL, NULL);
1281   } else {
1282     next_header_uri = g_strdup (initializationURL);
1283   }
1284
1285   GST_INFO_OBJECT (demux, "Fetching header %s", next_header_uri);
1286
1287   fragment = gst_uri_downloader_fetch_uri (demux->downloader, next_header_uri);
1288   g_free (next_header_uri);
1289
1290   return fragment;
1291 }
1292
1293 static GstBufferListItem
1294 gst_dash_demux_add_buffer_cb (GstBuffer ** buffer,
1295     guint group, guint idx, gpointer user_data)
1296 {
1297   GstFragment *frag = GST_FRAGMENT (user_data);
1298   /* This buffer still belongs to the original fragment */
1299   /* so we need to increase refcount */
1300   gst_fragment_add_buffer (frag, gst_buffer_ref (*buffer));
1301   return GST_BUFFER_LIST_CONTINUE;
1302 }
1303
1304 /* Since we cannot add headers after the chunk has been downloaded, we have to recreate a new fragment */
1305 static GstFragment *
1306 gst_dash_demux_prepend_header (GstDashDemux * demux,
1307     GstFragment * frag, GstFragment * header)
1308 {
1309   GstFragment *res = gst_fragment_new ();
1310   res->name = g_strdup (frag->name);
1311   res->download_start_time = frag->download_start_time;
1312   res->download_stop_time = frag->download_stop_time;
1313   res->start_time = frag->start_time;
1314   res->stop_time = frag->stop_time;
1315   res->index = frag->index;
1316   res->discontinuous = frag->discontinuous;
1317
1318   GstBufferList *list;
1319   list = gst_fragment_get_buffer_list (header);
1320   gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
1321   gst_buffer_list_unref (list);
1322   list = gst_fragment_get_buffer_list (frag);
1323   gst_buffer_list_foreach (list, gst_dash_demux_add_buffer_cb, res);
1324   gst_buffer_list_unref (list);
1325
1326   res->completed = TRUE;
1327
1328   return res;
1329 }
1330
1331 const gchar *
1332 gst_mpd_mimetype_to_caps (const gchar * mimeType)
1333 {
1334   if (mimeType == NULL)
1335     return NULL;
1336   if (strcmp (mimeType, "video/mp2t") == 0) {
1337     return "video/mpegts";
1338   } else if (strcmp (mimeType, "video/mp4") == 0) {
1339     return "video/quicktime";
1340   } else if (strcmp (mimeType, "audio/mp4") == 0) {
1341     return "audio/x-m4a";
1342   } else
1343     return mimeType;
1344 }
1345
1346 static GstCaps *
1347 gst_dash_demux_get_video_input_caps (GstDashDemux * demux,
1348     GstActiveStream * stream)
1349 {
1350   guint width, height;
1351   const gchar *mimeType;
1352   GstCaps *caps = NULL;
1353   GstRepresentationBaseType *RepresentationBase;
1354   if (stream == NULL)
1355     return NULL;
1356
1357   if (stream->cur_representation->RepresentationBase) {
1358     RepresentationBase = stream->cur_representation->RepresentationBase;
1359   } else {
1360     RepresentationBase = stream->cur_adapt_set->RepresentationBase;
1361   }
1362   if (RepresentationBase == NULL)
1363     return NULL;
1364
1365   width = gst_mpd_client_get_width_of_video_current_stream (RepresentationBase);
1366   height =
1367       gst_mpd_client_get_height_of_video_current_stream (RepresentationBase);
1368   mimeType = gst_mpd_mimetype_to_caps (RepresentationBase->mimeType);
1369   caps =
1370       gst_caps_new_simple (mimeType, "width", G_TYPE_INT, width, "height",
1371       G_TYPE_INT, height, NULL);
1372   return caps;
1373 }
1374
1375 static GstCaps *
1376 gst_dash_demux_get_audio_input_caps (GstDashDemux * demux,
1377     GstActiveStream * stream)
1378 {
1379   guint rate, channels;
1380   const gchar *mimeType;
1381   GstCaps *caps = NULL;
1382   GstRepresentationBaseType *RepresentationBase;
1383   if (stream == NULL)
1384     return NULL;
1385
1386   if (stream->cur_representation->RepresentationBase) {
1387     RepresentationBase = stream->cur_representation->RepresentationBase;
1388   } else {
1389     RepresentationBase = stream->cur_adapt_set->RepresentationBase;
1390   }
1391   if (RepresentationBase == NULL)
1392     return NULL;
1393
1394   channels =
1395       gst_mpd_client_get_num_channels_of_audio_current_stream
1396       (RepresentationBase);
1397   rate = gst_mpd_client_get_rate_of_audio_current_stream (RepresentationBase);
1398   mimeType = gst_mpd_mimetype_to_caps (RepresentationBase->mimeType);
1399   caps =
1400       gst_caps_new_simple (mimeType, "channels", G_TYPE_INT, channels, "rate",
1401       G_TYPE_INT, rate, NULL);
1402   return caps;
1403 }
1404
1405 static GstCaps *
1406 gst_dash_demux_get_application_input_caps (GstDashDemux * demux,
1407     GstActiveStream * stream)
1408 {
1409   const gchar *mimeType;
1410   GstCaps *caps = NULL;
1411   GstRepresentationBaseType *RepresentationBase;
1412   if (stream == NULL)
1413     return NULL;
1414
1415   if (stream->cur_representation->RepresentationBase) {
1416     RepresentationBase = stream->cur_representation->RepresentationBase;
1417   } else {
1418     RepresentationBase = stream->cur_adapt_set->RepresentationBase;
1419   }
1420   if (RepresentationBase == NULL)
1421     return NULL;
1422
1423   mimeType = gst_mpd_mimetype_to_caps (RepresentationBase->mimeType);
1424   caps = gst_caps_new_simple (mimeType, NULL);
1425   return caps;
1426 }
1427
1428 static GstCaps *
1429 gst_dash_demux_get_input_caps (GstDashDemux * demux, GstActiveStream * stream)
1430 {
1431   switch (stream->mimeType) {
1432     case GST_STREAM_VIDEO:
1433       return gst_dash_demux_get_video_input_caps (demux, stream);
1434     case GST_STREAM_AUDIO:
1435       return gst_dash_demux_get_audio_input_caps (demux, stream);
1436     case GST_STREAM_APPLICATION:
1437       return gst_dash_demux_get_application_input_caps (demux, stream);
1438     default:
1439       return GST_CAPS_NONE;
1440   }
1441 }
1442
1443 static gboolean
1444 need_add_header (GstDashDemux * demux)
1445 {
1446   GstActiveStream *stream;
1447   GstCaps *caps;
1448   guint stream_idx = 0;
1449   gboolean switch_caps = FALSE;
1450   while (stream_idx < gst_mpdparser_get_nb_active_stream (demux->client)) {
1451     stream =
1452         gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
1453     if (stream == NULL)
1454       return FALSE;
1455     caps = gst_dash_demux_get_input_caps (demux, stream);
1456     if (!demux->input_caps[stream_idx]
1457         || !gst_caps_is_equal (caps, demux->input_caps[stream_idx])) {
1458       switch_caps = TRUE;
1459       gst_caps_unref (caps);
1460       break;
1461     }
1462     gst_caps_unref (caps);
1463     stream_idx++;
1464   }
1465   return switch_caps;
1466 }
1467
1468 /* gst_dash_demux_get_next_fragment_set:
1469  *
1470  * Get the next set of fragments for the current representations.
1471  * 
1472  * This function uses the generic URI downloader API.
1473  *
1474  * Returns FALSE if an error occured while downloading fragments
1475  * 
1476  */
1477 static gboolean
1478 gst_dash_demux_get_next_fragment_set (GstDashDemux * demux)
1479 {
1480   GstActiveStream *stream;
1481   GstFragment *download, *header;
1482   GList *fragment_set;
1483   gchar *next_fragment_uri;
1484   GstClockTime duration;
1485   GstClockTime timestamp;
1486   gboolean discont;
1487   GTimeVal now;
1488   GTimeVal start;
1489   GstClockTime diff;
1490   guint64 size_buffer = 0;
1491
1492   g_get_current_time (&start);
1493   /* Figure out if we will need to switch pads, thus requiring a new
1494    * header to initialize the new decoding chain
1495    * FIXME: redundant with needs_pad_switch */
1496   gboolean need_header = need_add_header (demux);
1497   int stream_idx = 0;
1498   fragment_set = NULL;
1499   /* Get the fragment corresponding to each stream index */
1500   while (stream_idx < gst_mpdparser_get_nb_active_stream (demux->client)) {
1501     if (!gst_mpd_client_get_next_fragment (demux->client,
1502             stream_idx, &discont, &next_fragment_uri, &duration, &timestamp)) {
1503       GST_INFO_OBJECT (demux, "This manifest doesn't contain more fragments");
1504       demux->end_of_manifest = TRUE;
1505       if (GST_STATE (demux) != GST_STATE_PLAYING) {
1506         /* Restart the pipeline regardless of the current buffering level */
1507         gst_element_post_message (GST_ELEMENT (demux),
1508             gst_message_new_buffering (GST_OBJECT (demux), 100));
1509       }
1510       gst_task_start (demux->stream_task);
1511       return FALSE;
1512     }
1513
1514     GST_INFO_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
1515
1516     download = gst_uri_downloader_fetch_uri (demux->downloader,
1517         next_fragment_uri);
1518     g_free (next_fragment_uri);
1519
1520     if (download == NULL)
1521       return FALSE;
1522
1523     download->start_time = timestamp;
1524     download->stop_time = timestamp + duration;
1525
1526     stream =
1527         gst_mpdparser_get_active_stream_by_index (demux->client, stream_idx);
1528     if (stream == NULL)
1529       return FALSE;
1530     /* FIXME: we should'nt fiddle with stream internals like that */
1531     download->index = stream->segment_idx -1;
1532
1533     GstCaps *caps = gst_dash_demux_get_input_caps (demux, stream);
1534
1535     if (need_header) {
1536       /* Store the new input caps for that stream */
1537       gst_caps_replace (&demux->input_caps[stream_idx], caps);
1538       GST_INFO_OBJECT (demux, "Input source caps: %" GST_PTR_FORMAT,
1539           demux->input_caps[stream_idx]);
1540       /* We need to fetch a new header */
1541       if ((header = gst_dash_demux_get_next_header (demux, stream_idx)) == NULL) {
1542         GST_INFO_OBJECT (demux, "Unable to fetch header");
1543       } else {
1544         /* Replace fragment with a new one including the header */
1545         GstFragment *new_fragment =
1546             gst_dash_demux_prepend_header (demux, download, header);
1547         g_object_unref (header);
1548         g_object_unref (download);
1549         download = new_fragment;
1550       }
1551     } else
1552       gst_caps_unref (caps);
1553
1554     gst_fragment_set_caps (download, demux->input_caps[stream_idx]);
1555     fragment_set = g_list_append (fragment_set, download);
1556     size_buffer += gst_fragment_get_buffer_size (download);
1557     stream_idx++;
1558   }
1559   /* Push fragment set into the queue */
1560   g_queue_push_tail (demux->queue, fragment_set);
1561   /* Wake the download task up */
1562   GST_TASK_SIGNAL (demux->download_task);
1563   g_get_current_time (&now);
1564   diff = (GST_TIMEVAL_TO_TIME (now) - GST_TIMEVAL_TO_TIME (start));
1565   demux->dnl_rate = (size_buffer * 8) / ((double) diff / GST_SECOND);
1566   GST_INFO_OBJECT (demux, "Download rate = %d Kbits/s (%d Ko in %.2f s)",
1567       demux->dnl_rate / 1000, size_buffer / 1024, ((double) diff / GST_SECOND));
1568   return TRUE;
1569 }
1570