adaptivedemux2: Fixes for period switching in the output loop
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (10 * GST_SECOND)
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
208
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
210     GstEvent * event);
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212     GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
214     GstQuery * query);
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
216     GstEvent * event);
217
218 static gboolean
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
220
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224     gboolean first_and_live);
225
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
228 static GstFlowReturn
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
230
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
232     demux);
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
234     demux);
235
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238     gboolean stop_updates);
239 static GstFlowReturn
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
242 static GstFlowReturn
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244     GstAdaptiveDemux2Stream * stream);
245 static GstFlowReturn
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247     GstAdaptiveDemux2Stream * stream, GstClockTime duration);
248
249 static void
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251     GstAdaptiveDemux2Stream * stream);
252
253 static gboolean
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
255     * demux);
256
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258  * method to get to the padtemplates */
259 GType
260 gst_adaptive_demux_ng_get_type (void)
261 {
262   static gsize type = 0;
263
264   if (g_once_init_enter (&type)) {
265     GType _type;
266     static const GTypeInfo info = {
267       sizeof (GstAdaptiveDemuxClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_adaptive_demux_class_init,
271       NULL,
272       NULL,
273       sizeof (GstAdaptiveDemux),
274       0,
275       (GInstanceInitFunc) gst_adaptive_demux_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_BIN,
279         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
280
281     private_offset =
282         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
283
284     g_once_init_leave (&type, _type);
285   }
286   return type;
287 }
288
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
291 {
292   return (G_STRUCT_MEMBER_P (self, private_offset));
293 }
294
295 static void
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297     const GValue * value, GParamSpec * pspec)
298 {
299   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
300
301   GST_OBJECT_LOCK (demux);
302
303   switch (prop_id) {
304     case PROP_CONNECTION_SPEED:
305       demux->connection_speed = g_value_get_uint (value) * 1000;
306       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307           demux->connection_speed);
308       break;
309     case PROP_BANDWIDTH_TARGET_RATIO:
310       demux->bandwidth_target_ratio = g_value_get_float (value);
311       break;
312     case PROP_MIN_BITRATE:
313       demux->min_bitrate = g_value_get_uint (value);
314       break;
315     case PROP_MAX_BITRATE:
316       demux->max_bitrate = g_value_get_uint (value);
317       break;
318     case PROP_CONNECTION_BITRATE:
319       demux->connection_speed = g_value_get_uint (value);
320       break;
321       /* FIXME: Recalculate track and buffering levels
322        * when watermarks change? */
323     case PROP_MAX_BUFFERING_TIME:
324       demux->max_buffering_time = g_value_get_uint64 (value);
325       break;
326     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
328       break;
329     case PROP_BUFFERING_LOW_WATERMARK_TIME:
330       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
331       break;
332     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333       demux->buffering_high_watermark_fragments = g_value_get_double (value);
334       break;
335     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336       demux->buffering_low_watermark_fragments = g_value_get_double (value);
337       break;
338     default:
339       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340       break;
341   }
342
343   GST_OBJECT_UNLOCK (demux);
344 }
345
346 static void
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348     GValue * value, GParamSpec * pspec)
349 {
350   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
351
352   GST_OBJECT_LOCK (demux);
353
354   switch (prop_id) {
355     case PROP_CONNECTION_SPEED:
356       g_value_set_uint (value, demux->connection_speed / 1000);
357       break;
358     case PROP_BANDWIDTH_TARGET_RATIO:
359       g_value_set_float (value, demux->bandwidth_target_ratio);
360       break;
361     case PROP_MIN_BITRATE:
362       g_value_set_uint (value, demux->min_bitrate);
363       break;
364     case PROP_MAX_BITRATE:
365       g_value_set_uint (value, demux->max_bitrate);
366       break;
367     case PROP_CONNECTION_BITRATE:
368       g_value_set_uint (value, demux->connection_speed);
369       break;
370     case PROP_CURRENT_BANDWIDTH:
371       g_value_set_uint (value, demux->current_download_rate);
372       break;
373     case PROP_MAX_BUFFERING_TIME:
374       g_value_set_uint64 (value, demux->max_buffering_time);
375       break;
376     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
378       break;
379     case PROP_BUFFERING_LOW_WATERMARK_TIME:
380       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
381       break;
382     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383       g_value_set_double (value, demux->buffering_high_watermark_fragments);
384       break;
385     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386       g_value_set_double (value, demux->buffering_low_watermark_fragments);
387       break;
388     case PROP_CURRENT_LEVEL_TIME_VIDEO:
389       g_value_set_uint64 (value, demux->current_level_time_video);
390       break;
391     case PROP_CURRENT_LEVEL_TIME_AUDIO:
392       g_value_set_uint64 (value, demux->current_level_time_audio);
393       break;
394     default:
395       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
396       break;
397   }
398
399   GST_OBJECT_UNLOCK (demux);
400 }
401
402 static void
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
404 {
405   GObjectClass *gobject_class;
406   GstElementClass *gstelement_class;
407   GstBinClass *gstbin_class;
408
409   gobject_class = G_OBJECT_CLASS (klass);
410   gstelement_class = GST_ELEMENT_CLASS (klass);
411   gstbin_class = GST_BIN_CLASS (klass);
412
413   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414       "Base Adaptive Demux (ng)");
415
416   parent_class = g_type_class_peek_parent (klass);
417
418   if (private_offset != 0)
419     g_type_class_adjust_private_offset (klass, &private_offset);
420
421   gobject_class->set_property = gst_adaptive_demux_set_property;
422   gobject_class->get_property = gst_adaptive_demux_get_property;
423   gobject_class->finalize = gst_adaptive_demux_finalize;
424
425   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426       g_param_spec_uint ("connection-speed", "Connection Speed",
427           "Network connection speed to use in kbps (0 = calculate from downloaded"
428           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
430
431   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432       g_param_spec_float ("bandwidth-target-ratio",
433           "Ratio of target bandwidth / available bandwidth",
434           "Limit of the available bitrate to use when switching to alternates",
435           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440           "Network connection speed to use (0 = automatic) (bits/s)",
441           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443
444   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446           "Minimum bitrate to use when switching to alternates (bits/s)",
447           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449
450   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452           "Maximum bitrate to use when switching to alternates (bits/s)",
453           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455
456   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457       g_param_spec_uint ("current-bandwidth",
458           "Current download bandwidth (bits/s)",
459           "Report of current download bandwidth (based on arriving data) (bits/s)",
460           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
461
462   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463       g_param_spec_uint64 ("max-buffering-time",
464           "Buffering maximum size (ns)",
465           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_HIGH_WATERMARK_TIME,
472       g_param_spec_uint64 ("high-watermark-time",
473           "High buffering watermark size (ns)",
474           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_LOW_WATERMARK_TIME,
481       g_param_spec_uint64 ("low-watermark-time",
482           "Low buffering watermark size (ns)",
483           "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("high-watermark-fragments",
491           "High buffering watermark size (fragments)",
492           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class,
498       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499       g_param_spec_double ("low-watermark-fragments",
500           "Low buffering watermark size (fragments)",
501           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504           G_PARAM_STATIC_STRINGS));
505
506   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507       g_param_spec_uint64 ("current-level-time-video",
508           "Currently buffered level of video (ns)",
509           "Currently buffered level of video track(s) (ns)",
510           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512           G_PARAM_STATIC_STRINGS));
513
514   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515       g_param_spec_uint64 ("current-level-time-audio",
516           "Currently buffered level of audio (ns)",
517           "Currently buffered level of audio track(s) (ns)",
518           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520           G_PARAM_STATIC_STRINGS));
521
522   gst_element_class_add_static_pad_template (gstelement_class,
523       &gst_adaptive_demux_audiosrc_template);
524   gst_element_class_add_static_pad_template (gstelement_class,
525       &gst_adaptive_demux_videosrc_template);
526   gst_element_class_add_static_pad_template (gstelement_class,
527       &gst_adaptive_demux_subtitlesrc_template);
528
529
530   gstelement_class->change_state = gst_adaptive_demux_change_state;
531   gstelement_class->query = gst_adaptive_demux_query;
532
533   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
534
535   klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536   klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538   klass->requires_periodical_playlist_update =
539       gst_adaptive_demux_requires_periodical_playlist_update_default;
540   klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
542 }
543
544 static void
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546     GstAdaptiveDemuxClass * klass)
547 {
548   GstPadTemplate *pad_template;
549
550   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
551
552   demux->priv = gst_adaptive_demux_get_instance_private (demux);
553   demux->priv->input_adapter = gst_adapter_new ();
554   demux->realtime_clock = gst_adaptive_demux_clock_new ();
555
556   demux->download_helper = downloadhelper_new (demux->realtime_clock);
557   demux->priv->segment_seqnum = gst_util_seqnum_next ();
558   demux->have_group_id = FALSE;
559   demux->group_id = G_MAXUINT;
560
561   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562   demux->instant_rate_multiplier = 1.0;
563
564   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
566
567   g_rec_mutex_init (&demux->priv->manifest_lock);
568
569   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570   g_mutex_init (&demux->priv->scheduler_lock);
571
572   g_mutex_init (&demux->priv->api_lock);
573   g_mutex_init (&demux->priv->segment_lock);
574
575   g_mutex_init (&demux->priv->tracks_lock);
576   g_cond_init (&demux->priv->tracks_add);
577
578   g_mutex_init (&demux->priv->buffering_lock);
579
580   demux->priv->periods = g_queue_new ();
581
582   pad_template =
583       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584   g_return_if_fail (pad_template != NULL);
585
586   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587   gst_pad_set_event_function (demux->sinkpad,
588       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589   gst_pad_set_chain_function (demux->sinkpad,
590       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
591
592   /* Properties */
593   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595   demux->min_bitrate = DEFAULT_MIN_BITRATE;
596   demux->max_bitrate = DEFAULT_MAX_BITRATE;
597
598   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601   demux->buffering_high_watermark_fragments =
602       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603   demux->buffering_low_watermark_fragments =
604       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
605
606   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
608
609   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
610
611   demux->priv->duration = GST_CLOCK_TIME_NONE;
612
613   /* Output combiner */
614   demux->priv->flowcombiner = gst_flow_combiner_new ();
615
616   /* Output task */
617   g_rec_mutex_init (&demux->priv->output_lock);
618   demux->priv->output_task =
619       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
620       NULL);
621   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
622 }
623
624 static void
625 gst_adaptive_demux_finalize (GObject * object)
626 {
627   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628   GstAdaptiveDemuxPrivate *priv = demux->priv;
629
630   GST_DEBUG_OBJECT (object, "finalize");
631
632   g_object_unref (priv->input_adapter);
633
634   downloadhelper_free (demux->download_helper);
635
636   g_rec_mutex_clear (&demux->priv->manifest_lock);
637   g_mutex_clear (&demux->priv->api_lock);
638   g_mutex_clear (&demux->priv->segment_lock);
639
640   g_mutex_clear (&demux->priv->buffering_lock);
641
642   g_mutex_clear (&demux->priv->scheduler_lock);
643   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
644
645   /* The input period is present after a reset, clear it now */
646   if (demux->input_period)
647     gst_adaptive_demux_period_unref (demux->input_period);
648
649   if (demux->realtime_clock) {
650     gst_adaptive_demux_clock_unref (demux->realtime_clock);
651     demux->realtime_clock = NULL;
652   }
653   g_object_unref (priv->output_task);
654   g_rec_mutex_clear (&priv->output_lock);
655
656   gst_flow_combiner_free (priv->flowcombiner);
657
658   g_queue_free (priv->periods);
659
660   G_OBJECT_CLASS (parent_class)->finalize (object);
661 }
662
663 static gboolean
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
665 {
666   gboolean ret = FALSE;
667   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
668
669   ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
670
671   return ret;
672 }
673
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676     GstStateChange transition)
677 {
678   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
680
681   switch (transition) {
682     case GST_STATE_CHANGE_NULL_TO_READY:
683       if (!gst_adaptive_demux_check_streams_aware (demux)) {
684         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685             (_("Element requires a streams-aware context.")), (NULL));
686         goto fail_out;
687       }
688       break;
689     case GST_STATE_CHANGE_PAUSED_TO_READY:
690       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
692
693       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694       downloadhelper_stop (demux->download_helper);
695
696       TRACKS_LOCK (demux);
697       demux->priv->flushing = TRUE;
698       g_cond_signal (&demux->priv->tracks_add);
699       gst_task_stop (demux->priv->output_task);
700       TRACKS_UNLOCK (demux);
701
702       gst_task_join (demux->priv->output_task);
703
704       GST_API_LOCK (demux);
705       gst_adaptive_demux_reset (demux);
706       GST_API_UNLOCK (demux);
707       break;
708     case GST_STATE_CHANGE_READY_TO_PAUSED:
709       GST_API_LOCK (demux);
710       gst_adaptive_demux_reset (demux);
711
712       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713       if (g_atomic_int_get (&demux->priv->have_manifest))
714         gst_adaptive_demux_start_manifest_update_task (demux);
715       GST_API_UNLOCK (demux);
716       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717         GST_DEBUG_OBJECT (demux, "demuxer has started running");
718       /* gst_task_start (demux->priv->output_task); */
719       break;
720     default:
721       break;
722   }
723
724   /* this must be run with the scheduler and output tasks stopped. */
725   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
726
727   switch (transition) {
728     case GST_STATE_CHANGE_READY_TO_PAUSED:
729       /* Start download task */
730       downloadhelper_start (demux->download_helper);
731       break;
732     default:
733       break;
734   }
735
736 fail_out:
737   return result;
738 }
739
740 static void
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
742     OutputSlot * slot)
743 {
744   GstEvent *eos = gst_event_new_eos ();
745   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
746
747   /* FIXME: The slot might not have output any data, caps or segment yet */
748   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
749   gst_pad_push_event (slot->pad, eos);
750   gst_pad_set_active (slot->pad, FALSE);
751   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
752   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
753   if (slot->track)
754     gst_adaptive_demux_track_unref (slot->track);
755   if (slot->pending_track)
756     gst_adaptive_demux_track_unref (slot->pending_track);
757
758   g_slice_free (OutputSlot, slot);
759 }
760
761 static OutputSlot *
762 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
763     GstStreamType streamtype)
764 {
765   OutputSlot *slot;
766   GstPadTemplate *tmpl;
767   gchar *name;
768
769   switch (streamtype) {
770     case GST_STREAM_TYPE_AUDIO:
771       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
772       tmpl =
773           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
774       break;
775     case GST_STREAM_TYPE_VIDEO:
776       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
777       tmpl =
778           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
779       break;
780     case GST_STREAM_TYPE_TEXT:
781       name =
782           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
783       tmpl =
784           gst_static_pad_template_get
785           (&gst_adaptive_demux_subtitlesrc_template);
786       break;
787     default:
788       g_assert_not_reached ();
789       return NULL;
790   }
791
792   slot = g_slice_new0 (OutputSlot);
793   slot->type = streamtype;
794   slot->pushed_timed_data = FALSE;
795
796   /* Create and activate new pads */
797   slot->pad = gst_pad_new_from_template (tmpl, name);
798   g_free (name);
799   gst_object_unref (tmpl);
800
801   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
802   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
803   gst_pad_set_active (slot->pad, TRUE);
804
805   gst_pad_set_query_function (slot->pad,
806       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
807   gst_pad_set_event_function (slot->pad,
808       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
809
810   gst_pad_set_element_private (slot->pad, slot);
811
812   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
813       GST_DEBUG_PAD_NAME (slot->pad));
814   return slot;
815 }
816
817 /* Called:
818  * * After `process_manifest` or when a period starts
819  * * Or when all tracks have been created
820  *
821  * Goes over tracks and creates the collection
822  *
823  * Returns TRUE if the collection was fully created.
824  *
825  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
826  * */
827 static gboolean
828 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
829     GstAdaptiveDemuxPeriod * period)
830 {
831   GstStreamCollection *collection;
832   GList *iter;
833
834   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
835
836   if (!period->tracks_changed) {
837     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
838     return TRUE;
839   }
840
841   if (!period->tracks) {
842     GST_WARNING_OBJECT (demux, "No tracks registered/present");
843     return FALSE;
844   }
845
846   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
847     GST_DEBUG_OBJECT (demux,
848         "Streams still have pending tracks, not creating/updating collection");
849     return FALSE;
850   }
851
852   /* Update collection */
853   collection = gst_stream_collection_new ("adaptivedemux");
854
855   for (iter = period->tracks; iter; iter = iter->next) {
856     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
857
858     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
859     gst_stream_collection_add_stream (collection,
860         gst_object_ref (track->stream_object));
861   }
862
863   if (period->collection)
864     gst_object_unref (period->collection);
865   period->collection = collection;
866
867   return TRUE;
868 }
869
870 /*
871  * Called for the output period:
872  * * after `update_collection()` if the input period is the same as the output period
873  * * When the output period changes
874  *
875  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
876  */
877 static gboolean
878 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
879 {
880   GstStreamCollection *collection;
881   GstAdaptiveDemuxPeriod *period = demux->output_period;
882   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
883
884   g_return_val_if_fail (period, FALSE);
885   if (!period->collection) {
886     GST_DEBUG_OBJECT (demux, "No collection available yet");
887     return TRUE;
888   }
889
890   collection = period->collection;
891
892   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
893       period->period_num);
894
895   /* Post collection */
896   TRACKS_UNLOCK (demux);
897   GST_MANIFEST_UNLOCK (demux);
898
899   gst_element_post_message (GST_ELEMENT_CAST (demux),
900       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
901
902   GST_MANIFEST_LOCK (demux);
903   TRACKS_LOCK (demux);
904
905   /* If no stream selection was handled, make a default selection */
906   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
907     gst_adaptive_demux_period_select_default_tracks (demux,
908         demux->output_period);
909   }
910
911   /* Make sure the output task is running */
912   if (gst_adaptive_demux2_is_running (demux)) {
913     demux->priv->flushing = FALSE;
914     GST_DEBUG_OBJECT (demux, "Starting the output task");
915     gst_task_start (demux->priv->output_task);
916   }
917
918   return TRUE;
919 }
920
921 static gboolean
922 handle_incoming_manifest (GstAdaptiveDemux * demux)
923 {
924   GstAdaptiveDemuxClass *demux_class;
925   GstQuery *query;
926   gboolean query_res;
927   gboolean ret = TRUE;
928   gsize available;
929   GstBuffer *manifest_buffer;
930
931   GST_API_LOCK (demux);
932   GST_MANIFEST_LOCK (demux);
933
934   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
935
936   available = gst_adapter_available (demux->priv->input_adapter);
937
938   if (available == 0)
939     goto eos_without_data;
940
941   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
942
943   /* Need to get the URI to use it as a base to generate the fragment's
944    * uris */
945   query = gst_query_new_uri ();
946   query_res = gst_pad_peer_query (demux->sinkpad, query);
947   if (query_res) {
948     gchar *uri, *redirect_uri;
949     gboolean permanent;
950
951     gst_query_parse_uri (query, &uri);
952     gst_query_parse_uri_redirection (query, &redirect_uri);
953     gst_query_parse_uri_redirection_permanent (query, &permanent);
954
955     if (permanent && redirect_uri) {
956       demux->manifest_uri = redirect_uri;
957       demux->manifest_base_uri = NULL;
958       g_free (uri);
959     } else {
960       demux->manifest_uri = uri;
961       demux->manifest_base_uri = redirect_uri;
962     }
963
964     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
965         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
966   } else {
967     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
968   }
969   gst_query_unref (query);
970
971   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
972   if (!demux->have_group_id) {
973     demux->have_group_id = TRUE;
974     demux->group_id = gst_util_group_id_next ();
975   }
976
977   /* Let the subclass parse the manifest */
978   manifest_buffer =
979       gst_adapter_take_buffer (demux->priv->input_adapter, available);
980   ret = demux_class->process_manifest (demux, manifest_buffer);
981   gst_buffer_unref (manifest_buffer);
982
983   gst_element_post_message (GST_ELEMENT_CAST (demux),
984       gst_message_new_element (GST_OBJECT_CAST (demux),
985           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
986               "manifest-uri", G_TYPE_STRING,
987               demux->manifest_uri, "uri", G_TYPE_STRING,
988               demux->manifest_uri,
989               "manifest-download-start", GST_TYPE_CLOCK_TIME,
990               GST_CLOCK_TIME_NONE,
991               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
992               gst_util_get_timestamp (), NULL)));
993
994   if (!ret)
995     goto invalid_manifest;
996
997   /* Streams should have been added to the input period if the manifest parsing
998    * succeeded */
999   if (!demux->input_period->streams)
1000     goto no_streams;
1001
1002   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1003
1004   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1005   /* Send duration message */
1006   if (!gst_adaptive_demux_is_live (demux)) {
1007     GstClockTime duration = demux_class->get_duration (demux);
1008
1009     demux->priv->duration = duration;
1010     if (duration != GST_CLOCK_TIME_NONE) {
1011       GST_DEBUG_OBJECT (demux,
1012           "Sending duration message : %" GST_TIME_FORMAT,
1013           GST_TIME_ARGS (duration));
1014       gst_element_post_message (GST_ELEMENT (demux),
1015           gst_message_new_duration_changed (GST_OBJECT (demux)));
1016     } else {
1017       GST_DEBUG_OBJECT (demux,
1018           "media duration unknown, can not send the duration message");
1019     }
1020   }
1021
1022   TRACKS_LOCK (demux);
1023   /* New streams/tracks will have been added to the input period */
1024   /* The input period has streams, make it the active output period */
1025   /* FIXME : Factorize this into a function to make a period active */
1026   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1027   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1028       gst_adaptive_demux_post_collection (demux);
1029   TRACKS_UNLOCK (demux);
1030
1031   gst_adaptive_demux_prepare_streams (demux,
1032       gst_adaptive_demux_is_live (demux));
1033   gst_adaptive_demux_start_tasks (demux);
1034   gst_adaptive_demux_start_manifest_update_task (demux);
1035
1036 unlock_out:
1037   GST_MANIFEST_UNLOCK (demux);
1038   GST_API_UNLOCK (demux);
1039
1040   return ret;
1041
1042   /* ERRORS */
1043 eos_without_data:
1044   {
1045     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1046     ret = FALSE;
1047     goto unlock_out;
1048   }
1049
1050 no_streams:
1051   {
1052     /* no streams */
1053     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1054     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1055         (_("This file contains no playable streams.")),
1056         ("No known stream formats found at the Manifest"));
1057     ret = FALSE;
1058     goto unlock_out;
1059   }
1060
1061 invalid_manifest:
1062   {
1063     GST_MANIFEST_UNLOCK (demux);
1064     GST_API_UNLOCK (demux);
1065
1066     /* In most cases, this will happen if we set a wrong url in the
1067      * source element and we have received the 404 HTML response instead of
1068      * the manifest */
1069     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1070     return FALSE;
1071   }
1072 }
1073
1074 struct http_headers_collector
1075 {
1076   GstAdaptiveDemux *demux;
1077   gchar **cookies;
1078 };
1079
1080 static gboolean
1081 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1082     const GValue * value, gpointer userdata)
1083 {
1084   struct http_headers_collector *hdr_data = userdata;
1085   GstAdaptiveDemux *demux = hdr_data->demux;
1086   const gchar *field_name = g_quark_to_string (field_id);
1087
1088   if (G_UNLIKELY (value == NULL))
1089     return TRUE;                /* This should not happen */
1090
1091   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1092     const gchar *user_agent = g_value_get_string (value);
1093
1094     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1095     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1096   }
1097
1098   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1099       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1100     guint i = 0, prev_len = 0, total_len = 0;
1101     gchar **cookies = NULL;
1102
1103     if (hdr_data->cookies != NULL)
1104       prev_len = g_strv_length (hdr_data->cookies);
1105
1106     if (GST_VALUE_HOLDS_ARRAY (value)) {
1107       total_len = gst_value_array_get_size (value) + prev_len;
1108       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1109
1110       for (i = 0; i < gst_value_array_get_size (value); i++) {
1111         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1112             g_value_get_string (gst_value_array_get_value (value, i)));
1113         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1114       }
1115     } else if (G_VALUE_HOLDS_STRING (value)) {
1116       total_len = 1 + prev_len;
1117       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1118
1119       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1120           g_value_get_string (value));
1121       cookies[0] = g_value_dup_string (value);
1122     } else {
1123       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1124           g_quark_to_string (field_id));
1125     }
1126
1127     if (cookies) {
1128       if (prev_len) {
1129         guint j;
1130         for (j = 0; j < prev_len; j++) {
1131           GST_DEBUG_OBJECT (demux,
1132               "Append existing cookie %s", hdr_data->cookies[j]);
1133           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1134         }
1135       }
1136       cookies[total_len] = NULL;
1137
1138       g_strfreev (hdr_data->cookies);
1139       hdr_data->cookies = cookies;
1140     }
1141   }
1142
1143   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1144     const gchar *referer = g_value_get_string (value);
1145     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1146
1147     downloadhelper_set_referer (demux->download_helper, referer);
1148   }
1149
1150   /* Date header can be used to estimate server offset */
1151   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1152     const gchar *http_date = g_value_get_string (value);
1153
1154     if (http_date) {
1155       GstDateTime *datetime =
1156           gst_adaptive_demux_util_parse_http_head_date (http_date);
1157
1158       if (datetime) {
1159         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1160         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1161
1162         GST_INFO_OBJECT (demux,
1163             "HTTP response Date %s", GST_STR_NULL (date_string));
1164         g_free (date_string);
1165
1166         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1167
1168         g_date_time_unref (utc_now);
1169         gst_date_time_unref (datetime);
1170       }
1171     }
1172   }
1173
1174   return TRUE;
1175 }
1176
1177 static gboolean
1178 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1179     GstEvent * event)
1180 {
1181   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1182   gboolean ret;
1183
1184   switch (event->type) {
1185     case GST_EVENT_FLUSH_STOP:{
1186       GST_API_LOCK (demux);
1187       GST_MANIFEST_LOCK (demux);
1188
1189       gst_adaptive_demux_reset (demux);
1190
1191       ret = gst_pad_event_default (pad, parent, event);
1192
1193       GST_MANIFEST_UNLOCK (demux);
1194       GST_API_UNLOCK (demux);
1195
1196       return ret;
1197     }
1198     case GST_EVENT_EOS:
1199     {
1200       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1201         if (!handle_incoming_manifest (demux)) {
1202           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1203           return gst_pad_event_default (pad, parent, event);
1204         }
1205         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1206       } else {
1207         GST_ERROR_OBJECT (demux,
1208             "Failed to acquire scheduler to handle manifest");
1209         return gst_pad_event_default (pad, parent, event);
1210       }
1211       gst_event_unref (event);
1212       return TRUE;
1213     }
1214     case GST_EVENT_STREAM_START:
1215       if (gst_event_parse_group_id (event, &demux->group_id))
1216         demux->have_group_id = TRUE;
1217       else
1218         demux->have_group_id = FALSE;
1219       /* Swallow stream-start, we'll push our own */
1220       gst_event_unref (event);
1221       return TRUE;
1222     case GST_EVENT_SEGMENT:
1223       /* Swallow newsegments, we'll push our own */
1224       gst_event_unref (event);
1225       return TRUE;
1226     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1227       const GstStructure *structure = gst_event_get_structure (event);
1228       struct http_headers_collector c = { demux, NULL };
1229
1230       if (gst_structure_has_name (structure, "http-headers")) {
1231         if (gst_structure_has_field (structure, "request-headers")) {
1232           GstStructure *req_headers = NULL;
1233           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1234               &req_headers, NULL);
1235           if (req_headers) {
1236             gst_structure_foreach (req_headers,
1237                 gst_adaptive_demux_handle_upstream_http_header, &c);
1238             gst_structure_free (req_headers);
1239           }
1240         }
1241         if (gst_structure_has_field (structure, "response-headers")) {
1242           GstStructure *res_headers = NULL;
1243           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1244               &res_headers, NULL);
1245           if (res_headers) {
1246             gst_structure_foreach (res_headers,
1247                 gst_adaptive_demux_handle_upstream_http_header, &c);
1248             gst_structure_free (res_headers);
1249           }
1250         }
1251
1252         if (c.cookies)
1253           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1254       }
1255       break;
1256     }
1257     default:
1258       break;
1259   }
1260
1261   return gst_pad_event_default (pad, parent, event);
1262 }
1263
1264 static GstFlowReturn
1265 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1266     GstBuffer * buffer)
1267 {
1268   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1269
1270   GST_MANIFEST_LOCK (demux);
1271
1272   gst_adapter_push (demux->priv->input_adapter, buffer);
1273
1274   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1275       (gint) gst_adapter_available (demux->priv->input_adapter));
1276
1277   GST_MANIFEST_UNLOCK (demux);
1278   return GST_FLOW_OK;
1279 }
1280
1281
1282 /* Called with TRACKS_LOCK taken */
1283 static void
1284 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1285 {
1286   GList *tmp;
1287
1288   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1289     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1290
1291     gst_adaptive_demux_track_flush (track);
1292     if (gst_pad_is_active (track->sinkpad)) {
1293       gst_pad_set_active (track->sinkpad, FALSE);
1294       gst_pad_set_active (track->sinkpad, TRUE);
1295     }
1296   }
1297 }
1298
1299 /* Resets all tracks to their initial state, ready to receive new data. */
1300 static void
1301 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1302 {
1303   TRACKS_LOCK (demux);
1304   g_queue_foreach (demux->priv->periods,
1305       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1306   TRACKS_UNLOCK (demux);
1307 }
1308
1309 /* Subclasses will call this function to ensure that a new input period is
1310  * available to receive new streams and tracks */
1311 gboolean
1312 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1313 {
1314   if (demux->input_period && !demux->input_period->prepared) {
1315     GST_DEBUG_OBJECT (demux, "Using existing input period");
1316     return TRUE;
1317   }
1318
1319   if (demux->input_period) {
1320     GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1321     demux->input_period->has_next_period = TRUE;
1322   }
1323   GST_DEBUG_OBJECT (demux, "Setting up new period");
1324
1325   demux->input_period = gst_adaptive_demux_period_new (demux);
1326
1327   return TRUE;
1328 }
1329
1330 /* must be called with manifest_lock taken */
1331 static void
1332 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1333 {
1334   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1335   GList *iter;
1336
1337   gst_adaptive_demux_stop_tasks (demux, TRUE);
1338
1339   if (klass->reset)
1340     klass->reset (demux);
1341
1342   /* Disable and remove all outputs */
1343   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1344   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1345     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1346   }
1347   g_list_free (demux->priv->outputs);
1348   demux->priv->outputs = NULL;
1349
1350   g_queue_clear_full (demux->priv->periods,
1351       (GDestroyNotify) gst_adaptive_demux_period_unref);
1352
1353   /* The output period always has an extra ref taken on it */
1354   if (demux->output_period)
1355     gst_adaptive_demux_period_unref (demux->output_period);
1356   demux->output_period = NULL;
1357   /* The input period doesn't have an extra ref taken on it */
1358   demux->input_period = NULL;
1359
1360   gst_adaptive_demux_start_new_period (demux);
1361
1362   g_free (demux->manifest_uri);
1363   g_free (demux->manifest_base_uri);
1364   demux->manifest_uri = NULL;
1365   demux->manifest_base_uri = NULL;
1366
1367   gst_adapter_clear (demux->priv->input_adapter);
1368   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1369
1370   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1371   demux->instant_rate_multiplier = 1.0;
1372
1373   demux->priv->duration = GST_CLOCK_TIME_NONE;
1374
1375   demux->priv->percent = -1;
1376   demux->priv->is_buffering = TRUE;
1377
1378   demux->have_group_id = FALSE;
1379   demux->group_id = G_MAXUINT;
1380   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1381
1382   demux->priv->global_output_position = 0;
1383
1384   demux->priv->n_audio_streams = 0;
1385   demux->priv->n_video_streams = 0;
1386   demux->priv->n_subtitle_streams = 0;
1387
1388   gst_flow_combiner_reset (demux->priv->flowcombiner);
1389 }
1390
1391 static gboolean
1392 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1393 {
1394   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1395
1396   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1397
1398   switch (GST_QUERY_TYPE (query)) {
1399     case GST_QUERY_BUFFERING:
1400     {
1401       GstFormat format;
1402       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1403
1404       if (!demux->output_period) {
1405         if (format != GST_FORMAT_TIME) {
1406           GST_DEBUG_OBJECT (demux,
1407               "No period setup yet, can't answer non-TIME buffering queries");
1408           return FALSE;
1409         }
1410
1411         GST_DEBUG_OBJECT (demux,
1412             "No period setup yet, but still answering buffering query");
1413         return TRUE;
1414       }
1415     }
1416     default:
1417       break;
1418   }
1419
1420   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1421 }
1422
1423 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1424 static GstAdaptiveDemux2Stream *
1425 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1426 {
1427   GList *iter;
1428
1429   /* We only look in the streams of the input period (i.e. with active streams) */
1430   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1431     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1432     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1433       return stream;
1434     }
1435   }
1436
1437   return NULL;
1438 }
1439
1440 /* TRACKS_LOCK held */
1441 static GstAdaptiveDemuxTrack *
1442 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1443     GstStreamType stream_type)
1444 {
1445   GList *iter;
1446
1447   for (iter = stream->tracks; iter; iter = iter->next) {
1448     GstAdaptiveDemuxTrack *track = iter->data;
1449
1450     if (track->type == stream_type)
1451       return track;
1452   }
1453
1454   return NULL;
1455 }
1456
1457 /* MANIFEST and TRACKS lock held */
1458 static void
1459 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1460     GstAdaptiveDemux2Stream * stream)
1461 {
1462   guint i;
1463
1464   GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1465
1466   for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1467       i++) {
1468     GstStream *gst_stream =
1469         gst_stream_collection_get_stream (stream->stream_collection, i);
1470     GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1471     GstAdaptiveDemuxTrack *track;
1472
1473     if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1474       continue;
1475     track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1476     if (!track) {
1477       GST_DEBUG_OBJECT (stream,
1478           "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
1479           gst_stream);
1480       continue;
1481     }
1482
1483     if (track->upstream_stream_id)
1484       g_free (track->upstream_stream_id);
1485     track->upstream_stream_id =
1486         g_strdup (gst_stream_get_stream_id (gst_stream));
1487   }
1488
1489 }
1490
1491 static gboolean
1492 tags_have_language_info (GstTagList * tags)
1493 {
1494   const gchar *language = NULL;
1495
1496   if (tags == NULL)
1497     return FALSE;
1498
1499   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1500           &language))
1501     return TRUE;
1502   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
1503           &language))
1504     return TRUE;
1505
1506   return FALSE;
1507 }
1508
1509 static gboolean
1510 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1511     GstStreamCollection * collection)
1512 {
1513   guint i;
1514   guint nb_audio, nb_video, nb_text;
1515   gboolean have_audio_languages = TRUE;
1516   gboolean have_text_languages = TRUE;
1517
1518   nb_audio = nb_video = nb_text = 0;
1519
1520   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1521     GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1522     GstTagList *tags = gst_stream_get_tags (gst_stream);
1523
1524     GST_DEBUG_OBJECT (stream,
1525         "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1526     switch (gst_stream_get_stream_type (gst_stream)) {
1527       case GST_STREAM_TYPE_AUDIO:
1528         have_audio_languages &= tags_have_language_info (tags);
1529         nb_audio++;
1530         break;
1531       case GST_STREAM_TYPE_VIDEO:
1532         nb_video++;
1533         break;
1534       case GST_STREAM_TYPE_TEXT:
1535         have_text_languages &= tags_have_language_info (tags);
1536         nb_text++;
1537         break;
1538       default:
1539         break;
1540     }
1541   }
1542
1543   /* Check that we either have at most 1 of each track type, or that
1544    * we have language tags for each to tell which is which */
1545   if (nb_video > 1 ||
1546       (nb_audio > 1 && !have_audio_languages) ||
1547       (nb_text > 1 && !have_text_languages)) {
1548     GST_WARNING
1549         ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1550         nb_audio, nb_video, nb_text);
1551     return FALSE;
1552   }
1553
1554   return TRUE;
1555 }
1556
1557 static void
1558 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1559     GstMessage * msg)
1560 {
1561   GstAdaptiveDemux2Stream *stream;
1562   GstStreamCollection *collection = NULL;
1563   gboolean pending_tracks_activated = FALSE;
1564
1565   GST_MANIFEST_LOCK (demux);
1566
1567   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1568   if (stream == NULL) {
1569     GST_WARNING_OBJECT (demux,
1570         "Failed to locate stream for collection message");
1571     goto beach;
1572   }
1573
1574   gst_message_parse_stream_collection (msg, &collection);
1575   if (!collection)
1576     goto beach;
1577
1578   /* Check whether the collection is "sane" or not.
1579    *
1580    * In the context of adaptive streaming, we can only handle multiplexed
1581    * content that provides at most one stream of valid types (audio, video,
1582    * text). Without this we cannot reliably match the output of this multiplex
1583    * to the various tracks.
1584    *
1585    * FIXME : In the future and *IF* we encounter such streams, we could envision
1586    * supporting multiple streams of the same type if, and only if, they have
1587    * tags that allow differentiating them (ex: languages).
1588    */
1589   if (!can_handle_collection (stream, collection)) {
1590     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1591         (_("Stream format can't be handled")),
1592         ("The streams provided by the multiplex are ambiguous"));
1593     goto beach;
1594   }
1595
1596   /* Store the collection on the stream */
1597   gst_object_replace ((GstObject **) & stream->stream_collection,
1598       (GstObject *) collection);
1599
1600   /* IF there are pending tracks, ask the subclass to handle that */
1601   if (stream->pending_tracks) {
1602     GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1603     g_assert (demux_class->stream_update_tracks);
1604     demux_class->stream_update_tracks (demux, stream);
1605     TRACKS_LOCK (demux);
1606     stream->pending_tracks = FALSE;
1607     pending_tracks_activated = TRUE;
1608     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1609         demux->input_period == demux->output_period)
1610       gst_adaptive_demux_post_collection (demux);
1611   } else {
1612     g_assert (stream->tracks);
1613     TRACKS_LOCK (demux);
1614     /* If we already have assigned tracks, update the pending upstream stream_id
1615      * for each of them based on the collection information. */
1616     gst_adaptive_demux2_stream_update_tracks (demux, stream);
1617   }
1618
1619   /* If we discovered pending tracks and we no longer have any, we can ensure
1620    * selected tracks are started */
1621   if (pending_tracks_activated
1622       && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1623     GList *iter = demux->input_period->streams;
1624     for (; iter; iter = iter->next) {
1625       GstAdaptiveDemux2Stream *new_stream = iter->data;
1626
1627       /* The stream that posted this collection was already started. If a
1628        * different stream is now selected, start it */
1629       if (stream != new_stream
1630           && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1631         gst_adaptive_demux2_stream_start (new_stream);
1632     }
1633   }
1634   TRACKS_UNLOCK (demux);
1635
1636 beach:
1637   GST_MANIFEST_UNLOCK (demux);
1638
1639   gst_message_unref (msg);
1640   msg = NULL;
1641 }
1642
1643 static void
1644 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1645 {
1646   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1647
1648   switch (GST_MESSAGE_TYPE (msg)) {
1649     case GST_MESSAGE_STREAM_COLLECTION:
1650     {
1651       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1652       return;
1653     }
1654     case GST_MESSAGE_ERROR:{
1655       GstAdaptiveDemux2Stream *stream = NULL;
1656       GError *err = NULL;
1657       gchar *debug = NULL;
1658       gchar *new_error = NULL;
1659       const GstStructure *details = NULL;
1660
1661       GST_MANIFEST_LOCK (demux);
1662
1663       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1664       if (stream == NULL) {
1665         GST_WARNING_OBJECT (demux,
1666             "Failed to locate stream for errored element");
1667         GST_MANIFEST_UNLOCK (demux);
1668         break;
1669       }
1670
1671       gst_message_parse_error (msg, &err, &debug);
1672
1673       GST_WARNING_OBJECT (demux,
1674           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1675           err->message, debug);
1676
1677       if (debug)
1678         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1679       if (new_error) {
1680         g_free (err->message);
1681         err->message = new_error;
1682       }
1683
1684       gst_message_parse_error_details (msg, &details);
1685       if (details) {
1686         gst_structure_get_uint (details, "http-status-code",
1687             &stream->last_status_code);
1688       }
1689
1690       /* error, but ask to retry */
1691       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1692         gst_adaptive_demux2_stream_parse_error (stream, err);
1693         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1694       }
1695
1696       g_error_free (err);
1697       g_free (debug);
1698
1699       GST_MANIFEST_UNLOCK (demux);
1700
1701       gst_message_unref (msg);
1702       msg = NULL;
1703     }
1704       break;
1705     default:
1706       break;
1707   }
1708
1709   if (msg)
1710     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1711 }
1712
1713 /* must be called with manifest_lock taken */
1714 GstClockTime
1715 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1716     GstAdaptiveDemux2Stream * stream)
1717 {
1718   GstAdaptiveDemuxClass *klass;
1719
1720   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1721
1722   if (klass->get_presentation_offset == NULL)
1723     return 0;
1724
1725   return klass->get_presentation_offset (demux, stream);
1726 }
1727
1728 /* must be called with manifest_lock taken */
1729 GstClockTime
1730 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1731 {
1732   GstAdaptiveDemuxClass *klass;
1733
1734   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1735
1736   if (klass->get_period_start_time == NULL)
1737     return 0;
1738
1739   return klass->get_period_start_time (demux);
1740 }
1741
1742 /* must be called with manifest_lock taken */
1743 static gboolean
1744 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1745     gboolean first_and_live)
1746 {
1747   GList *iter;
1748   GstClockTime period_start;
1749   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1750   GList *new_streams;
1751
1752   g_return_val_if_fail (demux->input_period->streams, FALSE);
1753   g_assert (demux->input_period->prepared == FALSE);
1754
1755   new_streams = demux->input_period->streams;
1756
1757   if (!gst_adaptive_demux2_is_running (demux)) {
1758     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1759     return TRUE;
1760   }
1761
1762   GST_DEBUG_OBJECT (demux,
1763       "Preparing %d streams for period %d , first_and_live:%d",
1764       g_list_length (new_streams), demux->input_period->period_num,
1765       first_and_live);
1766
1767   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1768     GstAdaptiveDemux2Stream *stream = iter->data;
1769
1770     GST_DEBUG_OBJECT (stream, "Preparing stream");
1771
1772     stream->need_header = TRUE;
1773     stream->discont = TRUE;
1774
1775     /* Grab the first stream time for live streams
1776      * * If the stream is selected
1777      * * Or it provides dynamic tracks (in which case we need to force an update)
1778      */
1779     if (first_and_live
1780         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1781             || stream->pending_tracks)) {
1782       /* TODO we only need the first timestamp, maybe create a simple function to
1783        * get the current PTS of a fragment ? */
1784       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1785       gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1786
1787       GST_DEBUG_OBJECT (stream,
1788           "Got stream time %" GST_STIME_FORMAT,
1789           GST_STIME_ARGS (stream->fragment.stream_time));
1790
1791       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1792         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1793       } else {
1794         min_stream_time = stream->fragment.stream_time;
1795       }
1796     }
1797   }
1798
1799   period_start = gst_adaptive_demux_get_period_start_time (demux);
1800
1801   /* For live streams, the subclass is supposed to seek to the current fragment
1802    * and then tell us its stream time in stream->fragment.stream_time.  We now
1803    * also have to seek our demuxer segment to reflect this.
1804    *
1805    * FIXME: This needs some refactoring at some point.
1806    */
1807   if (first_and_live) {
1808     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1809         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1810         GST_SEEK_TYPE_NONE, -1, NULL);
1811   }
1812
1813   GST_DEBUG_OBJECT (demux,
1814       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1815       " demux segment %" GST_SEGMENT_FORMAT,
1816       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1817       &demux->segment);
1818
1819   /* Synchronize stream start/current positions */
1820   if (min_stream_time == GST_CLOCK_STIME_NONE)
1821     min_stream_time = period_start;
1822   else
1823     min_stream_time += period_start;
1824   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1825     GstAdaptiveDemux2Stream *stream = iter->data;
1826     stream->start_position = stream->current_position = min_stream_time;
1827   }
1828
1829   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1830     GstAdaptiveDemux2Stream *stream = iter->data;
1831     stream->compute_segment = TRUE;
1832     stream->first_and_live = first_and_live;
1833   }
1834   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1835   demux->input_period->prepared = TRUE;
1836
1837   return TRUE;
1838 }
1839
1840 static GstAdaptiveDemuxTrack *
1841 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1842 {
1843   GList *tmp;
1844
1845   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1846     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1847     if (!g_strcmp0 (track->stream_id, stream_id))
1848       return track;
1849   }
1850
1851   return NULL;
1852 }
1853
1854 /* TRACKS_LOCK hold */
1855 void
1856 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1857 {
1858   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1859   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1860   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1861   GList *tmp;
1862   gint min_percent = -1, percent;
1863   gboolean all_eos = TRUE;
1864
1865   /* Go over all active tracks of the output period and update level */
1866
1867   /* Check that all tracks are above their respective low thresholds (different
1868    * tracks may have different fragment durations yielding different buffering
1869    * percentages) Overall buffering percent is the lowest. */
1870   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1871     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1872
1873     GST_LOG_OBJECT (demux,
1874         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1875         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1876         track->stream_id, track->period_num, track->active, track->selected,
1877         track->eos, GST_TIME_ARGS (track->level_time),
1878         GST_TIME_ARGS (track->buffering_threshold));
1879
1880     if (track->active && track->selected) {
1881       if (!track->eos) {
1882         gint cur_percent;
1883
1884         all_eos = FALSE;
1885         if (min_level_time == GST_CLOCK_TIME_NONE) {
1886           min_level_time = track->level_time;
1887         } else if (track->level_time < min_level_time) {
1888           min_level_time = track->level_time;
1889         }
1890
1891         if (track->type & GST_STREAM_TYPE_VIDEO
1892             && video_level_time > track->level_time)
1893           video_level_time = track->level_time;
1894
1895         if (track->type & GST_STREAM_TYPE_AUDIO
1896             && audio_level_time > track->level_time)
1897           audio_level_time = track->level_time;
1898
1899         if (track->level_time != GST_CLOCK_TIME_NONE
1900             && track->buffering_threshold != 0) {
1901           cur_percent =
1902               gst_util_uint64_scale (track->level_time, 100,
1903               track->buffering_threshold);
1904           if (min_percent < 0 || cur_percent < min_percent)
1905             min_percent = cur_percent;
1906         }
1907       }
1908     }
1909   }
1910
1911   GST_DEBUG_OBJECT (demux,
1912       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1913       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1914
1915   /* Update demuxer video/audio level properties */
1916   GST_OBJECT_LOCK (demux);
1917   demux->current_level_time_video = video_level_time;
1918   demux->current_level_time_audio = audio_level_time;
1919   GST_OBJECT_UNLOCK (demux);
1920
1921   if (min_percent < 0 && !all_eos)
1922     return;
1923
1924   if (min_percent > 100 || all_eos)
1925     percent = 100;
1926   else
1927     percent = MAX (0, min_percent);
1928
1929   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1930
1931   if (demux->priv->is_buffering) {
1932     if (percent >= 100)
1933       demux->priv->is_buffering = FALSE;
1934     if (demux->priv->percent != percent) {
1935       demux->priv->percent = percent;
1936       demux->priv->percent_changed = TRUE;
1937     }
1938   } else if (percent < 1) {
1939     demux->priv->is_buffering = TRUE;
1940     if (demux->priv->percent != percent) {
1941       demux->priv->percent = percent;
1942       demux->priv->percent_changed = TRUE;
1943     }
1944   }
1945
1946   if (demux->priv->percent_changed)
1947     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1948         demux->priv->is_buffering);
1949 }
1950
1951 /* With TRACKS_LOCK held */
1952 void
1953 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1954 {
1955   gint percent;
1956   GstMessage *msg;
1957
1958   if (!demux->priv->percent_changed)
1959     return;
1960
1961   BUFFERING_LOCK (demux);
1962   percent = demux->priv->percent;
1963   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1964   TRACKS_UNLOCK (demux);
1965   gst_element_post_message ((GstElement *) demux, msg);
1966
1967   BUFFERING_UNLOCK (demux);
1968   TRACKS_LOCK (demux);
1969   if (percent == demux->priv->percent)
1970     demux->priv->percent_changed = FALSE;
1971 }
1972
1973 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1974 GstAdaptiveDemux2Stream *
1975 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1976     GstAdaptiveDemuxTrack * track)
1977 {
1978   GList *iter;
1979
1980   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1981     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1982     if (g_list_find (stream->tracks, track))
1983       return stream;
1984   }
1985
1986   return NULL;
1987 }
1988
1989 /* Called from seek handler
1990  *
1991  * This function is used when a (flushing) seek caused a new period to be activated.
1992  *
1993  * This will ensure that:
1994  * * the current output period is marked as finished (EOS)
1995  * * Any potential intermediate (non-input/non-output) periods are removed
1996  * * That the new input period is prepared and ready
1997  */
1998 static void
1999 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
2000 {
2001   GList *iter;
2002
2003   GST_DEBUG_OBJECT (demux,
2004       "Preparing new input period %u", demux->input_period->period_num);
2005
2006   /* Prepare the new input period */
2007   gst_adaptive_demux_update_collection (demux, demux->input_period);
2008
2009   /* Transfer the previous selection to the new input period */
2010   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2011       demux->output_period);
2012   gst_adaptive_demux_prepare_streams (demux, FALSE);
2013
2014   /* Remove all periods except for the input (last) and output (first) period */
2015   while (demux->priv->periods->length > 2) {
2016     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2017     /* Mark all tracks of the removed period as not selected and EOS so they
2018      * will be skipped / ignored */
2019     for (iter = period->tracks; iter; iter = iter->next) {
2020       GstAdaptiveDemuxTrack *track = iter->data;
2021       track->selected = FALSE;
2022       track->eos = TRUE;
2023     }
2024     gst_adaptive_demux_period_unref (period);
2025   }
2026
2027   /* Mark all tracks of the output period as EOS so that the output loop
2028    * will immediately move to the new period */
2029   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2030     GstAdaptiveDemuxTrack *track = iter->data;
2031     track->eos = TRUE;
2032   }
2033
2034   /* Go over all slots, and clear any pending track */
2035   for (iter = demux->priv->outputs; iter; iter = iter->next) {
2036     OutputSlot *slot = (OutputSlot *) iter->data;
2037
2038     if (slot->pending_track != NULL) {
2039       GST_DEBUG_OBJECT (demux,
2040           "Removing track '%s' as pending from output of current track '%s'",
2041           slot->pending_track->stream_id, slot->track->stream_id);
2042       gst_adaptive_demux_track_unref (slot->pending_track);
2043       slot->pending_track = NULL;
2044     }
2045   }
2046 }
2047
2048 /* must be called with manifest_lock taken */
2049 gboolean
2050 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2051     gint64 * range_start, gint64 * range_stop)
2052 {
2053   GstAdaptiveDemuxClass *klass;
2054
2055   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2056
2057   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2058
2059   return klass->get_live_seek_range (demux, range_start, range_stop);
2060 }
2061
2062 /* must be called with manifest_lock taken */
2063 gboolean
2064 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2065     GstAdaptiveDemux2Stream * stream)
2066 {
2067   gint64 range_start, range_stop;
2068   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2069     GST_LOG_OBJECT (stream,
2070         "stream position %" GST_TIME_FORMAT "  live seek range %"
2071         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2072         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2073         GST_STIME_ARGS (range_stop));
2074     return (stream->current_position >= range_start
2075         && stream->current_position <= range_stop);
2076   }
2077
2078   return FALSE;
2079 }
2080
2081 /* must be called with manifest_lock taken */
2082 static gboolean
2083 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2084 {
2085   GstAdaptiveDemuxClass *klass;
2086
2087   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2088   if (gst_adaptive_demux_is_live (demux)) {
2089     return klass->get_live_seek_range != NULL;
2090   }
2091
2092   return klass->seek != NULL;
2093 }
2094
2095 static void
2096 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2097     GstSeekType start_type, GstSeekType stop_type)
2098 {
2099   GList *iter;
2100
2101   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2102     GstAdaptiveDemux2Stream *stream = iter->data;
2103
2104     /* Make sure the download loop clears and restarts on the next start,
2105      * which will recompute the stream segment */
2106     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2107         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2108     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2109     stream->start_position = 0;
2110
2111     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2112       stream->start_position = demux->segment.start;
2113     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2114       stream->start_position = demux->segment.stop;
2115   }
2116 }
2117
2118 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
2119                               GST_SEEK_FLAG_SNAP_AFTER |          \
2120                               GST_SEEK_FLAG_SNAP_NEAREST |        \
2121                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2122                               GST_SEEK_FLAG_KEY_UNIT))
2123 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2124                               GST_SEEK_FLAG_SNAP_AFTER | \
2125                               GST_SEEK_FLAG_SNAP_NEAREST))
2126
2127 static gboolean
2128 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2129     GstEvent * event)
2130 {
2131   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2132   gdouble rate;
2133   GstFormat format;
2134   GstSeekFlags flags;
2135   GstSeekType start_type, stop_type;
2136   gint64 start, stop;
2137   guint32 seqnum;
2138   gboolean update;
2139   gboolean ret;
2140   GstSegment oldsegment;
2141   GstEvent *flush_event;
2142
2143   GST_INFO_OBJECT (demux, "Received seek event");
2144
2145   GST_API_LOCK (demux);
2146
2147   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2148       &stop_type, &stop);
2149
2150   if (format != GST_FORMAT_TIME) {
2151     GST_API_UNLOCK (demux);
2152     GST_WARNING_OBJECT (demux,
2153         "Adaptive demuxers only support TIME-based seeking");
2154     gst_event_unref (event);
2155     return FALSE;
2156   }
2157
2158   if (flags & GST_SEEK_FLAG_SEGMENT) {
2159     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2160     GST_API_UNLOCK (demux);
2161     gst_event_unref (event);
2162     return FALSE;
2163   }
2164
2165   seqnum = gst_event_get_seqnum (event);
2166
2167   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2168     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2169     return FALSE;
2170   }
2171
2172   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2173     /* For instant rate seeks, reply directly and update
2174      * our segment so the new rate is reflected in any future
2175      * fragments */
2176     GstEvent *ev;
2177     gdouble rate_multiplier;
2178
2179     /* instant rate change only supported if direction does not change. All
2180      * other requirements are already checked before creating the seek event
2181      * but let's double-check here to be sure */
2182     if ((demux->segment.rate > 0 && rate < 0) ||
2183         (demux->segment.rate < 0 && rate > 0) ||
2184         start_type != GST_SEEK_TYPE_NONE ||
2185         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2186       GST_ERROR_OBJECT (demux,
2187           "Instant rate change seeks only supported in the "
2188           "same direction, without flushing and position change");
2189       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2190       GST_API_UNLOCK (demux);
2191       return FALSE;
2192     }
2193
2194     rate_multiplier = rate / demux->segment.rate;
2195
2196     ev = gst_event_new_instant_rate_change (rate_multiplier,
2197         (GstSegmentFlags) flags);
2198     gst_event_set_seqnum (ev, seqnum);
2199
2200     ret = gst_adaptive_demux_push_src_event (demux, ev);
2201
2202     if (ret) {
2203       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2204       demux->instant_rate_multiplier = rate_multiplier;
2205       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2206     }
2207
2208     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2209     GST_API_UNLOCK (demux);
2210     gst_event_unref (event);
2211
2212     return ret;
2213   }
2214
2215   if (!gst_adaptive_demux_can_seek (demux)) {
2216     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2217
2218     GST_API_UNLOCK (demux);
2219     gst_event_unref (event);
2220     return FALSE;
2221   }
2222
2223   /* We can only accept flushing seeks from this point onward */
2224   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2225     GST_ERROR_OBJECT (demux,
2226         "Non-flushing non-instant-rate seeks are not possible");
2227
2228     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2229
2230     GST_API_UNLOCK (demux);
2231     gst_event_unref (event);
2232     return FALSE;
2233   }
2234
2235   if (gst_adaptive_demux_is_live (demux)) {
2236     gint64 range_start, range_stop;
2237     gboolean changed = FALSE;
2238     gboolean start_valid = TRUE, stop_valid = TRUE;
2239
2240     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2241             &range_stop)) {
2242       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2243       GST_API_UNLOCK (demux);
2244       gst_event_unref (event);
2245       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2246       return FALSE;
2247     }
2248
2249     GST_DEBUG_OBJECT (demux,
2250         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2251         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2252
2253     /* Handle relative positioning for live streams (relative to the range_stop) */
2254     if (start_type == GST_SEEK_TYPE_END) {
2255       start = range_stop + start;
2256       start_type = GST_SEEK_TYPE_SET;
2257       changed = TRUE;
2258     }
2259     if (stop_type == GST_SEEK_TYPE_END) {
2260       stop = range_stop + stop;
2261       stop_type = GST_SEEK_TYPE_SET;
2262       changed = TRUE;
2263     }
2264
2265     /* Adjust the requested start/stop position if it falls beyond the live
2266      * seek range.
2267      * The only case where we don't adjust is for the starting point of
2268      * an accurate seek (start if forward and stop if backwards)
2269      */
2270     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2271         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2272       GST_DEBUG_OBJECT (demux,
2273           "seek before live stream start, setting to range start: %"
2274           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2275       start = range_start;
2276       changed = TRUE;
2277     }
2278     /* truncate stop position also if set */
2279     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2280         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2281       GST_DEBUG_OBJECT (demux,
2282           "seek ending after live start, adjusting to: %"
2283           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2284       stop = range_stop;
2285       changed = TRUE;
2286     }
2287
2288     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2289         (start < range_start || start > range_stop)) {
2290       GST_WARNING_OBJECT (demux,
2291           "Seek to invalid position start:%" GST_STIME_FORMAT
2292           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2293           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2294           GST_STIME_ARGS (range_stop));
2295       start_valid = FALSE;
2296     }
2297     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2298         (stop < range_start || stop > range_stop)) {
2299       GST_WARNING_OBJECT (demux,
2300           "Seek to invalid position stop:%" GST_STIME_FORMAT
2301           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2302           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2303           GST_STIME_ARGS (range_stop));
2304       stop_valid = FALSE;
2305     }
2306
2307     /* If the seek position is still outside of the seekable range, refuse the seek */
2308     if (!start_valid || !stop_valid) {
2309       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2310       GST_API_UNLOCK (demux);
2311       gst_event_unref (event);
2312       return FALSE;
2313     }
2314
2315     /* Re-create seek event with changed/updated values */
2316     if (changed) {
2317       gst_event_unref (event);
2318       event =
2319           gst_event_new_seek (rate, format, flags,
2320           start_type, start, stop_type, stop);
2321       gst_event_set_seqnum (event, seqnum);
2322     }
2323   }
2324
2325   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2326
2327   /* have a backup in case seek fails */
2328   gst_segment_copy_into (&demux->segment, &oldsegment);
2329
2330   GST_DEBUG_OBJECT (demux, "sending flush start");
2331   flush_event = gst_event_new_flush_start ();
2332   gst_event_set_seqnum (flush_event, seqnum);
2333
2334   gst_adaptive_demux_push_src_event (demux, flush_event);
2335
2336   gst_adaptive_demux_stop_tasks (demux, FALSE);
2337   gst_adaptive_demux_reset_tracks (demux);
2338
2339   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2340
2341   /*
2342    * Handle snap seeks as follows:
2343    * 1) do the snap seeking a (random) active stream
2344    * 2) use the final position on this stream to seek
2345    *    on the other streams to the same position
2346    *
2347    * We can't snap at all streams at the same time as they might end in
2348    * different positions, so just pick one and align all others to that
2349    * position.
2350    */
2351   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek) {
2352     GstAdaptiveDemux2Stream *stream = NULL;
2353     GstClockTimeDiff ts;
2354     GstSeekFlags stream_seek_flags = flags;
2355     GList *iter;
2356
2357     /* snap-seek on the stream that received the event and then
2358      * use the resulting position to seek on all streams */
2359
2360     if (rate >= 0) {
2361       if (start_type != GST_SEEK_TYPE_NONE)
2362         ts = start;
2363       else {
2364         ts = gst_segment_position_from_running_time (&demux->segment,
2365             GST_FORMAT_TIME, demux->priv->global_output_position);
2366         start_type = GST_SEEK_TYPE_SET;
2367       }
2368     } else {
2369       if (stop_type != GST_SEEK_TYPE_NONE)
2370         ts = stop;
2371       else {
2372         stop_type = GST_SEEK_TYPE_SET;
2373         ts = gst_segment_position_from_running_time (&demux->segment,
2374             GST_FORMAT_TIME, demux->priv->global_output_position);
2375       }
2376     }
2377
2378     /* Pick a random active stream on which to do the stream seek */
2379     for (iter = demux->output_period->streams; iter; iter = iter->next) {
2380       GstAdaptiveDemux2Stream *cand = iter->data;
2381       if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2382         stream = cand;
2383         break;
2384       }
2385     }
2386     if (stream) {
2387       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2388     }
2389
2390     /* replace event with a new one without snapping to seek on all streams */
2391     gst_event_unref (event);
2392     if (rate >= 0) {
2393       start = ts;
2394     } else {
2395       stop = ts;
2396     }
2397     event =
2398         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2399         start_type, start, stop_type, stop);
2400     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2401   }
2402
2403   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2404       start, stop_type, stop, &update);
2405
2406   if (ret) {
2407     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2408
2409     ret = demux_class->seek (demux, event);
2410   }
2411
2412   if (!ret) {
2413     /* Is there anything else we can do if it fails? */
2414     gst_segment_copy_into (&oldsegment, &demux->segment);
2415   } else {
2416     demux->priv->segment_seqnum = seqnum;
2417   }
2418   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2419
2420   /* Resetting flow combiner */
2421   gst_flow_combiner_reset (demux->priv->flowcombiner);
2422
2423   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2424   flush_event = gst_event_new_flush_stop (TRUE);
2425   gst_event_set_seqnum (flush_event, seqnum);
2426   gst_adaptive_demux_push_src_event (demux, flush_event);
2427
2428   /* If the seek generated a new period, prepare it */
2429   if (!demux->input_period->prepared) {
2430     /* This can only happen on flushing seeks */
2431     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2432     gst_adaptive_demux_seek_to_input_period (demux);
2433   }
2434
2435   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2436   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2437       &demux->segment);
2438   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2439   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2440
2441   /* Reset the global output position (running time) for when the output loop restarts */
2442   demux->priv->global_output_position = 0;
2443
2444   /* After a flushing seek, any instant-rate override is undone */
2445   demux->instant_rate_multiplier = 1.0;
2446
2447   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2448
2449   /* Restart the demux */
2450   gst_adaptive_demux_start_tasks (demux);
2451
2452   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2453   GST_API_UNLOCK (demux);
2454   gst_event_unref (event);
2455
2456   return ret;
2457 }
2458
2459 /* Returns TRUE if the stream has at least one selected track */
2460 static gboolean
2461 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2462     stream)
2463 {
2464   GList *tmp;
2465
2466   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2467     GstAdaptiveDemuxTrack *track = tmp->data;
2468
2469     if (track->selected)
2470       return TRUE;
2471   }
2472
2473   return FALSE;
2474 }
2475
2476 static gboolean
2477 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2478     guint32 seqnum)
2479 {
2480   gboolean selection_handled = TRUE;
2481   GList *iter;
2482   GList *tracks = NULL;
2483
2484   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2485     return FALSE;
2486
2487   TRACKS_LOCK (demux);
2488   /* Validate the streams and fill:
2489    * tracks : list of tracks corresponding to requested streams
2490    */
2491   for (iter = streams; iter; iter = iter->next) {
2492     gchar *stream_id = (gchar *) iter->data;
2493     GstAdaptiveDemuxTrack *track;
2494
2495     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2496     track = find_track_for_stream_id (demux->output_period, stream_id);
2497     if (!track) {
2498       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2499       selection_handled = FALSE;
2500       goto select_streams_done;
2501     }
2502     tracks = g_list_append (tracks, track);
2503     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2504   }
2505
2506   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2507    * SCHEDULING THREAD */
2508
2509   /* FIXME: We want to iterate all streams, mark them as deselected,
2510    * then iterate tracks and mark any streams that have at least 1
2511    * active output track, then loop over all streams again and start/stop
2512    * them as needed */
2513
2514   /* Go over all tracks present and (de)select based on current selection */
2515   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2516     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2517
2518     if (track->selected && !g_list_find (tracks, track)) {
2519       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2520           track->stream_id, track->active);
2521       track->selected = FALSE;
2522       track->draining = TRUE;
2523     } else if (!track->selected && g_list_find (tracks, track)) {
2524       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2525
2526       track->selected = TRUE;
2527     }
2528   }
2529
2530   /* Start or stop streams based on the updated track selection */
2531   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2532     GstAdaptiveDemux2Stream *stream = iter->data;
2533     GList *trackiter;
2534
2535     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2536     gboolean should_be_running =
2537         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2538
2539     if (!is_running && should_be_running) {
2540       GstClockTime output_running_ts = demux->priv->global_output_position;
2541       GstClockTime start_position;
2542
2543       /* Calculate where we should start the stream, and then
2544        * start it. */
2545       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2546
2547       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2548           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2549           GST_TIME_ARGS (output_running_ts), &demux->segment);
2550
2551       start_position =
2552           gst_segment_position_from_running_time (&demux->segment,
2553           GST_FORMAT_TIME, output_running_ts);
2554
2555       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2556
2557       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2558           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2559
2560       stream->current_position = stream->start_position = start_position;
2561       stream->compute_segment = TRUE;
2562
2563       /* If output has already begun, ensure we seek this segment
2564        * to the correct restart position when the download loop begins */
2565       if (output_running_ts != 0)
2566         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2567
2568       /* Activate track pads for this stream */
2569       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2570         GstAdaptiveDemuxTrack *track =
2571             (GstAdaptiveDemuxTrack *) trackiter->data;
2572         gst_pad_set_active (track->sinkpad, TRUE);
2573       }
2574
2575       gst_adaptive_demux2_stream_start (stream);
2576     } else if (is_running && !should_be_running) {
2577       /* Stream should not be running and needs stopping */
2578       gst_adaptive_demux2_stream_stop (stream);
2579
2580       /* Set all track sinkpads to inactive for this stream */
2581       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2582         GstAdaptiveDemuxTrack *track =
2583             (GstAdaptiveDemuxTrack *) trackiter->data;
2584         gst_pad_set_active (track->sinkpad, FALSE);
2585       }
2586     }
2587   }
2588
2589   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2590
2591 select_streams_done:
2592   demux_update_buffering_locked (demux);
2593   demux_post_buffering_locked (demux);
2594
2595   TRACKS_UNLOCK (demux);
2596   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2597
2598   if (tracks)
2599     g_list_free (tracks);
2600   return selection_handled;
2601 }
2602
2603 static gboolean
2604 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2605     GstEvent * event)
2606 {
2607   GstAdaptiveDemux *demux;
2608
2609   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2610
2611   switch (event->type) {
2612     case GST_EVENT_SEEK:
2613     {
2614       guint32 seqnum = gst_event_get_seqnum (event);
2615       if (seqnum == demux->priv->segment_seqnum) {
2616         GST_LOG_OBJECT (pad,
2617             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2618         gst_event_unref (event);
2619         return TRUE;
2620       }
2621       return gst_adaptive_demux_handle_seek_event (demux, event);
2622     }
2623     case GST_EVENT_LATENCY:{
2624       /* Upstream and our internal source are irrelevant
2625        * for latency, and we should not fail here to
2626        * configure the latency */
2627       gst_event_unref (event);
2628       return TRUE;
2629     }
2630     case GST_EVENT_QOS:{
2631       GstClockTimeDiff diff;
2632       GstClockTime timestamp;
2633       GstClockTime earliest_time;
2634
2635       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2636       /* Only take into account lateness if late */
2637       if (diff > 0)
2638         earliest_time = timestamp + 2 * diff;
2639       else
2640         earliest_time = timestamp;
2641
2642       GST_OBJECT_LOCK (demux);
2643       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2644           earliest_time > demux->priv->qos_earliest_time) {
2645         demux->priv->qos_earliest_time = earliest_time;
2646         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2647             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2648       }
2649       GST_OBJECT_UNLOCK (demux);
2650       break;
2651     }
2652     case GST_EVENT_SELECT_STREAMS:
2653     {
2654       GList *streams;
2655       gboolean selection_handled;
2656
2657       if (GST_EVENT_SEQNUM (event) ==
2658           g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2659         GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2660             GST_EVENT_SEQNUM (event));
2661         return TRUE;
2662       }
2663
2664       gst_event_parse_select_streams (event, &streams);
2665       selection_handled =
2666           handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2667       g_list_free_full (streams, g_free);
2668       return selection_handled;
2669     }
2670     default:
2671       break;
2672   }
2673
2674   return gst_pad_event_default (pad, parent, event);
2675 }
2676
2677 static gboolean
2678 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2679     GstQuery * query)
2680 {
2681   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2682   gboolean ret = FALSE;
2683
2684   if (query == NULL)
2685     return FALSE;
2686
2687
2688   switch (query->type) {
2689     case GST_QUERY_DURATION:{
2690       GstFormat fmt;
2691       GstClockTime duration = GST_CLOCK_TIME_NONE;
2692
2693       gst_query_parse_duration (query, &fmt, NULL);
2694
2695       if (gst_adaptive_demux_is_live (demux)) {
2696         /* We are able to answer this query: the duration is unknown */
2697         gst_query_set_duration (query, fmt, -1);
2698         ret = TRUE;
2699         break;
2700       }
2701
2702       if (fmt == GST_FORMAT_TIME
2703           && g_atomic_int_get (&demux->priv->have_manifest)) {
2704
2705         GST_MANIFEST_LOCK (demux);
2706         duration = demux->priv->duration;
2707         GST_MANIFEST_UNLOCK (demux);
2708
2709         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2710           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2711           ret = TRUE;
2712         }
2713       }
2714
2715       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2716           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2717       break;
2718     }
2719     case GST_QUERY_LATENCY:{
2720       gst_query_set_latency (query, FALSE, 0, -1);
2721       ret = TRUE;
2722       break;
2723     }
2724     case GST_QUERY_SEEKING:{
2725       GstFormat fmt;
2726       gint64 stop = -1;
2727       gint64 start = 0;
2728
2729       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2730         GST_INFO_OBJECT (demux,
2731             "Don't have manifest yet, can't answer seeking query");
2732         return FALSE;           /* can't answer without manifest */
2733       }
2734
2735       GST_MANIFEST_LOCK (demux);
2736
2737       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2738       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2739       if (fmt == GST_FORMAT_TIME) {
2740         GstClockTime duration;
2741         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2742
2743         ret = TRUE;
2744         if (can_seek) {
2745           if (gst_adaptive_demux_is_live (demux)) {
2746             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2747             if (!ret) {
2748               GST_MANIFEST_UNLOCK (demux);
2749               GST_INFO_OBJECT (demux, "can't answer seeking query");
2750               return FALSE;
2751             }
2752           } else {
2753             duration = demux->priv->duration;
2754             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2755               stop = duration;
2756           }
2757         }
2758         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2759         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2760             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2761             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2762       }
2763       GST_MANIFEST_UNLOCK (demux);
2764       break;
2765     }
2766     case GST_QUERY_URI:
2767
2768       GST_MANIFEST_LOCK (demux);
2769
2770       /* TODO HLS can answer this differently it seems */
2771       if (demux->manifest_uri) {
2772         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2773          * playlist or the the uri of the last downlowaded fragment? */
2774         gst_query_set_uri (query, demux->manifest_uri);
2775         ret = TRUE;
2776       }
2777
2778       GST_MANIFEST_UNLOCK (demux);
2779       break;
2780     case GST_QUERY_SELECTABLE:
2781       gst_query_set_selectable (query, TRUE);
2782       ret = TRUE;
2783       break;
2784     default:
2785       /* Don't forward queries upstream because of the special nature of this
2786        *  "demuxer", which relies on the upstream element only to be fed
2787        *  the Manifest
2788        */
2789       break;
2790   }
2791
2792   return ret;
2793 }
2794
2795 gboolean
2796 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2797 {
2798   GstEvent *seek;
2799
2800   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2801
2802   seek =
2803       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2804       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2805       GST_SEEK_TYPE_NONE, 0);
2806   gst_adaptive_demux_handle_seek_event (demux, seek);
2807   return FALSE;
2808 }
2809
2810
2811 /* Called when the scheduler starts, to kick off manifest updates
2812  * and stream downloads */
2813 static gboolean
2814 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2815 {
2816   GList *iter;
2817
2818   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2819
2820   iter = demux->input_period->streams;
2821
2822   for (; iter; iter = g_list_next (iter)) {
2823     GstAdaptiveDemux2Stream *stream = iter->data;
2824
2825     /* If we need to process this stream to discover tracks *OR* it has any
2826      * tracks which are selected, start it now */
2827     if ((stream->pending_tracks == TRUE)
2828         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2829       gst_adaptive_demux2_stream_start (stream);
2830   }
2831
2832   return FALSE;
2833 }
2834
2835 /* must be called with manifest_lock taken */
2836 static void
2837 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2838 {
2839   if (!gst_adaptive_demux2_is_running (demux)) {
2840     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2841     return;
2842   }
2843
2844   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2845   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2846       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2847
2848   TRACKS_LOCK (demux);
2849   demux->priv->flushing = FALSE;
2850   GST_DEBUG_OBJECT (demux, "Starting the output task");
2851   gst_task_start (demux->priv->output_task);
2852   TRACKS_UNLOCK (demux);
2853 }
2854
2855 /* must be called with manifest_lock taken */
2856 static void
2857 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2858 {
2859   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2860   if (demux->priv->manifest_updates_cb != 0) {
2861     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2862         demux->priv->manifest_updates_cb);
2863     demux->priv->manifest_updates_cb = 0;
2864   }
2865 }
2866
2867 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2868
2869 /* must be called with manifest_lock taken */
2870 static void
2871 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2872 {
2873   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2874
2875   if (gst_adaptive_demux_is_live (demux)) {
2876     /* Task to periodically update the manifest */
2877     if (demux_class->requires_periodical_playlist_update (demux)) {
2878       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2879       if (demux->priv->manifest_updates_cb == 0) {
2880         demux->priv->manifest_updates_cb =
2881             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2882             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2883       }
2884     }
2885   }
2886 }
2887
2888 /* must be called with manifest_lock taken
2889  * This function will temporarily release manifest_lock in order to join the
2890  * download threads.
2891  * The api_lock will still protect it against other threads trying to modify
2892  * the demux element.
2893  */
2894 static void
2895 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2896 {
2897   GST_LOG_OBJECT (demux, "Stopping tasks");
2898
2899   if (stop_updates)
2900     gst_adaptive_demux_stop_manifest_update_task (demux);
2901
2902   TRACKS_LOCK (demux);
2903   demux->priv->flushing = TRUE;
2904   g_cond_signal (&demux->priv->tracks_add);
2905   gst_task_stop (demux->priv->output_task);
2906   TRACKS_UNLOCK (demux);
2907
2908   gst_task_join (demux->priv->output_task);
2909
2910   if (demux->input_period)
2911     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2912
2913   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2914 }
2915
2916 /* must be called with manifest_lock taken */
2917 static gboolean
2918 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2919 {
2920   GList *iter;
2921   gboolean ret = TRUE;
2922
2923   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2924
2925   TRACKS_LOCK (demux);
2926   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2927     OutputSlot *slot = (OutputSlot *) iter->data;
2928     gst_event_ref (event);
2929     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2930     ret = ret & gst_pad_push_event (slot->pad, event);
2931     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2932       slot->pushed_timed_data = FALSE;
2933   }
2934   TRACKS_UNLOCK (demux);
2935   gst_event_unref (event);
2936   return ret;
2937 }
2938
2939 /* must be called with manifest_lock taken */
2940 void
2941 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2942     GstCaps * caps)
2943 {
2944   GST_DEBUG_OBJECT (stream,
2945       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2946   gst_caps_replace (&stream->pending_caps, caps);
2947   gst_caps_unref (caps);
2948 }
2949
2950 /* must be called with manifest_lock taken */
2951 void
2952 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2953     GstTagList * tags)
2954 {
2955   GST_DEBUG_OBJECT (stream,
2956       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2957   if (stream->pending_tags) {
2958     gst_tag_list_unref (stream->pending_tags);
2959   }
2960   stream->pending_tags = tags;
2961 }
2962
2963 /* must be called with manifest_lock taken */
2964 void
2965 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2966     GstEvent * event)
2967 {
2968   stream->pending_events = g_list_append (stream->pending_events, event);
2969 }
2970
2971 static guint64
2972 _update_average_bitrate (GstAdaptiveDemux * demux,
2973     GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2974 {
2975   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2976
2977   stream->moving_bitrate -= stream->fragment_bitrates[index];
2978   stream->fragment_bitrates[index] = new_bitrate;
2979   stream->moving_bitrate += new_bitrate;
2980
2981   stream->moving_index += 1;
2982
2983   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2984     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2985   return stream->moving_bitrate / stream->moving_index;
2986 }
2987
2988 static guint64
2989 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2990     GstAdaptiveDemux2Stream * stream)
2991 {
2992   guint64 average_bitrate;
2993   guint64 fragment_bitrate;
2994   guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2995
2996   fragment_bitrate = stream->last_bitrate;
2997   GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2998       fragment_bitrate);
2999
3000   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
3001
3002   GST_INFO_OBJECT (stream,
3003       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
3004   GST_INFO_OBJECT (stream,
3005       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
3006       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
3007
3008   /* Conservative approach, make sure we don't upgrade too fast */
3009   GST_OBJECT_LOCK (demux);
3010   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3011
3012   /* If this is the/a video stream update the overall demuxer
3013    * reported bitrate and notify, to give the application a
3014    * chance to choose a new connection-bitrate */
3015   if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3016     demux->current_download_rate = stream->current_download_rate;
3017     GST_OBJECT_UNLOCK (demux);
3018     g_object_notify (G_OBJECT (demux), "current-bandwidth");
3019     GST_OBJECT_LOCK (demux);
3020   }
3021
3022   connection_speed = demux->connection_speed;
3023   min_bitrate = demux->min_bitrate;
3024   max_bitrate = demux->max_bitrate;
3025   GST_OBJECT_UNLOCK (demux);
3026
3027   if (connection_speed) {
3028     GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3029         connection_speed / 1000);
3030     return connection_speed;
3031   }
3032
3033   /* No explicit connection_speed, so choose the new variant to use as a
3034    * fraction of the measured download rate */
3035   target_download_rate =
3036       CLAMP (stream->current_download_rate, 0,
3037       G_MAXUINT) * demux->bandwidth_target_ratio;
3038
3039   GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3040       demux->bandwidth_target_ratio, target_download_rate);
3041
3042 #if 0
3043   /* Debugging code, modulate the bitrate every few fragments */
3044   {
3045     static guint ctr = 0;
3046     if (ctr % 3 == 0) {
3047       GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3048       target_download_rate /= 2;
3049     }
3050     ctr++;
3051   }
3052 #endif
3053
3054   if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3055     target_download_rate = min_bitrate;
3056     GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3057         min_bitrate);
3058   }
3059
3060   if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3061     target_download_rate = max_bitrate;
3062     GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3063         max_bitrate);
3064   }
3065
3066   GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3067       target_download_rate);
3068
3069   return target_download_rate;
3070 }
3071
3072 /* must be called with manifest_lock taken */
3073 static GstFlowReturn
3074 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3075     GstAdaptiveDemux2Stream * stream)
3076 {
3077   /* No need to advance, this isn't a real fragment */
3078   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3079     return GST_FLOW_OK;
3080
3081   return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3082       stream->fragment.duration);
3083 }
3084
3085 /* must be called with manifest_lock taken.
3086  * Can temporarily release manifest_lock
3087  */
3088 static GstFlowReturn
3089 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3090     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3091 {
3092   return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
3093 }
3094
3095 static gboolean
3096 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3097     * demux)
3098 {
3099   return TRUE;
3100 }
3101
3102 /* Called when a stream needs waking after the manifest is updated */
3103 void
3104 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3105 {
3106   demux->priv->stream_waiting_for_manifest = TRUE;
3107 }
3108
3109 static gboolean
3110 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3111 {
3112   GstFlowReturn ret = GST_FLOW_OK;
3113   gboolean schedule_again = TRUE;
3114
3115   GST_MANIFEST_LOCK (demux);
3116   demux->priv->manifest_updates_cb = 0;
3117
3118   /* Updating playlist only needed for live playlists */
3119   if (!gst_adaptive_demux_is_live (demux)) {
3120     GST_MANIFEST_UNLOCK (demux);
3121     return G_SOURCE_REMOVE;
3122   }
3123
3124   GST_DEBUG_OBJECT (demux, "Updating playlist");
3125   ret = gst_adaptive_demux_update_manifest (demux);
3126
3127   if (ret == GST_FLOW_EOS) {
3128     GST_MANIFEST_UNLOCK (demux);
3129     return G_SOURCE_REMOVE;
3130   }
3131
3132   if (ret == GST_FLOW_OK) {
3133     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3134     demux->priv->update_failed_count = 0;
3135
3136     /* Wake up download tasks */
3137     if (demux->priv->stream_waiting_for_manifest) {
3138       GList *iter;
3139
3140       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3141         GstAdaptiveDemux2Stream *stream = iter->data;
3142         gst_adaptive_demux2_stream_on_manifest_update (stream);
3143       }
3144       demux->priv->stream_waiting_for_manifest = FALSE;
3145     }
3146   } else {
3147     demux->priv->update_failed_count++;
3148
3149     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3150       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3151           gst_flow_get_name (ret));
3152     } else {
3153       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3154           (_("Internal data stream error.")), ("Could not update playlist"));
3155       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3156       schedule_again = FALSE;
3157     }
3158   }
3159
3160   if (schedule_again) {
3161     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3162
3163     demux->priv->manifest_updates_cb =
3164         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3165         klass->get_manifest_update_interval (demux) * GST_USECOND,
3166         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3167   }
3168
3169   GST_MANIFEST_UNLOCK (demux);
3170
3171   return G_SOURCE_REMOVE;
3172 }
3173
3174 static gboolean
3175 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3176 {
3177   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3178
3179   /* Loop for updating of the playlist. This periodically checks if
3180    * the playlist is updated and does so, then signals the streaming
3181    * thread in case it can continue downloading now. */
3182
3183   /* block until the next scheduled update or the signal to quit this thread */
3184   GST_DEBUG_OBJECT (demux, "Started updates task");
3185   demux->priv->manifest_updates_cb =
3186       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3187       klass->get_manifest_update_interval (demux) * GST_USECOND,
3188       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3189
3190   return G_SOURCE_REMOVE;
3191 }
3192
3193 static OutputSlot *
3194 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3195     GstAdaptiveDemuxTrack * track)
3196 {
3197   GList *tmp;
3198
3199   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3200     OutputSlot *slot = (OutputSlot *) tmp->data;
3201     /* Incompatible output type */
3202     if (slot->type != track->type)
3203       continue;
3204
3205     /* Slot which is already assigned to this pending track */
3206     if (slot->pending_track == track)
3207       return slot;
3208
3209     /* slot already used for another pending track */
3210     if (slot->pending_track != NULL)
3211       continue;
3212
3213     /* Current output track is of the same type and is draining */
3214     if (slot->track && slot->track->draining)
3215       return slot;
3216   }
3217
3218   return NULL;
3219 }
3220
3221 /* TRACKS_LOCK taken */
3222 static OutputSlot *
3223 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3224 {
3225   GList *tmp;
3226
3227   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3228     OutputSlot *slot = (OutputSlot *) tmp->data;
3229
3230     if (slot->track == track)
3231       return slot;
3232   }
3233
3234   return NULL;
3235 }
3236
3237 /* TRACKS_LOCK held */
3238 static GstMessage *
3239 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3240 {
3241   GList *tmp;
3242   GstMessage *msg;
3243
3244   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3245     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3246
3247     if (track->selected && !track->active)
3248       return NULL;
3249   }
3250
3251   /* All selected tracks are active, created message */
3252   msg =
3253       gst_message_new_streams_selected (GST_OBJECT (demux),
3254       demux->output_period->collection);
3255   GST_MESSAGE_SEQNUM (msg) = seqnum;
3256   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3257     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3258     if (track->active) {
3259       gst_message_streams_selected_add (msg, track->stream_object);
3260     }
3261   }
3262
3263   return msg;
3264 }
3265
3266 static void
3267 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3268     OutputSlot * slot)
3269 {
3270   GstAdaptiveDemuxTrack *track = slot->track;
3271   GstEvent *event;
3272
3273   /* Send EVENT_STREAM_START */
3274   event = gst_event_new_stream_start (track->stream_id);
3275   if (demux->have_group_id)
3276     gst_event_set_group_id (event, demux->group_id);
3277   gst_event_set_stream_flags (event, track->flags);
3278   gst_event_set_stream (event, track->stream_object);
3279   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3280       track->stream_id);
3281   gst_pad_push_event (slot->pad, event);
3282
3283   /* Send EVENT_STREAM_COLLECTION */
3284   event = gst_event_new_stream_collection (demux->output_period->collection);
3285   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3286       track->stream_id);
3287   gst_pad_push_event (slot->pad, event);
3288
3289   /* Mark all sticky events for re-sending */
3290   gst_event_store_mark_all_undelivered (&track->sticky_events);
3291 }
3292
3293 /*
3294  * Called with TRACKS_LOCK taken
3295  */
3296 static void
3297 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3298 {
3299   GList *tmp;
3300   guint requested_selection_seqnum;
3301   GstMessage *msg;
3302
3303   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3304      output slots vs active/draining tracks */
3305   requested_selection_seqnum =
3306       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3307
3308   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3309     return;
3310
3311   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3312
3313   /* Go over all slots, and if they have a pending track that's no longer
3314    * selected, clear it so the slot can be reused */
3315   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3316     OutputSlot *slot = (OutputSlot *) tmp->data;
3317
3318     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3319       GST_DEBUG_OBJECT (demux,
3320           "Removing deselected track '%s' as pending from output of current track '%s'",
3321           slot->pending_track->stream_id, slot->track->stream_id);
3322       gst_adaptive_demux_track_unref (slot->pending_track);
3323       slot->pending_track = NULL;
3324     }
3325   }
3326
3327   /* Go over all tracks and create/re-assign/remove slots */
3328   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3329     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3330
3331     if (track->selected) {
3332       OutputSlot *slot = find_slot_for_track (demux, track);
3333
3334       /* 0. Track is selected and has a slot. Nothing to do */
3335       if (slot) {
3336         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3337             track->stream_id);
3338         continue;
3339       }
3340
3341       slot = find_replacement_slot_for_track (demux, track);
3342       if (slot) {
3343         /* 1. There is an existing slot of the same type which is currently
3344          *    draining, assign this track as a replacement for it */
3345         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3346         if (slot->pending_track == NULL) {
3347           slot->pending_track = gst_adaptive_demux_track_ref (track);
3348           GST_DEBUG_OBJECT (demux,
3349               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3350               track->stream_id, track->period_num,
3351               slot->track->stream_id, slot->track->period_num);
3352         }
3353       } else {
3354         /* 2. There is no compatible replacement slot, create a new one */
3355         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3356         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3357             track->stream_id);
3358         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3359
3360         track->update_next_segment = TRUE;
3361
3362         slot->track = gst_adaptive_demux_track_ref (track);
3363         track->active = TRUE;
3364         gst_adaptive_demux_send_initial_events (demux, slot);
3365       }
3366
3367       /* If we were draining this track, we no longer are */
3368       track->draining = FALSE;
3369     }
3370   }
3371
3372   /* Finally check all slots have a current/pending track. If not remove it */
3373   for (tmp = demux->priv->outputs; tmp;) {
3374     OutputSlot *slot = (OutputSlot *) tmp->data;
3375     /* We should never has slots without target tracks */
3376     g_assert (slot->track);
3377     if (slot->track->draining && !slot->pending_track) {
3378       GstAdaptiveDemux2Stream *stream;
3379
3380       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3381           slot->track->stream_id);
3382       slot->track->active = FALSE;
3383
3384       /* If the stream feeding this track is stopped, flush and clear
3385        * the track now that it's going inactive. If the stream was not
3386        * found, it means we advanced past that period already (and the
3387        * stream was stopped and discarded) */
3388       stream = find_stream_for_track_locked (demux, slot->track);
3389       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3390         gst_adaptive_demux_track_flush (slot->track);
3391
3392       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3393       gst_adaptive_demux_output_slot_free (demux, slot);
3394     } else
3395       tmp = tmp->next;
3396   }
3397
3398   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3399   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3400   if (msg) {
3401     TRACKS_UNLOCK (demux);
3402     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3403     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3404     TRACKS_LOCK (demux);
3405   }
3406 }
3407
3408 /* TRACKS_LOCK held */
3409 static gboolean
3410 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3411 {
3412   GList *iter;
3413   GstAdaptiveDemuxPeriod *previous_period;
3414   GstStreamCollection *collection;
3415
3416   /* Grab the next period, should be demux->periods->next->data */
3417   previous_period = g_queue_pop_head (demux->priv->periods);
3418
3419   /* Remove ref held by demux->output_period */
3420   gst_adaptive_demux_period_unref (previous_period);
3421   demux->output_period =
3422       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3423
3424   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3425       demux->output_period->period_num);
3426
3427   /* We can now post the collection of the new period */
3428   collection = demux->output_period->collection;
3429   TRACKS_UNLOCK (demux);
3430   gst_element_post_message (GST_ELEMENT_CAST (demux),
3431       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3432   TRACKS_LOCK (demux);
3433
3434   /* Unselect all tracks of the previous period */
3435   for (iter = previous_period->tracks; iter; iter = iter->next) {
3436     GstAdaptiveDemuxTrack *track = iter->data;
3437     if (track->selected) {
3438       track->selected = FALSE;
3439       track->draining = TRUE;
3440     }
3441   }
3442
3443   /* Force a selection re-check */
3444   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3445   check_and_handle_selection_update_locked (demux);
3446
3447   /* Remove the final ref on the previous period now that we have done the switch */
3448   gst_adaptive_demux_period_unref (previous_period);
3449
3450   return TRUE;
3451 }
3452
3453 /* Called with TRACKS_LOCK taken */
3454 static void
3455 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3456     OutputSlot * slot)
3457 {
3458   GstAdaptiveDemuxTrack *track = slot->track;
3459   GstMessage *msg;
3460   gboolean pending_is_ready;
3461   GstAdaptiveDemux2Stream *stream;
3462
3463   /* If we have a pending track for this slot, the current track should be
3464    * draining and no longer selected */
3465   g_assert (track->draining && !track->selected);
3466
3467   /* If we're draining, check if the pending track has enough data *or* that
3468      we've already drained out entirely */
3469   pending_is_ready =
3470       (slot->pending_track->level_time >=
3471       slot->pending_track->buffering_threshold);
3472   pending_is_ready |= slot->pending_track->eos;
3473
3474   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3475     GST_DEBUG_OBJECT (demux,
3476         "Replacement track '%s' doesn't have enough data for switching yet",
3477         slot->pending_track->stream_id);
3478     return;
3479   }
3480
3481   GST_DEBUG_OBJECT (demux,
3482       "Pending replacement track has enough data, switching");
3483   track->active = FALSE;
3484   track->draining = FALSE;
3485
3486   /* If the stream feeding this track is stopped, flush and clear
3487    * the track now that it's going inactive. If the stream was not
3488    * found, it means we advanced past that period already (and the
3489    * stream was stopped and discarded) */
3490   stream = find_stream_for_track_locked (demux, track);
3491   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3492     gst_adaptive_demux_track_flush (track);
3493
3494   gst_adaptive_demux_track_unref (track);
3495   /* We steal the reference of pending_track */
3496   track = slot->track = slot->pending_track;
3497   slot->pending_track = NULL;
3498   slot->track->active = TRUE;
3499
3500   /* Make sure the track segment will start at the current position */
3501   track->update_next_segment = TRUE;
3502
3503   /* Send stream start and collection, and schedule sticky events */
3504   gst_adaptive_demux_send_initial_events (demux, slot);
3505
3506   /* Can we emit the streams-selected message now ? */
3507   msg =
3508       all_selected_tracks_are_active (demux,
3509       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3510   if (msg) {
3511     TRACKS_UNLOCK (demux);
3512     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3513     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3514     TRACKS_LOCK (demux);
3515   }
3516
3517 }
3518
3519 static void
3520 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3521 {
3522   GList *tmp;
3523   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3524   gboolean wait_for_data = FALSE;
3525   gboolean all_tracks_empty;
3526   GstFlowReturn ret;
3527
3528   GST_DEBUG_OBJECT (demux, "enter");
3529
3530   TRACKS_LOCK (demux);
3531
3532   /* Check if stopping */
3533   if (demux->priv->flushing) {
3534     ret = GST_FLOW_FLUSHING;
3535     goto pause;
3536   }
3537
3538   /* If the selection changed, handle it */
3539   check_and_handle_selection_update_locked (demux);
3540
3541 restart:
3542   ret = GST_FLOW_OK;
3543   global_output_position = GST_CLOCK_STIME_NONE;
3544   all_tracks_empty = TRUE;
3545
3546   if (wait_for_data) {
3547     GST_DEBUG_OBJECT (demux, "Waiting for data");
3548     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3549     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3550     if (demux->priv->flushing) {
3551       ret = GST_FLOW_FLUSHING;
3552       goto pause;
3553     }
3554     wait_for_data = FALSE;
3555   }
3556
3557   /* Grab/Recalculate current global output position
3558    * This is the minimum pending output position of all tracks used for output
3559    *
3560    * If there is a track which is empty and not EOS, wait for it to receive data
3561    * then recalculate global output position.
3562    *
3563    * This also pushes downstream all non-timed data that might be present.
3564    *
3565    * IF all tracks are EOS : stop task
3566    */
3567   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3568   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3569     OutputSlot *slot = (OutputSlot *) tmp->data;
3570     GstAdaptiveDemuxTrack *track;
3571
3572     /* If there is a pending track, Check if it's time to switch to it */
3573     if (slot->pending_track)
3574       handle_slot_pending_track_switch_locked (demux, slot);
3575
3576     track = slot->track;
3577
3578     if (!track->active) {
3579       /* Note: Edward: I can't see in what cases we would end up with inactive
3580          tracks assigned to slots. */
3581       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3582       g_assert (track->active);
3583       continue;
3584     }
3585
3586     if (track->next_position == GST_CLOCK_STIME_NONE) {
3587       gst_adaptive_demux_track_update_next_position (track);
3588     }
3589
3590     GST_TRACE_OBJECT (demux,
3591         "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3592         track->stream_id, track->period_num,
3593         GST_STIME_ARGS (track->next_position));
3594
3595     if (track->next_position != GST_CLOCK_STIME_NONE) {
3596       if (global_output_position == GST_CLOCK_STIME_NONE)
3597         global_output_position = track->next_position;
3598       else
3599         global_output_position =
3600             MIN (global_output_position, track->next_position);
3601       track->waiting_add = FALSE;
3602       all_tracks_empty = FALSE;
3603     } else if (!track->eos) {
3604       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3605           track->stream_id, track->period_num);
3606       all_tracks_empty = FALSE;
3607       wait_for_data = track->waiting_add = TRUE;
3608     } else {
3609       GST_DEBUG_OBJECT (demux,
3610           "Track %s (period %u) is EOS, not waiting for timed data",
3611           track->stream_id, track->period_num);
3612
3613       if (gst_queue_array_get_length (track->queue) > 0) {
3614         all_tracks_empty = FALSE;
3615       }
3616     }
3617   }
3618
3619   if (wait_for_data)
3620     goto restart;
3621
3622   if (all_tracks_empty && demux->output_period->has_next_period) {
3623     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3624         demux->output_period->period_num);
3625     if (!gst_adaptive_demux_advance_output_period (demux)) {
3626       /* Failed to move to next period, error out */
3627       ret = GST_FLOW_ERROR;
3628       goto pause;
3629     }
3630     /* Restart the loop */
3631     goto restart;
3632   }
3633
3634   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3635       GST_STIME_ARGS (global_output_position));
3636
3637   /* For each track:
3638    *
3639    * We know all active tracks have pending timed data
3640    * * while track next_position <= global output position
3641    *   * push pending data
3642    *   * Update track next_position
3643    *     * recalculate global output position
3644    *   * Pop next pending data from track and update pending position
3645    *
3646    */
3647   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3648     OutputSlot *slot = (OutputSlot *) tmp->data;
3649     GstAdaptiveDemuxTrack *track = slot->track;
3650
3651     GST_LOG_OBJECT (track->element,
3652         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3653         " global_output_position:%" GST_STIME_FORMAT, track->active,
3654         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3655         GST_STIME_ARGS (global_output_position));
3656
3657     if (!track->active)
3658       continue;
3659
3660     while (global_output_position == GST_CLOCK_STIME_NONE
3661         || !slot->pushed_timed_data
3662         || ((track->next_position != GST_CLOCK_STIME_NONE)
3663             && track->next_position <= global_output_position)
3664         || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3665       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3666
3667       if (!mo) {
3668         GST_DEBUG_OBJECT (demux,
3669             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3670             track->stream_id, track->period_num, track->eos,
3671             slot->pushed_timed_data);
3672         /* This should only happen if the track is EOS, or exactly in between
3673          * the parser outputting segment/caps before buffers. */
3674         g_assert (track->eos || !slot->pushed_timed_data);
3675
3676         /* If we drained the track, but there's a pending track on the slot
3677          * loop again to activate it */
3678         if (slot->pending_track) {
3679           GST_DEBUG_OBJECT (demux,
3680               "Track '%s' (period %u) drained, but has a pending track to activate",
3681               track->stream_id, track->period_num);
3682           goto restart;
3683         }
3684         break;
3685       }
3686
3687       demux_update_buffering_locked (demux);
3688       demux_post_buffering_locked (demux);
3689       TRACKS_UNLOCK (demux);
3690
3691       GST_DEBUG_OBJECT (demux,
3692           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3693           track->period_num, mo);
3694
3695       if (GST_IS_EVENT (mo)) {
3696         GstEvent *event = (GstEvent *) mo;
3697         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3698           slot->pushed_timed_data = TRUE;
3699         } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3700           /* If there is a pending next period, don't send the EOS */
3701           if (demux->output_period->has_next_period) {
3702             GST_LOG_OBJECT (demux,
3703                 "Dropping EOS on track '%s' (period %u) before next period",
3704                 track->stream_id, track->period_num);
3705             gst_event_store_mark_delivered (&track->sticky_events, event);
3706             gst_event_unref (event);
3707             event = NULL;
3708           }
3709         }
3710
3711         if (event != NULL) {
3712           gst_pad_push_event (slot->pad, gst_event_ref (event));
3713
3714           if (GST_EVENT_IS_STICKY (event))
3715             gst_event_store_mark_delivered (&track->sticky_events, event);
3716           gst_event_unref (event);
3717         }
3718       } else if (GST_IS_BUFFER (mo)) {
3719         GstBuffer *buffer = (GstBuffer *) mo;
3720
3721         if (track->output_discont) {
3722           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3723             buffer = gst_buffer_make_writable (buffer);
3724             GST_DEBUG_OBJECT (slot->pad,
3725                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3726                 buffer);
3727             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3728           }
3729           track->output_discont = FALSE;
3730         }
3731         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3732         ret =
3733             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3734             slot->pad, slot->flow_ret);
3735         GST_DEBUG_OBJECT (slot->pad,
3736             "track %s (period %u) push returned %s (combined %s)",
3737             track->stream_id, track->period_num,
3738             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3739         slot->pushed_timed_data = TRUE;
3740       } else {
3741         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3742       }
3743
3744       TRACKS_LOCK (demux);
3745       gst_adaptive_demux_track_update_next_position (track);
3746
3747       if (ret != GST_FLOW_OK)
3748         goto pause;
3749     }
3750   }
3751
3752   /* Store global output position */
3753   if (global_output_position != GST_CLOCK_STIME_NONE)
3754     demux->priv->global_output_position = global_output_position;
3755
3756   if (global_output_position == GST_CLOCK_STIME_NONE) {
3757     if (!demux->priv->flushing) {
3758       GST_DEBUG_OBJECT (demux,
3759           "Pausing output task after reaching NONE global_output_position");
3760       gst_task_pause (demux->priv->output_task);
3761     }
3762   }
3763
3764   TRACKS_UNLOCK (demux);
3765   GST_DEBUG_OBJECT (demux, "leave");
3766   return;
3767
3768 pause:
3769   {
3770     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3771     /* If the flushing flag is set, then the task is being
3772      * externally stopped, so don't go to pause(), otherwise we
3773      * should so we don't keep spinning */
3774     if (!demux->priv->flushing) {
3775       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3776           gst_flow_get_name (ret));
3777       gst_task_pause (demux->priv->output_task);
3778     }
3779
3780     TRACKS_UNLOCK (demux);
3781
3782     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3783       GstEvent *eos = gst_event_new_eos ();
3784
3785       if (ret != GST_FLOW_EOS) {
3786         GST_ELEMENT_FLOW_ERROR (demux, ret);
3787       }
3788
3789       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3790       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3791         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3792       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3793
3794       gst_adaptive_demux_push_src_event (demux, eos);
3795     }
3796
3797     return;
3798   }
3799 }
3800
3801 /* must be called from the scheduler */
3802 gboolean
3803 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3804 {
3805   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3806
3807   if (klass->is_live)
3808     return klass->is_live (demux);
3809   return FALSE;
3810 }
3811
3812 /* must be called from the scheduler */
3813 GstFlowReturn
3814 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3815     GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3816     GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3817 {
3818   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3819
3820   if (klass->stream_seek)
3821     return klass->stream_seek (stream, forward, flags, ts, final_ts);
3822   return GST_FLOW_ERROR;
3823 }
3824
3825 /* must be called from the scheduler */
3826 gboolean
3827 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3828     GstAdaptiveDemux2Stream * stream)
3829 {
3830   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3831   gboolean ret = TRUE;
3832
3833   if (klass->stream_has_next_fragment)
3834     ret = klass->stream_has_next_fragment (stream);
3835
3836   return ret;
3837 }
3838
3839 /* must be called from the scheduler */
3840 /* Called from:
3841  *  the ::finish_fragment() handlers when an *actual* fragment is done
3842  *   */
3843 GstFlowReturn
3844 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3845     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3846 {
3847   if (stream->last_ret != GST_FLOW_OK)
3848     return stream->last_ret;
3849
3850   stream->last_ret =
3851       gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3852       duration);
3853
3854   return stream->last_ret;
3855 }
3856
3857 /* must be called with manifest_lock taken */
3858 static GstFlowReturn
3859 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3860     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3861 {
3862   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3863   GstFlowReturn ret;
3864
3865   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3866
3867   GST_LOG_OBJECT (stream,
3868       "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3869       GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
3870
3871   stream->download_error_count = 0;
3872   g_clear_error (&stream->last_error);
3873
3874 #if 0
3875   /* FIXME - url has no indication of byte ranges for subsegments */
3876   /* FIXME: Reenable statistics sending? */
3877   gst_element_post_message (GST_ELEMENT_CAST (demux),
3878       gst_message_new_element (GST_OBJECT_CAST (demux),
3879           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3880               "manifest-uri", G_TYPE_STRING,
3881               demux->manifest_uri, "uri", G_TYPE_STRING,
3882               stream->fragment.uri, "fragment-start-time",
3883               GST_TYPE_CLOCK_TIME, stream->download_start_time,
3884               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3885               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3886               stream->download_total_bytes, "fragment-download-time",
3887               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3888 #endif
3889
3890   /* Don't update to the end of the segment if in reverse playback */
3891   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3892   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3893     stream->parse_segment.position += duration;
3894     stream->current_position += duration;
3895
3896     GST_DEBUG_OBJECT (stream,
3897         "stream position now %" GST_TIME_FORMAT,
3898         GST_TIME_ARGS (stream->current_position));
3899   }
3900   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3901
3902   /* When advancing with a non 1.0 rate on live streams, we need to check
3903    * the live seeking range again to make sure we can still advance to
3904    * that position */
3905   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3906     if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3907       ret = GST_FLOW_EOS;
3908     else
3909       ret = klass->stream_advance_fragment (stream);
3910   } else if (gst_adaptive_demux_is_live (demux)
3911       || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3912     ret = klass->stream_advance_fragment (stream);
3913   } else {
3914     ret = GST_FLOW_EOS;
3915   }
3916
3917   stream->download_start_time =
3918       GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3919
3920   if (ret == GST_FLOW_OK) {
3921     GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3922     if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3923             gst_adaptive_demux2_stream_update_current_bitrate (demux,
3924                 stream))) {
3925       GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3926       stream->need_header = TRUE;
3927       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3928     }
3929   }
3930
3931   return ret;
3932 }
3933
3934 /* must be called with manifest_lock taken */
3935 static gboolean
3936 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3937     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3938 {
3939   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3940
3941   if (klass->stream_select_bitrate)
3942     return klass->stream_select_bitrate (stream, bitrate);
3943   return FALSE;
3944 }
3945
3946 /* must be called with manifest_lock taken */
3947 GstFlowReturn
3948 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3949     GstAdaptiveDemux2Stream * stream)
3950 {
3951   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3952   GstFlowReturn ret;
3953
3954   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3955       GST_FLOW_ERROR);
3956
3957   /* Make sure the sub-class will update bitrate, or else
3958    * we will later */
3959   stream->fragment.finished = FALSE;
3960
3961   GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3962       GST_TIME_ARGS (stream->current_position));
3963
3964   ret = klass->stream_update_fragment_info (stream);
3965
3966   GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3967       gst_flow_get_name (ret), stream->fragment.uri);
3968   if (ret == GST_FLOW_OK) {
3969     GST_LOG_OBJECT (stream,
3970         "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3971         GST_STIME_ARGS (stream->fragment.stream_time),
3972         GST_TIME_ARGS (stream->fragment.duration));
3973     GST_LOG_OBJECT (stream,
3974         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3975         stream->fragment.range_start, stream->fragment.range_end);
3976   }
3977
3978   return ret;
3979 }
3980
3981 /* must be called with manifest_lock taken */
3982 GstClockTime
3983 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3984     demux, GstAdaptiveDemux2Stream * stream)
3985 {
3986   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3987
3988   if (klass->stream_get_fragment_waiting_time)
3989     return klass->stream_get_fragment_waiting_time (stream);
3990   return 0;
3991 }
3992
3993 static void
3994 handle_manifest_download_complete (DownloadRequest * request,
3995     DownloadRequestState state, GstAdaptiveDemux * demux)
3996 {
3997   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3998   GstBuffer *buffer;
3999   GstFlowReturn result;
4000
4001   g_free (demux->manifest_base_uri);
4002   g_free (demux->manifest_uri);
4003
4004   if (request->redirect_permanent && request->redirect_uri) {
4005     demux->manifest_uri = g_strdup (request->redirect_uri);
4006     demux->manifest_base_uri = NULL;
4007   } else {
4008     demux->manifest_uri = g_strdup (request->uri);
4009     demux->manifest_base_uri = g_strdup (request->redirect_uri);
4010   }
4011
4012   buffer = download_request_take_buffer (request);
4013
4014   /* We should always have a buffer since this function is the non-error
4015    * callback for the download */
4016   g_assert (buffer);
4017
4018   result = klass->update_manifest_data (demux, buffer);
4019   gst_buffer_unref (buffer);
4020
4021   /* FIXME: Should the manifest uri vars be reverted to original
4022    * values if updating fails? */
4023
4024   if (result == GST_FLOW_OK) {
4025     GstClockTime duration;
4026     /* Send an updated duration message */
4027     duration = klass->get_duration (demux);
4028     if (duration != GST_CLOCK_TIME_NONE) {
4029       GST_DEBUG_OBJECT (demux,
4030           "Sending duration message : %" GST_TIME_FORMAT,
4031           GST_TIME_ARGS (duration));
4032       gst_element_post_message (GST_ELEMENT (demux),
4033           gst_message_new_duration_changed (GST_OBJECT (demux)));
4034     } else {
4035       GST_DEBUG_OBJECT (demux,
4036           "Duration unknown, can not send the duration message");
4037     }
4038
4039     /* If a manifest changes it's liveness or periodic updateness, we need
4040      * to start/stop the manifest update task appropriately */
4041     /* Keep this condition in sync with the one in
4042      * gst_adaptive_demux_start_manifest_update_task()
4043      */
4044     if (gst_adaptive_demux_is_live (demux) &&
4045         klass->requires_periodical_playlist_update (demux)) {
4046       gst_adaptive_demux_start_manifest_update_task (demux);
4047     } else {
4048       gst_adaptive_demux_stop_manifest_update_task (demux);
4049     }
4050   }
4051 }
4052
4053 static void
4054 handle_manifest_download_failure (DownloadRequest * request,
4055     DownloadRequestState state, GstAdaptiveDemux * demux)
4056 {
4057   GST_FIXME_OBJECT (demux, "Manifest download failed.");
4058   /* Retry or error out here */
4059 }
4060
4061 static GstFlowReturn
4062 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4063 {
4064   DownloadRequest *request;
4065   GstFlowReturn ret = GST_FLOW_OK;
4066   GError *error = NULL;
4067
4068   request = download_request_new_uri (demux->manifest_uri);
4069
4070   download_request_set_callbacks (request,
4071       (DownloadRequestEventCallback) handle_manifest_download_complete,
4072       (DownloadRequestEventCallback) handle_manifest_download_failure,
4073       NULL, NULL, demux);
4074
4075   if (!downloadhelper_submit_request (demux->download_helper, NULL,
4076           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4077           &error)) {
4078     if (error) {
4079       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4080           ("Failed to download manifest: %s", error->message), (NULL));
4081       g_clear_error (&error);
4082     }
4083     ret = GST_FLOW_NOT_LINKED;
4084   }
4085
4086   return ret;
4087 }
4088
4089 /* must be called with manifest_lock taken */
4090 GstFlowReturn
4091 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4092 {
4093   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4094   GstFlowReturn ret;
4095
4096   ret = klass->update_manifest (demux);
4097
4098   return ret;
4099 }
4100
4101 void
4102 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4103 {
4104   g_free (f->uri);
4105   f->uri = NULL;
4106   f->range_start = 0;
4107   f->range_end = -1;
4108
4109   g_free (f->header_uri);
4110   f->header_uri = NULL;
4111   f->header_range_start = 0;
4112   f->header_range_end = -1;
4113
4114   g_free (f->index_uri);
4115   f->index_uri = NULL;
4116   f->index_range_start = 0;
4117   f->index_range_end = -1;
4118
4119   f->stream_time = GST_CLOCK_STIME_NONE;
4120   f->duration = GST_CLOCK_TIME_NONE;
4121   f->finished = FALSE;
4122 }
4123
4124 /* must be called with manifest_lock taken */
4125 gboolean
4126 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4127 {
4128   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4129   gboolean ret = FALSE;
4130
4131   if (klass->has_next_period)
4132     ret = klass->has_next_period (demux);
4133   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4134   return ret;
4135 }
4136
4137 /* must be called with manifest_lock taken */
4138 void
4139 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4140 {
4141   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4142   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4143
4144   g_return_if_fail (klass->advance_period != NULL);
4145
4146   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4147   /* FIXME : no return value ? What if it fails ? */
4148   klass->advance_period (demux);
4149
4150   if (previous_period == demux->input_period) {
4151     GST_ERROR_OBJECT (demux, "Advancing period failed");
4152     return;
4153   }
4154
4155   /* Stop the previous period stream tasks */
4156   gst_adaptive_demux_period_stop_tasks (previous_period);
4157
4158   gst_adaptive_demux_update_collection (demux, demux->input_period);
4159   /* Figure out a pre-emptive selection based on the output period selection */
4160   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4161       demux->output_period);
4162
4163   gst_adaptive_demux_prepare_streams (demux, FALSE);
4164   gst_adaptive_demux_start_tasks (demux);
4165 }
4166
4167 /**
4168  * gst_adaptive_demux_get_monotonic_time:
4169  * Returns: a monotonically increasing time, using the system realtime clock
4170  */
4171 GstClockTime
4172 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4173 {
4174   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4175   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
4176 }
4177
4178 /**
4179  * gst_adaptive_demux_get_client_now_utc:
4180  * @demux: #GstAdaptiveDemux
4181  * Returns: the client's estimate of UTC
4182  *
4183  * Used to find the client's estimate of UTC, using the system realtime clock.
4184  */
4185 GDateTime *
4186 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4187 {
4188   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
4189 }
4190
4191 /**
4192  * gst_adaptive_demux_is_running
4193  * @demux: #GstAdaptiveDemux
4194  * Returns: whether the demuxer is processing data
4195  *
4196  * Returns FALSE if shutdown has started (transitioning down from
4197  * PAUSED), otherwise TRUE.
4198  */
4199 gboolean
4200 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4201 {
4202   return g_atomic_int_get (&demux->running);
4203 }
4204
4205 /**
4206  * gst_adaptive_demux_get_qos_earliest_time:
4207  *
4208  * Returns: The QOS earliest time
4209  *
4210  * Since: 1.20
4211  */
4212 GstClockTime
4213 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4214 {
4215   GstClockTime earliest;
4216
4217   GST_OBJECT_LOCK (demux);
4218   earliest = demux->priv->qos_earliest_time;
4219   GST_OBJECT_UNLOCK (demux);
4220
4221   return earliest;
4222 }
4223
4224 gboolean
4225 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4226     GstAdaptiveDemux2Stream * stream)
4227 {
4228   g_return_val_if_fail (demux && stream, FALSE);
4229
4230   /* FIXME : Migrate to parent */
4231   g_return_val_if_fail (stream->demux == NULL, FALSE);
4232
4233   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4234
4235   TRACKS_LOCK (demux);
4236   if (demux->input_period->prepared) {
4237     GST_ERROR_OBJECT (demux,
4238         "Attempted to add streams but no new period was created");
4239     TRACKS_UNLOCK (demux);
4240     return FALSE;
4241   }
4242   stream->demux = demux;
4243   stream->period = demux->input_period;
4244   demux->input_period->streams =
4245       g_list_append (demux->input_period->streams, stream);
4246
4247   if (stream->tracks) {
4248     GList *iter;
4249     for (iter = stream->tracks; iter; iter = iter->next)
4250       if (!gst_adaptive_demux_period_add_track (demux->input_period,
4251               (GstAdaptiveDemuxTrack *) iter->data)) {
4252         GST_ERROR_OBJECT (demux, "Failed to add track elements");
4253         TRACKS_UNLOCK (demux);
4254         return FALSE;
4255       }
4256   }
4257   TRACKS_UNLOCK (demux);
4258   return TRUE;
4259 }
4260
4261 /* Return the current playback rate including any instant rate multiplier */
4262 gdouble
4263 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4264 {
4265   gdouble rate;
4266   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4267   rate = demux->segment.rate * demux->instant_rate_multiplier;
4268   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4269   return rate;
4270 }