3e109f131e5e501481593908a825af896e4cbe14
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (10 * GST_SECOND)
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
208
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
210     GstEvent * event);
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212     GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
214     GstQuery * query);
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
216     GstEvent * event);
217
218 static gboolean
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
220
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224     gboolean first_and_live);
225
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
228 static GstFlowReturn
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
230
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
232     demux);
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
234     demux);
235
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238     gboolean stop_updates);
239 static GstFlowReturn
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
242 static GstFlowReturn
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244     GstAdaptiveDemux2Stream * stream);
245 static GstFlowReturn
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247     GstAdaptiveDemux2Stream * stream, GstClockTime duration);
248
249 static void
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251     GstAdaptiveDemux2Stream * stream);
252
253 static gboolean
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
255     * demux);
256
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258  * method to get to the padtemplates */
259 GType
260 gst_adaptive_demux_ng_get_type (void)
261 {
262   static gsize type = 0;
263
264   if (g_once_init_enter (&type)) {
265     GType _type;
266     static const GTypeInfo info = {
267       sizeof (GstAdaptiveDemuxClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_adaptive_demux_class_init,
271       NULL,
272       NULL,
273       sizeof (GstAdaptiveDemux),
274       0,
275       (GInstanceInitFunc) gst_adaptive_demux_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_BIN,
279         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
280
281     private_offset =
282         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
283
284     g_once_init_leave (&type, _type);
285   }
286   return type;
287 }
288
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
291 {
292   return (G_STRUCT_MEMBER_P (self, private_offset));
293 }
294
295 static void
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297     const GValue * value, GParamSpec * pspec)
298 {
299   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
300
301   GST_OBJECT_LOCK (demux);
302
303   switch (prop_id) {
304     case PROP_CONNECTION_SPEED:
305       demux->connection_speed = g_value_get_uint (value) * 1000;
306       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307           demux->connection_speed);
308       break;
309     case PROP_BANDWIDTH_TARGET_RATIO:
310       demux->bandwidth_target_ratio = g_value_get_float (value);
311       break;
312     case PROP_MIN_BITRATE:
313       demux->min_bitrate = g_value_get_uint (value);
314       break;
315     case PROP_MAX_BITRATE:
316       demux->max_bitrate = g_value_get_uint (value);
317       break;
318     case PROP_CONNECTION_BITRATE:
319       demux->connection_speed = g_value_get_uint (value);
320       break;
321       /* FIXME: Recalculate track and buffering levels
322        * when watermarks change? */
323     case PROP_MAX_BUFFERING_TIME:
324       demux->max_buffering_time = g_value_get_uint64 (value);
325       break;
326     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
328       break;
329     case PROP_BUFFERING_LOW_WATERMARK_TIME:
330       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
331       break;
332     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333       demux->buffering_high_watermark_fragments = g_value_get_double (value);
334       break;
335     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336       demux->buffering_low_watermark_fragments = g_value_get_double (value);
337       break;
338     default:
339       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340       break;
341   }
342
343   GST_OBJECT_UNLOCK (demux);
344 }
345
346 static void
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348     GValue * value, GParamSpec * pspec)
349 {
350   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
351
352   GST_OBJECT_LOCK (demux);
353
354   switch (prop_id) {
355     case PROP_CONNECTION_SPEED:
356       g_value_set_uint (value, demux->connection_speed / 1000);
357       break;
358     case PROP_BANDWIDTH_TARGET_RATIO:
359       g_value_set_float (value, demux->bandwidth_target_ratio);
360       break;
361     case PROP_MIN_BITRATE:
362       g_value_set_uint (value, demux->min_bitrate);
363       break;
364     case PROP_MAX_BITRATE:
365       g_value_set_uint (value, demux->max_bitrate);
366       break;
367     case PROP_CONNECTION_BITRATE:
368       g_value_set_uint (value, demux->connection_speed);
369       break;
370     case PROP_CURRENT_BANDWIDTH:
371       g_value_set_uint (value, demux->current_download_rate);
372       break;
373     case PROP_MAX_BUFFERING_TIME:
374       g_value_set_uint64 (value, demux->max_buffering_time);
375       break;
376     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
378       break;
379     case PROP_BUFFERING_LOW_WATERMARK_TIME:
380       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
381       break;
382     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383       g_value_set_double (value, demux->buffering_high_watermark_fragments);
384       break;
385     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386       g_value_set_double (value, demux->buffering_low_watermark_fragments);
387       break;
388     case PROP_CURRENT_LEVEL_TIME_VIDEO:
389       g_value_set_uint64 (value, demux->current_level_time_video);
390       break;
391     case PROP_CURRENT_LEVEL_TIME_AUDIO:
392       g_value_set_uint64 (value, demux->current_level_time_audio);
393       break;
394     default:
395       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
396       break;
397   }
398
399   GST_OBJECT_UNLOCK (demux);
400 }
401
402 static void
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
404 {
405   GObjectClass *gobject_class;
406   GstElementClass *gstelement_class;
407   GstBinClass *gstbin_class;
408
409   gobject_class = G_OBJECT_CLASS (klass);
410   gstelement_class = GST_ELEMENT_CLASS (klass);
411   gstbin_class = GST_BIN_CLASS (klass);
412
413   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414       "Base Adaptive Demux (ng)");
415
416   parent_class = g_type_class_peek_parent (klass);
417
418   if (private_offset != 0)
419     g_type_class_adjust_private_offset (klass, &private_offset);
420
421   gobject_class->set_property = gst_adaptive_demux_set_property;
422   gobject_class->get_property = gst_adaptive_demux_get_property;
423   gobject_class->finalize = gst_adaptive_demux_finalize;
424
425   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426       g_param_spec_uint ("connection-speed", "Connection Speed",
427           "Network connection speed to use in kbps (0 = calculate from downloaded"
428           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
430
431   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432       g_param_spec_float ("bandwidth-target-ratio",
433           "Ratio of target bandwidth / available bandwidth",
434           "Limit of the available bitrate to use when switching to alternates",
435           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440           "Network connection speed to use (0 = automatic) (bits/s)",
441           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443
444   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446           "Minimum bitrate to use when switching to alternates (bits/s)",
447           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449
450   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452           "Maximum bitrate to use when switching to alternates (bits/s)",
453           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455
456   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457       g_param_spec_uint ("current-bandwidth",
458           "Current download bandwidth (bits/s)",
459           "Report of current download bandwidth (based on arriving data) (bits/s)",
460           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
461
462   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463       g_param_spec_uint64 ("max-buffering-time",
464           "Buffering maximum size (ns)",
465           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_HIGH_WATERMARK_TIME,
472       g_param_spec_uint64 ("high-watermark-time",
473           "High buffering watermark size (ns)",
474           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_LOW_WATERMARK_TIME,
481       g_param_spec_uint64 ("low-watermark-time",
482           "Low buffering watermark size (ns)",
483           "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("high-watermark-fragments",
491           "High buffering watermark size (fragments)",
492           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class,
498       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499       g_param_spec_double ("low-watermark-fragments",
500           "Low buffering watermark size (fragments)",
501           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504           G_PARAM_STATIC_STRINGS));
505
506   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507       g_param_spec_uint64 ("current-level-time-video",
508           "Currently buffered level of video (ns)",
509           "Currently buffered level of video track(s) (ns)",
510           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512           G_PARAM_STATIC_STRINGS));
513
514   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515       g_param_spec_uint64 ("current-level-time-audio",
516           "Currently buffered level of audio (ns)",
517           "Currently buffered level of audio track(s) (ns)",
518           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520           G_PARAM_STATIC_STRINGS));
521
522   gst_element_class_add_static_pad_template (gstelement_class,
523       &gst_adaptive_demux_audiosrc_template);
524   gst_element_class_add_static_pad_template (gstelement_class,
525       &gst_adaptive_demux_videosrc_template);
526   gst_element_class_add_static_pad_template (gstelement_class,
527       &gst_adaptive_demux_subtitlesrc_template);
528
529
530   gstelement_class->change_state = gst_adaptive_demux_change_state;
531   gstelement_class->query = gst_adaptive_demux_query;
532
533   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
534
535   klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536   klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538   klass->requires_periodical_playlist_update =
539       gst_adaptive_demux_requires_periodical_playlist_update_default;
540   klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
542 }
543
544 static void
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546     GstAdaptiveDemuxClass * klass)
547 {
548   GstPadTemplate *pad_template;
549
550   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
551
552   demux->priv = gst_adaptive_demux_get_instance_private (demux);
553   demux->priv->input_adapter = gst_adapter_new ();
554   demux->realtime_clock = gst_adaptive_demux_clock_new ();
555
556   demux->download_helper = downloadhelper_new (demux->realtime_clock);
557   demux->priv->segment_seqnum = gst_util_seqnum_next ();
558   demux->have_group_id = FALSE;
559   demux->group_id = G_MAXUINT;
560
561   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562   demux->instant_rate_multiplier = 1.0;
563
564   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
566
567   g_rec_mutex_init (&demux->priv->manifest_lock);
568
569   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570   g_mutex_init (&demux->priv->scheduler_lock);
571
572   g_mutex_init (&demux->priv->api_lock);
573   g_mutex_init (&demux->priv->segment_lock);
574
575   g_mutex_init (&demux->priv->tracks_lock);
576   g_cond_init (&demux->priv->tracks_add);
577
578   g_mutex_init (&demux->priv->buffering_lock);
579
580   demux->priv->periods = g_queue_new ();
581
582   pad_template =
583       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584   g_return_if_fail (pad_template != NULL);
585
586   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587   gst_pad_set_event_function (demux->sinkpad,
588       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589   gst_pad_set_chain_function (demux->sinkpad,
590       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
591
592   /* Properties */
593   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595   demux->min_bitrate = DEFAULT_MIN_BITRATE;
596   demux->max_bitrate = DEFAULT_MAX_BITRATE;
597
598   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601   demux->buffering_high_watermark_fragments =
602       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603   demux->buffering_low_watermark_fragments =
604       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
605
606   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
608
609   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
610
611   demux->priv->duration = GST_CLOCK_TIME_NONE;
612
613   /* Output combiner */
614   demux->priv->flowcombiner = gst_flow_combiner_new ();
615
616   /* Output task */
617   g_rec_mutex_init (&demux->priv->output_lock);
618   demux->priv->output_task =
619       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
620       NULL);
621   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
622 }
623
624 static void
625 gst_adaptive_demux_finalize (GObject * object)
626 {
627   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628   GstAdaptiveDemuxPrivate *priv = demux->priv;
629
630   GST_DEBUG_OBJECT (object, "finalize");
631
632   g_object_unref (priv->input_adapter);
633
634   downloadhelper_free (demux->download_helper);
635
636   g_rec_mutex_clear (&demux->priv->manifest_lock);
637   g_mutex_clear (&demux->priv->api_lock);
638   g_mutex_clear (&demux->priv->segment_lock);
639
640   g_mutex_clear (&demux->priv->buffering_lock);
641
642   g_mutex_clear (&demux->priv->scheduler_lock);
643   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
644
645   /* The input period is present after a reset, clear it now */
646   if (demux->input_period)
647     gst_adaptive_demux_period_unref (demux->input_period);
648
649   if (demux->realtime_clock) {
650     gst_adaptive_demux_clock_unref (demux->realtime_clock);
651     demux->realtime_clock = NULL;
652   }
653   g_object_unref (priv->output_task);
654   g_rec_mutex_clear (&priv->output_lock);
655
656   gst_flow_combiner_free (priv->flowcombiner);
657
658   g_queue_free (priv->periods);
659
660   G_OBJECT_CLASS (parent_class)->finalize (object);
661 }
662
663 static gboolean
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
665 {
666   gboolean ret = FALSE;
667   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
668
669   ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
670
671   return ret;
672 }
673
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676     GstStateChange transition)
677 {
678   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
680
681   switch (transition) {
682     case GST_STATE_CHANGE_NULL_TO_READY:
683       if (!gst_adaptive_demux_check_streams_aware (demux)) {
684         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685             (_("Element requires a streams-aware context.")), (NULL));
686         goto fail_out;
687       }
688       break;
689     case GST_STATE_CHANGE_PAUSED_TO_READY:
690       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
692
693       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694       downloadhelper_stop (demux->download_helper);
695
696       TRACKS_LOCK (demux);
697       demux->priv->flushing = TRUE;
698       g_cond_signal (&demux->priv->tracks_add);
699       gst_task_stop (demux->priv->output_task);
700       TRACKS_UNLOCK (demux);
701
702       gst_task_join (demux->priv->output_task);
703
704       GST_API_LOCK (demux);
705       gst_adaptive_demux_reset (demux);
706       GST_API_UNLOCK (demux);
707       break;
708     case GST_STATE_CHANGE_READY_TO_PAUSED:
709       GST_API_LOCK (demux);
710       gst_adaptive_demux_reset (demux);
711
712       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713       if (g_atomic_int_get (&demux->priv->have_manifest))
714         gst_adaptive_demux_start_manifest_update_task (demux);
715       GST_API_UNLOCK (demux);
716       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717         GST_DEBUG_OBJECT (demux, "demuxer has started running");
718       /* gst_task_start (demux->priv->output_task); */
719       break;
720     default:
721       break;
722   }
723
724   /* this must be run with the scheduler and output tasks stopped. */
725   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
726
727   switch (transition) {
728     case GST_STATE_CHANGE_READY_TO_PAUSED:
729       /* Start download task */
730       downloadhelper_start (demux->download_helper);
731       break;
732     default:
733       break;
734   }
735
736 fail_out:
737   return result;
738 }
739
740 static void
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
742     OutputSlot * slot)
743 {
744   GstEvent *eos = gst_event_new_eos ();
745   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
746
747   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
748   gst_pad_push_event (slot->pad, eos);
749   gst_pad_set_active (slot->pad, FALSE);
750   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
751   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
752   if (slot->track)
753     gst_adaptive_demux_track_unref (slot->track);
754   if (slot->pending_track)
755     gst_adaptive_demux_track_unref (slot->pending_track);
756
757   g_slice_free (OutputSlot, slot);
758 }
759
760 static OutputSlot *
761 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
762     GstStreamType streamtype)
763 {
764   OutputSlot *slot;
765   GstPadTemplate *tmpl;
766   gchar *name;
767
768   switch (streamtype) {
769     case GST_STREAM_TYPE_AUDIO:
770       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
771       tmpl =
772           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
773       break;
774     case GST_STREAM_TYPE_VIDEO:
775       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
776       tmpl =
777           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
778       break;
779     case GST_STREAM_TYPE_TEXT:
780       name =
781           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
782       tmpl =
783           gst_static_pad_template_get
784           (&gst_adaptive_demux_subtitlesrc_template);
785       break;
786     default:
787       g_assert_not_reached ();
788       return NULL;
789   }
790
791   slot = g_slice_new0 (OutputSlot);
792   slot->type = streamtype;
793   slot->pushed_timed_data = FALSE;
794
795   /* Create and activate new pads */
796   slot->pad = gst_pad_new_from_template (tmpl, name);
797   g_free (name);
798   gst_object_unref (tmpl);
799
800   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
801   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
802   gst_pad_set_active (slot->pad, TRUE);
803
804   gst_pad_set_query_function (slot->pad,
805       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
806   gst_pad_set_event_function (slot->pad,
807       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
808
809   gst_pad_set_element_private (slot->pad, slot);
810
811   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
812       GST_DEBUG_PAD_NAME (slot->pad));
813   return slot;
814 }
815
816 /* Called:
817  * * After `process_manifest` or when a period starts
818  * * Or when all tracks have been created
819  *
820  * Goes over tracks and creates the collection
821  *
822  * Returns TRUE if the collection was fully created.
823  *
824  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
825  * */
826 static gboolean
827 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
828     GstAdaptiveDemuxPeriod * period)
829 {
830   GstStreamCollection *collection;
831   GList *iter;
832
833   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
834
835   if (!period->tracks_changed) {
836     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
837     return TRUE;
838   }
839
840   if (!period->tracks) {
841     GST_WARNING_OBJECT (demux, "No tracks registered/present");
842     return FALSE;
843   }
844
845   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
846     GST_DEBUG_OBJECT (demux,
847         "Streams still have pending tracks, not creating/updating collection");
848     return FALSE;
849   }
850
851   /* Update collection */
852   collection = gst_stream_collection_new ("adaptivedemux");
853
854   for (iter = period->tracks; iter; iter = iter->next) {
855     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
856
857     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
858     gst_stream_collection_add_stream (collection,
859         gst_object_ref (track->stream_object));
860   }
861
862   if (period->collection)
863     gst_object_unref (period->collection);
864   period->collection = collection;
865
866   return TRUE;
867 }
868
869 /*
870  * Called for the output period:
871  * * after `update_collection()` if the input period is the same as the output period
872  * * When the output period changes
873  *
874  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
875  */
876 static gboolean
877 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
878 {
879   GstStreamCollection *collection;
880   GstAdaptiveDemuxPeriod *period = demux->output_period;
881   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
882
883   g_return_val_if_fail (period, FALSE);
884   if (!period->collection) {
885     GST_DEBUG_OBJECT (demux, "No collection available yet");
886     return TRUE;
887   }
888
889   collection = period->collection;
890
891   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
892       period->period_num);
893
894   /* Post collection */
895   TRACKS_UNLOCK (demux);
896   GST_MANIFEST_UNLOCK (demux);
897
898   gst_element_post_message (GST_ELEMENT_CAST (demux),
899       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
900
901   GST_MANIFEST_LOCK (demux);
902   TRACKS_LOCK (demux);
903
904   /* If no stream selection was handled, make a default selection */
905   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
906     gst_adaptive_demux_period_select_default_tracks (demux,
907         demux->output_period);
908   }
909
910   /* Make sure the output task is running */
911   if (gst_adaptive_demux2_is_running (demux)) {
912     demux->priv->flushing = FALSE;
913     GST_DEBUG_OBJECT (demux, "Starting the output task");
914     gst_task_start (demux->priv->output_task);
915   }
916
917   return TRUE;
918 }
919
920 static gboolean
921 handle_incoming_manifest (GstAdaptiveDemux * demux)
922 {
923   GstAdaptiveDemuxClass *demux_class;
924   GstQuery *query;
925   gboolean query_res;
926   gboolean ret = TRUE;
927   gsize available;
928   GstBuffer *manifest_buffer;
929
930   GST_API_LOCK (demux);
931   GST_MANIFEST_LOCK (demux);
932
933   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
934
935   available = gst_adapter_available (demux->priv->input_adapter);
936
937   if (available == 0)
938     goto eos_without_data;
939
940   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
941
942   /* Need to get the URI to use it as a base to generate the fragment's
943    * uris */
944   query = gst_query_new_uri ();
945   query_res = gst_pad_peer_query (demux->sinkpad, query);
946   if (query_res) {
947     gchar *uri, *redirect_uri;
948     gboolean permanent;
949
950     gst_query_parse_uri (query, &uri);
951     gst_query_parse_uri_redirection (query, &redirect_uri);
952     gst_query_parse_uri_redirection_permanent (query, &permanent);
953
954     if (permanent && redirect_uri) {
955       demux->manifest_uri = redirect_uri;
956       demux->manifest_base_uri = NULL;
957       g_free (uri);
958     } else {
959       demux->manifest_uri = uri;
960       demux->manifest_base_uri = redirect_uri;
961     }
962
963     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
964         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
965   } else {
966     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
967   }
968   gst_query_unref (query);
969
970   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
971   if (!demux->have_group_id) {
972     demux->have_group_id = TRUE;
973     demux->group_id = gst_util_group_id_next ();
974   }
975
976   /* Let the subclass parse the manifest */
977   manifest_buffer =
978       gst_adapter_take_buffer (demux->priv->input_adapter, available);
979   ret = demux_class->process_manifest (demux, manifest_buffer);
980   gst_buffer_unref (manifest_buffer);
981
982   gst_element_post_message (GST_ELEMENT_CAST (demux),
983       gst_message_new_element (GST_OBJECT_CAST (demux),
984           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
985               "manifest-uri", G_TYPE_STRING,
986               demux->manifest_uri, "uri", G_TYPE_STRING,
987               demux->manifest_uri,
988               "manifest-download-start", GST_TYPE_CLOCK_TIME,
989               GST_CLOCK_TIME_NONE,
990               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
991               gst_util_get_timestamp (), NULL)));
992
993   if (!ret)
994     goto invalid_manifest;
995
996   /* Streams should have been added to the input period if the manifest parsing
997    * succeeded */
998   if (!demux->input_period->streams)
999     goto no_streams;
1000
1001   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1002
1003   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1004   /* Send duration message */
1005   if (!gst_adaptive_demux_is_live (demux)) {
1006     GstClockTime duration = demux_class->get_duration (demux);
1007
1008     demux->priv->duration = duration;
1009     if (duration != GST_CLOCK_TIME_NONE) {
1010       GST_DEBUG_OBJECT (demux,
1011           "Sending duration message : %" GST_TIME_FORMAT,
1012           GST_TIME_ARGS (duration));
1013       gst_element_post_message (GST_ELEMENT (demux),
1014           gst_message_new_duration_changed (GST_OBJECT (demux)));
1015     } else {
1016       GST_DEBUG_OBJECT (demux,
1017           "media duration unknown, can not send the duration message");
1018     }
1019   }
1020
1021   TRACKS_LOCK (demux);
1022   /* New streams/tracks will have been added to the input period */
1023   /* The input period has streams, make it the active output period */
1024   /* FIXME : Factorize this into a function to make a period active */
1025   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1026   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1027       gst_adaptive_demux_post_collection (demux);
1028   TRACKS_UNLOCK (demux);
1029
1030   gst_adaptive_demux_prepare_streams (demux,
1031       gst_adaptive_demux_is_live (demux));
1032   gst_adaptive_demux_start_tasks (demux);
1033   gst_adaptive_demux_start_manifest_update_task (demux);
1034
1035 unlock_out:
1036   GST_MANIFEST_UNLOCK (demux);
1037   GST_API_UNLOCK (demux);
1038
1039   return ret;
1040
1041   /* ERRORS */
1042 eos_without_data:
1043   {
1044     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1045     ret = FALSE;
1046     goto unlock_out;
1047   }
1048
1049 no_streams:
1050   {
1051     /* no streams */
1052     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1053     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1054         (_("This file contains no playable streams.")),
1055         ("No known stream formats found at the Manifest"));
1056     ret = FALSE;
1057     goto unlock_out;
1058   }
1059
1060 invalid_manifest:
1061   {
1062     GST_MANIFEST_UNLOCK (demux);
1063     GST_API_UNLOCK (demux);
1064
1065     /* In most cases, this will happen if we set a wrong url in the
1066      * source element and we have received the 404 HTML response instead of
1067      * the manifest */
1068     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1069     return FALSE;
1070   }
1071 }
1072
1073 struct http_headers_collector
1074 {
1075   GstAdaptiveDemux *demux;
1076   gchar **cookies;
1077 };
1078
1079 static gboolean
1080 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1081     const GValue * value, gpointer userdata)
1082 {
1083   struct http_headers_collector *hdr_data = userdata;
1084   GstAdaptiveDemux *demux = hdr_data->demux;
1085   const gchar *field_name = g_quark_to_string (field_id);
1086
1087   if (G_UNLIKELY (value == NULL))
1088     return TRUE;                /* This should not happen */
1089
1090   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1091     const gchar *user_agent = g_value_get_string (value);
1092
1093     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1094     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1095   }
1096
1097   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1098       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1099     guint i = 0, prev_len = 0, total_len = 0;
1100     gchar **cookies = NULL;
1101
1102     if (hdr_data->cookies != NULL)
1103       prev_len = g_strv_length (hdr_data->cookies);
1104
1105     if (GST_VALUE_HOLDS_ARRAY (value)) {
1106       total_len = gst_value_array_get_size (value) + prev_len;
1107       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1108
1109       for (i = 0; i < gst_value_array_get_size (value); i++) {
1110         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1111             g_value_get_string (gst_value_array_get_value (value, i)));
1112         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1113       }
1114     } else if (G_VALUE_HOLDS_STRING (value)) {
1115       total_len = 1 + prev_len;
1116       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1117
1118       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1119           g_value_get_string (value));
1120       cookies[0] = g_value_dup_string (value);
1121     } else {
1122       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1123           g_quark_to_string (field_id));
1124     }
1125
1126     if (cookies) {
1127       if (prev_len) {
1128         guint j;
1129         for (j = 0; j < prev_len; j++) {
1130           GST_DEBUG_OBJECT (demux,
1131               "Append existing cookie %s", hdr_data->cookies[j]);
1132           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1133         }
1134       }
1135       cookies[total_len] = NULL;
1136
1137       g_strfreev (hdr_data->cookies);
1138       hdr_data->cookies = cookies;
1139     }
1140   }
1141
1142   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1143     const gchar *referer = g_value_get_string (value);
1144     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1145
1146     downloadhelper_set_referer (demux->download_helper, referer);
1147   }
1148
1149   /* Date header can be used to estimate server offset */
1150   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1151     const gchar *http_date = g_value_get_string (value);
1152
1153     if (http_date) {
1154       GstDateTime *datetime =
1155           gst_adaptive_demux_util_parse_http_head_date (http_date);
1156
1157       if (datetime) {
1158         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1159         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1160
1161         GST_INFO_OBJECT (demux,
1162             "HTTP response Date %s", GST_STR_NULL (date_string));
1163         g_free (date_string);
1164
1165         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1166       }
1167     }
1168   }
1169
1170   return TRUE;
1171 }
1172
1173 static gboolean
1174 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1175     GstEvent * event)
1176 {
1177   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1178   gboolean ret;
1179
1180   switch (event->type) {
1181     case GST_EVENT_FLUSH_STOP:{
1182       GST_API_LOCK (demux);
1183       GST_MANIFEST_LOCK (demux);
1184
1185       gst_adaptive_demux_reset (demux);
1186
1187       ret = gst_pad_event_default (pad, parent, event);
1188
1189       GST_MANIFEST_UNLOCK (demux);
1190       GST_API_UNLOCK (demux);
1191
1192       return ret;
1193     }
1194     case GST_EVENT_EOS:
1195     {
1196       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1197         if (!handle_incoming_manifest (demux)) {
1198           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1199           return gst_pad_event_default (pad, parent, event);
1200         }
1201         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1202       } else {
1203         GST_ERROR_OBJECT (demux,
1204             "Failed to acquire scheduler to handle manifest");
1205         return gst_pad_event_default (pad, parent, event);
1206       }
1207       gst_event_unref (event);
1208       return TRUE;
1209     }
1210     case GST_EVENT_STREAM_START:
1211       if (gst_event_parse_group_id (event, &demux->group_id))
1212         demux->have_group_id = TRUE;
1213       else
1214         demux->have_group_id = FALSE;
1215       /* Swallow stream-start, we'll push our own */
1216       gst_event_unref (event);
1217       return TRUE;
1218     case GST_EVENT_SEGMENT:
1219       /* Swallow newsegments, we'll push our own */
1220       gst_event_unref (event);
1221       return TRUE;
1222     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1223       const GstStructure *structure = gst_event_get_structure (event);
1224       struct http_headers_collector c = { demux, NULL };
1225
1226       if (gst_structure_has_name (structure, "http-headers")) {
1227         if (gst_structure_has_field (structure, "request-headers")) {
1228           GstStructure *req_headers = NULL;
1229           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1230               &req_headers, NULL);
1231           if (req_headers) {
1232             gst_structure_foreach (req_headers,
1233                 gst_adaptive_demux_handle_upstream_http_header, &c);
1234             gst_structure_free (req_headers);
1235           }
1236         }
1237         if (gst_structure_has_field (structure, "response-headers")) {
1238           GstStructure *res_headers = NULL;
1239           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1240               &res_headers, NULL);
1241           if (res_headers) {
1242             gst_structure_foreach (res_headers,
1243                 gst_adaptive_demux_handle_upstream_http_header, &c);
1244             gst_structure_free (res_headers);
1245           }
1246         }
1247
1248         if (c.cookies)
1249           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1250       }
1251       break;
1252     }
1253     default:
1254       break;
1255   }
1256
1257   return gst_pad_event_default (pad, parent, event);
1258 }
1259
1260 static GstFlowReturn
1261 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1262     GstBuffer * buffer)
1263 {
1264   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1265
1266   GST_MANIFEST_LOCK (demux);
1267
1268   gst_adapter_push (demux->priv->input_adapter, buffer);
1269
1270   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1271       (gint) gst_adapter_available (demux->priv->input_adapter));
1272
1273   GST_MANIFEST_UNLOCK (demux);
1274   return GST_FLOW_OK;
1275 }
1276
1277
1278 /* Called with TRACKS_LOCK taken */
1279 static void
1280 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1281 {
1282   GList *tmp;
1283
1284   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1285     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1286
1287     gst_adaptive_demux_track_flush (track);
1288     if (gst_pad_is_active (track->sinkpad)) {
1289       gst_pad_set_active (track->sinkpad, FALSE);
1290       gst_pad_set_active (track->sinkpad, TRUE);
1291     }
1292   }
1293 }
1294
1295 /* Resets all tracks to their initial state, ready to receive new data. */
1296 static void
1297 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1298 {
1299   TRACKS_LOCK (demux);
1300   g_queue_foreach (demux->priv->periods,
1301       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1302   TRACKS_UNLOCK (demux);
1303 }
1304
1305 /* Subclasses will call this function to ensure that a new input period is
1306  * available to receive new streams and tracks */
1307 gboolean
1308 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1309 {
1310   if (demux->input_period && !demux->input_period->prepared) {
1311     GST_DEBUG_OBJECT (demux, "Using existing input period");
1312     return TRUE;
1313   }
1314
1315   if (demux->input_period) {
1316     GST_DEBUG_OBJECT (demux, "Closing previous period");
1317     demux->input_period->closed = TRUE;
1318   }
1319   GST_DEBUG_OBJECT (demux, "Setting up new period");
1320
1321   demux->input_period = gst_adaptive_demux_period_new (demux);
1322
1323   return TRUE;
1324 }
1325
1326 /* must be called with manifest_lock taken */
1327 static void
1328 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1329 {
1330   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1331   GList *iter;
1332
1333   gst_adaptive_demux_stop_tasks (demux, TRUE);
1334
1335   if (klass->reset)
1336     klass->reset (demux);
1337
1338   /* Disable and remove all outputs */
1339   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1340   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1341     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1342   }
1343   g_list_free (demux->priv->outputs);
1344   demux->priv->outputs = NULL;
1345
1346   g_queue_clear_full (demux->priv->periods,
1347       (GDestroyNotify) gst_adaptive_demux_period_unref);
1348
1349   /* The output period always has an extra ref taken on it */
1350   if (demux->output_period)
1351     gst_adaptive_demux_period_unref (demux->output_period);
1352   demux->output_period = NULL;
1353   /* The input period doesn't have an extra ref taken on it */
1354   demux->input_period = NULL;
1355
1356   gst_adaptive_demux_start_new_period (demux);
1357
1358   g_free (demux->manifest_uri);
1359   g_free (demux->manifest_base_uri);
1360   demux->manifest_uri = NULL;
1361   demux->manifest_base_uri = NULL;
1362
1363   gst_adapter_clear (demux->priv->input_adapter);
1364   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1365
1366   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1367   demux->instant_rate_multiplier = 1.0;
1368
1369   demux->priv->duration = GST_CLOCK_TIME_NONE;
1370
1371   demux->priv->percent = -1;
1372   demux->priv->is_buffering = TRUE;
1373
1374   demux->have_group_id = FALSE;
1375   demux->group_id = G_MAXUINT;
1376   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1377
1378   demux->priv->global_output_position = 0;
1379
1380   demux->priv->n_audio_streams = 0;
1381   demux->priv->n_video_streams = 0;
1382   demux->priv->n_subtitle_streams = 0;
1383
1384   gst_flow_combiner_reset (demux->priv->flowcombiner);
1385 }
1386
1387 static gboolean
1388 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1389 {
1390   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1391
1392   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1393
1394   switch (GST_QUERY_TYPE (query)) {
1395     case GST_QUERY_BUFFERING:
1396     {
1397       GstFormat format;
1398       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1399
1400       if (!demux->output_period) {
1401         if (format != GST_FORMAT_TIME) {
1402           GST_DEBUG_OBJECT (demux,
1403               "No period setup yet, can't answer non-TIME buffering queries");
1404           return FALSE;
1405         }
1406
1407         GST_DEBUG_OBJECT (demux,
1408             "No period setup yet, but still answering buffering query");
1409         return TRUE;
1410       }
1411     }
1412     default:
1413       break;
1414   }
1415
1416   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1417 }
1418
1419 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1420 static GstAdaptiveDemux2Stream *
1421 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1422 {
1423   GList *iter;
1424
1425   /* We only look in the streams of the input period (i.e. with active streams) */
1426   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1427     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1428     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1429       return stream;
1430     }
1431   }
1432
1433   return NULL;
1434 }
1435
1436 /* TRACKS_LOCK held */
1437 static GstAdaptiveDemuxTrack *
1438 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1439     GstStreamType stream_type)
1440 {
1441   GList *iter;
1442
1443   for (iter = stream->tracks; iter; iter = iter->next) {
1444     GstAdaptiveDemuxTrack *track = iter->data;
1445
1446     if (track->type == stream_type)
1447       return track;
1448   }
1449
1450   return NULL;
1451 }
1452
1453 /* MANIFEST and TRACKS lock held */
1454 static void
1455 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1456     GstAdaptiveDemux2Stream * stream)
1457 {
1458   guint i;
1459
1460   GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1461
1462   for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1463       i++) {
1464     GstStream *gst_stream =
1465         gst_stream_collection_get_stream (stream->stream_collection, i);
1466     GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1467     GstAdaptiveDemuxTrack *track;
1468
1469     if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1470       continue;
1471     track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1472     if (!track) {
1473       GST_DEBUG_OBJECT (stream,
1474           "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
1475           gst_stream);
1476       continue;
1477     }
1478
1479     if (track->upstream_stream_id)
1480       g_free (track->upstream_stream_id);
1481     track->upstream_stream_id =
1482         g_strdup (gst_stream_get_stream_id (gst_stream));
1483   }
1484
1485 }
1486
1487 static gboolean
1488 tags_have_language_info (GstTagList * tags)
1489 {
1490   const gchar *language = NULL;
1491
1492   if (tags == NULL)
1493     return FALSE;
1494
1495   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1496           &language))
1497     return TRUE;
1498   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
1499           &language))
1500     return TRUE;
1501
1502   return FALSE;
1503 }
1504
1505 static gboolean
1506 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1507     GstStreamCollection * collection)
1508 {
1509   guint i;
1510   guint nb_audio, nb_video, nb_text;
1511   gboolean have_audio_languages = TRUE;
1512   gboolean have_text_languages = TRUE;
1513
1514   nb_audio = nb_video = nb_text = 0;
1515
1516   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1517     GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1518     GstTagList *tags = gst_stream_get_tags (gst_stream);
1519
1520     GST_DEBUG_OBJECT (stream,
1521         "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1522     switch (gst_stream_get_stream_type (gst_stream)) {
1523       case GST_STREAM_TYPE_AUDIO:
1524         have_audio_languages &= tags_have_language_info (tags);
1525         nb_audio++;
1526         break;
1527       case GST_STREAM_TYPE_VIDEO:
1528         nb_video++;
1529         break;
1530       case GST_STREAM_TYPE_TEXT:
1531         have_text_languages &= tags_have_language_info (tags);
1532         nb_text++;
1533         break;
1534       default:
1535         break;
1536     }
1537   }
1538
1539   /* Check that we either have at most 1 of each track type, or that
1540    * we have language tags for each to tell which is which */
1541   if (nb_video > 1 ||
1542       (nb_audio > 1 && !have_audio_languages) ||
1543       (nb_text > 1 && !have_text_languages)) {
1544     GST_WARNING
1545         ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1546         nb_audio, nb_video, nb_text);
1547     return FALSE;
1548   }
1549
1550   return TRUE;
1551 }
1552
1553 static void
1554 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1555     GstMessage * msg)
1556 {
1557   GstAdaptiveDemux2Stream *stream;
1558   GstStreamCollection *collection = NULL;
1559   gboolean pending_tracks_activated = FALSE;
1560
1561   GST_MANIFEST_LOCK (demux);
1562
1563   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1564   if (stream == NULL) {
1565     GST_WARNING_OBJECT (demux,
1566         "Failed to locate stream for collection message");
1567     goto beach;
1568   }
1569
1570   gst_message_parse_stream_collection (msg, &collection);
1571   if (!collection)
1572     goto beach;
1573
1574   /* Check whether the collection is "sane" or not.
1575    *
1576    * In the context of adaptive streaming, we can only handle multiplexed
1577    * content that provides at most one stream of valid types (audio, video,
1578    * text). Without this we cannot reliably match the output of this multiplex
1579    * to the various tracks.
1580    *
1581    * FIXME : In the future and *IF* we encounter such streams, we could envision
1582    * supporting multiple streams of the same type if, and only if, they have
1583    * tags that allow differentiating them (ex: languages).
1584    */
1585   if (!can_handle_collection (stream, collection)) {
1586     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1587         (_("Stream format can't be handled")),
1588         ("The streams provided by the multiplex are ambiguous"));
1589     goto beach;
1590   }
1591
1592   /* Store the collection on the stream */
1593   gst_object_replace ((GstObject **) & stream->stream_collection,
1594       (GstObject *) collection);
1595
1596   /* IF there are pending tracks, ask the subclass to handle that */
1597   if (stream->pending_tracks) {
1598     GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1599     g_assert (demux_class->stream_update_tracks);
1600     demux_class->stream_update_tracks (demux, stream);
1601     TRACKS_LOCK (demux);
1602     stream->pending_tracks = FALSE;
1603     pending_tracks_activated = TRUE;
1604     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1605         demux->input_period == demux->output_period)
1606       gst_adaptive_demux_post_collection (demux);
1607   } else {
1608     g_assert (stream->tracks);
1609     TRACKS_LOCK (demux);
1610     /* If we already have assigned tracks, update the pending upstream stream_id
1611      * for each of them based on the collection information. */
1612     gst_adaptive_demux2_stream_update_tracks (demux, stream);
1613   }
1614
1615   /* If we discovered pending tracks and we no longer have any, we can ensure
1616    * selected tracks are started */
1617   if (pending_tracks_activated
1618       && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1619     GList *iter = demux->input_period->streams;
1620     for (; iter; iter = iter->next) {
1621       GstAdaptiveDemux2Stream *new_stream = iter->data;
1622
1623       /* The stream that posted this collection was already started. If a
1624        * different stream is now selected, start it */
1625       if (stream != new_stream
1626           && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1627         gst_adaptive_demux2_stream_start (new_stream);
1628     }
1629   }
1630   TRACKS_UNLOCK (demux);
1631
1632 beach:
1633   GST_MANIFEST_UNLOCK (demux);
1634
1635   gst_message_unref (msg);
1636   msg = NULL;
1637 }
1638
1639 static void
1640 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1641 {
1642   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1643
1644   switch (GST_MESSAGE_TYPE (msg)) {
1645     case GST_MESSAGE_STREAM_COLLECTION:
1646     {
1647       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1648       return;
1649     }
1650     case GST_MESSAGE_ERROR:{
1651       GstAdaptiveDemux2Stream *stream = NULL;
1652       GError *err = NULL;
1653       gchar *debug = NULL;
1654       gchar *new_error = NULL;
1655       const GstStructure *details = NULL;
1656
1657       GST_MANIFEST_LOCK (demux);
1658
1659       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1660       if (stream == NULL) {
1661         GST_WARNING_OBJECT (demux,
1662             "Failed to locate stream for errored element");
1663         GST_MANIFEST_UNLOCK (demux);
1664         break;
1665       }
1666
1667       gst_message_parse_error (msg, &err, &debug);
1668
1669       GST_WARNING_OBJECT (demux,
1670           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1671           err->message, debug);
1672
1673       if (debug)
1674         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1675       if (new_error) {
1676         g_free (err->message);
1677         err->message = new_error;
1678       }
1679
1680       gst_message_parse_error_details (msg, &details);
1681       if (details) {
1682         gst_structure_get_uint (details, "http-status-code",
1683             &stream->last_status_code);
1684       }
1685
1686       /* error, but ask to retry */
1687       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1688         gst_adaptive_demux2_stream_parse_error (stream, err);
1689         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1690       }
1691
1692       g_error_free (err);
1693       g_free (debug);
1694
1695       GST_MANIFEST_UNLOCK (demux);
1696
1697       gst_message_unref (msg);
1698       msg = NULL;
1699     }
1700       break;
1701     default:
1702       break;
1703   }
1704
1705   if (msg)
1706     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1707 }
1708
1709 /* must be called with manifest_lock taken */
1710 GstClockTime
1711 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1712     GstAdaptiveDemux2Stream * stream)
1713 {
1714   GstAdaptiveDemuxClass *klass;
1715
1716   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1717
1718   if (klass->get_presentation_offset == NULL)
1719     return 0;
1720
1721   return klass->get_presentation_offset (demux, stream);
1722 }
1723
1724 /* must be called with manifest_lock taken */
1725 GstClockTime
1726 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1727 {
1728   GstAdaptiveDemuxClass *klass;
1729
1730   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1731
1732   if (klass->get_period_start_time == NULL)
1733     return 0;
1734
1735   return klass->get_period_start_time (demux);
1736 }
1737
1738 /* must be called with manifest_lock taken */
1739 static gboolean
1740 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1741     gboolean first_and_live)
1742 {
1743   GList *iter;
1744   GstClockTime period_start;
1745   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1746   GList *new_streams;
1747
1748   g_return_val_if_fail (demux->input_period->streams, FALSE);
1749   g_assert (demux->input_period->prepared == FALSE);
1750
1751   new_streams = demux->input_period->streams;
1752
1753   if (!gst_adaptive_demux2_is_running (demux)) {
1754     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1755     return TRUE;
1756   }
1757
1758   GST_DEBUG_OBJECT (demux,
1759       "Preparing %d streams for period %d , first_and_live:%d",
1760       g_list_length (new_streams), demux->input_period->period_num,
1761       first_and_live);
1762
1763   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1764     GstAdaptiveDemux2Stream *stream = iter->data;
1765
1766     GST_DEBUG_OBJECT (stream, "Preparing stream");
1767
1768     stream->need_header = TRUE;
1769     stream->discont = TRUE;
1770
1771     /* Grab the first stream time for live streams
1772      * * If the stream is selected
1773      * * Or it provides dynamic tracks (in which case we need to force an update)
1774      */
1775     if (first_and_live
1776         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1777             || stream->pending_tracks)) {
1778       /* TODO we only need the first timestamp, maybe create a simple function to
1779        * get the current PTS of a fragment ? */
1780       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1781       gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1782
1783       GST_DEBUG_OBJECT (stream,
1784           "Got stream time %" GST_STIME_FORMAT,
1785           GST_STIME_ARGS (stream->fragment.stream_time));
1786
1787       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1788         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1789       } else {
1790         min_stream_time = stream->fragment.stream_time;
1791       }
1792     }
1793   }
1794
1795   period_start = gst_adaptive_demux_get_period_start_time (demux);
1796
1797   /* For live streams, the subclass is supposed to seek to the current fragment
1798    * and then tell us its stream time in stream->fragment.stream_time.  We now
1799    * also have to seek our demuxer segment to reflect this.
1800    *
1801    * FIXME: This needs some refactoring at some point.
1802    */
1803   if (first_and_live) {
1804     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1805         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1806         GST_SEEK_TYPE_NONE, -1, NULL);
1807   }
1808
1809   GST_DEBUG_OBJECT (demux,
1810       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1811       " demux segment %" GST_SEGMENT_FORMAT,
1812       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1813       &demux->segment);
1814
1815   /* Synchronize stream start/current positions */
1816   if (min_stream_time == GST_CLOCK_STIME_NONE)
1817     min_stream_time = period_start;
1818   else
1819     min_stream_time += period_start;
1820   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1821     GstAdaptiveDemux2Stream *stream = iter->data;
1822     stream->start_position = stream->current_position = min_stream_time;
1823   }
1824
1825   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1826     GstAdaptiveDemux2Stream *stream = iter->data;
1827     stream->compute_segment = TRUE;
1828     stream->first_and_live = first_and_live;
1829   }
1830   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1831   demux->input_period->prepared = TRUE;
1832
1833   return TRUE;
1834 }
1835
1836 static GstAdaptiveDemuxTrack *
1837 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1838 {
1839   GList *tmp;
1840
1841   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1842     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1843     if (!g_strcmp0 (track->stream_id, stream_id))
1844       return track;
1845   }
1846
1847   return NULL;
1848 }
1849
1850 /* TRACKS_LOCK hold */
1851 void
1852 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1853 {
1854   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1855   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1856   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1857   GList *tmp;
1858   gint min_percent = -1, percent;
1859   gboolean all_eos = TRUE;
1860
1861   /* Go over all active tracks of the output period and update level */
1862
1863   /* Check that all tracks are above their respective low thresholds (different
1864    * tracks may have different fragment durations yielding different buffering
1865    * percentages) Overall buffering percent is the lowest. */
1866   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1867     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1868
1869     GST_LOG_OBJECT (demux,
1870         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1871         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1872         track->stream_id, track->period_num, track->active, track->selected,
1873         track->eos, GST_TIME_ARGS (track->level_time),
1874         GST_TIME_ARGS (track->buffering_threshold));
1875
1876     if (track->active && track->selected) {
1877       if (!track->eos) {
1878         gint cur_percent;
1879
1880         all_eos = FALSE;
1881         if (min_level_time == GST_CLOCK_TIME_NONE) {
1882           min_level_time = track->level_time;
1883         } else if (track->level_time < min_level_time) {
1884           min_level_time = track->level_time;
1885         }
1886
1887         if (track->type & GST_STREAM_TYPE_VIDEO
1888             && video_level_time > track->level_time)
1889           video_level_time = track->level_time;
1890
1891         if (track->type & GST_STREAM_TYPE_AUDIO
1892             && audio_level_time > track->level_time)
1893           audio_level_time = track->level_time;
1894
1895         if (track->level_time != GST_CLOCK_TIME_NONE
1896             && track->buffering_threshold != 0) {
1897           cur_percent =
1898               gst_util_uint64_scale (track->level_time, 100,
1899               track->buffering_threshold);
1900           if (min_percent < 0 || cur_percent < min_percent)
1901             min_percent = cur_percent;
1902         }
1903       }
1904     }
1905   }
1906
1907   GST_DEBUG_OBJECT (demux,
1908       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1909       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1910
1911   /* Update demuxer video/audio level properties */
1912   GST_OBJECT_LOCK (demux);
1913   demux->current_level_time_video = video_level_time;
1914   demux->current_level_time_audio = audio_level_time;
1915   GST_OBJECT_UNLOCK (demux);
1916
1917   if (min_percent < 0 && !all_eos)
1918     return;
1919
1920   if (min_percent > 100 || all_eos)
1921     percent = 100;
1922   else
1923     percent = MAX (0, min_percent);
1924
1925   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1926
1927   if (demux->priv->is_buffering) {
1928     if (percent >= 100)
1929       demux->priv->is_buffering = FALSE;
1930     if (demux->priv->percent != percent) {
1931       demux->priv->percent = percent;
1932       demux->priv->percent_changed = TRUE;
1933     }
1934   } else if (percent < 1) {
1935     demux->priv->is_buffering = TRUE;
1936     if (demux->priv->percent != percent) {
1937       demux->priv->percent = percent;
1938       demux->priv->percent_changed = TRUE;
1939     }
1940   }
1941
1942   if (demux->priv->percent_changed)
1943     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1944         demux->priv->is_buffering);
1945 }
1946
1947 /* With TRACKS_LOCK held */
1948 void
1949 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1950 {
1951   gint percent;
1952   GstMessage *msg;
1953
1954   if (!demux->priv->percent_changed)
1955     return;
1956
1957   BUFFERING_LOCK (demux);
1958   percent = demux->priv->percent;
1959   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1960   TRACKS_UNLOCK (demux);
1961   gst_element_post_message ((GstElement *) demux, msg);
1962
1963   BUFFERING_UNLOCK (demux);
1964   TRACKS_LOCK (demux);
1965   if (percent == demux->priv->percent)
1966     demux->priv->percent_changed = FALSE;
1967 }
1968
1969 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1970 GstAdaptiveDemux2Stream *
1971 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1972     GstAdaptiveDemuxTrack * track)
1973 {
1974   GList *iter;
1975
1976   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1977     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1978     if (g_list_find (stream->tracks, track))
1979       return stream;
1980   }
1981
1982   return NULL;
1983 }
1984
1985 /* Called from seek handler
1986  *
1987  * This function is used when a (flushing) seek caused a new period to be activated.
1988  *
1989  * This will ensure that:
1990  * * the current output period is marked as finished (EOS)
1991  * * Any potential intermediate (non-input/non-output) periods are removed
1992  * * That the new input period is prepared and ready
1993  */
1994 static void
1995 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1996 {
1997   GList *iter;
1998
1999   GST_DEBUG_OBJECT (demux,
2000       "Preparing new input period %u", demux->input_period->period_num);
2001
2002   /* Prepare the new input period */
2003   gst_adaptive_demux_update_collection (demux, demux->input_period);
2004
2005   /* Transfer the previous selection to the new input period */
2006   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2007       demux->output_period);
2008   gst_adaptive_demux_prepare_streams (demux, FALSE);
2009
2010   /* Remove all periods except for the input (last) and output (first) period */
2011   while (demux->priv->periods->length > 2) {
2012     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2013     /* Mark all tracks of the removed period as not selected and EOS so they
2014      * will be skipped / ignored */
2015     for (iter = period->tracks; iter; iter = iter->next) {
2016       GstAdaptiveDemuxTrack *track = iter->data;
2017       track->selected = FALSE;
2018       track->eos = TRUE;
2019     }
2020     gst_adaptive_demux_period_unref (period);
2021   }
2022
2023   /* Mark all tracks of the output period as EOS so that the output loop
2024    * will immediately move to the new period */
2025   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2026     GstAdaptiveDemuxTrack *track = iter->data;
2027     track->eos = TRUE;
2028   }
2029
2030   /* Go over all slots, and clear any pending track */
2031   for (iter = demux->priv->outputs; iter; iter = iter->next) {
2032     OutputSlot *slot = (OutputSlot *) iter->data;
2033
2034     if (slot->pending_track != NULL) {
2035       GST_DEBUG_OBJECT (demux,
2036           "Removing track '%s' as pending from output of current track '%s'",
2037           slot->pending_track->stream_id, slot->track->stream_id);
2038       gst_adaptive_demux_track_unref (slot->pending_track);
2039       slot->pending_track = NULL;
2040     }
2041   }
2042 }
2043
2044 /* must be called with manifest_lock taken */
2045 gboolean
2046 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2047     gint64 * range_start, gint64 * range_stop)
2048 {
2049   GstAdaptiveDemuxClass *klass;
2050
2051   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2052
2053   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2054
2055   return klass->get_live_seek_range (demux, range_start, range_stop);
2056 }
2057
2058 /* must be called with manifest_lock taken */
2059 gboolean
2060 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2061     GstAdaptiveDemux2Stream * stream)
2062 {
2063   gint64 range_start, range_stop;
2064   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2065     GST_LOG_OBJECT (stream,
2066         "stream position %" GST_TIME_FORMAT "  live seek range %"
2067         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2068         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2069         GST_STIME_ARGS (range_stop));
2070     return (stream->current_position >= range_start
2071         && stream->current_position <= range_stop);
2072   }
2073
2074   return FALSE;
2075 }
2076
2077 /* must be called with manifest_lock taken */
2078 static gboolean
2079 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2080 {
2081   GstAdaptiveDemuxClass *klass;
2082
2083   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2084   if (gst_adaptive_demux_is_live (demux)) {
2085     return klass->get_live_seek_range != NULL;
2086   }
2087
2088   return klass->seek != NULL;
2089 }
2090
2091 static void
2092 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2093     GstSeekType start_type, GstSeekType stop_type)
2094 {
2095   GList *iter;
2096
2097   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2098     GstAdaptiveDemux2Stream *stream = iter->data;
2099
2100     /* Make sure the download loop clears and restarts on the next start,
2101      * which will recompute the stream segment */
2102     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2103         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2104     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2105     stream->start_position = 0;
2106
2107     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2108       stream->start_position = demux->segment.start;
2109     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2110       stream->start_position = demux->segment.stop;
2111   }
2112 }
2113
2114 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
2115                               GST_SEEK_FLAG_SNAP_AFTER |          \
2116                               GST_SEEK_FLAG_SNAP_NEAREST |        \
2117                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2118                               GST_SEEK_FLAG_KEY_UNIT))
2119 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2120                               GST_SEEK_FLAG_SNAP_AFTER | \
2121                               GST_SEEK_FLAG_SNAP_NEAREST))
2122
2123 static gboolean
2124 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2125     GstEvent * event)
2126 {
2127   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2128   gdouble rate;
2129   GstFormat format;
2130   GstSeekFlags flags;
2131   GstSeekType start_type, stop_type;
2132   gint64 start, stop;
2133   guint32 seqnum;
2134   gboolean update;
2135   gboolean ret;
2136   GstSegment oldsegment;
2137   GstEvent *flush_event;
2138
2139   GST_INFO_OBJECT (demux, "Received seek event");
2140
2141   GST_API_LOCK (demux);
2142
2143   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2144       &stop_type, &stop);
2145
2146   if (format != GST_FORMAT_TIME) {
2147     GST_API_UNLOCK (demux);
2148     GST_WARNING_OBJECT (demux,
2149         "Adaptive demuxers only support TIME-based seeking");
2150     gst_event_unref (event);
2151     return FALSE;
2152   }
2153
2154   if (flags & GST_SEEK_FLAG_SEGMENT) {
2155     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2156     GST_API_UNLOCK (demux);
2157     gst_event_unref (event);
2158     return FALSE;
2159   }
2160
2161   seqnum = gst_event_get_seqnum (event);
2162
2163   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2164     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2165     return FALSE;
2166   }
2167
2168   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2169     /* For instant rate seeks, reply directly and update
2170      * our segment so the new rate is reflected in any future
2171      * fragments */
2172     GstEvent *ev;
2173     gdouble rate_multiplier;
2174
2175     /* instant rate change only supported if direction does not change. All
2176      * other requirements are already checked before creating the seek event
2177      * but let's double-check here to be sure */
2178     if ((demux->segment.rate > 0 && rate < 0) ||
2179         (demux->segment.rate < 0 && rate > 0) ||
2180         start_type != GST_SEEK_TYPE_NONE ||
2181         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2182       GST_ERROR_OBJECT (demux,
2183           "Instant rate change seeks only supported in the "
2184           "same direction, without flushing and position change");
2185       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2186       GST_API_UNLOCK (demux);
2187       return FALSE;
2188     }
2189
2190     rate_multiplier = rate / demux->segment.rate;
2191
2192     ev = gst_event_new_instant_rate_change (rate_multiplier,
2193         (GstSegmentFlags) flags);
2194     gst_event_set_seqnum (ev, seqnum);
2195
2196     ret = gst_adaptive_demux_push_src_event (demux, ev);
2197
2198     if (ret) {
2199       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2200       demux->instant_rate_multiplier = rate_multiplier;
2201       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2202     }
2203
2204     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2205     GST_API_UNLOCK (demux);
2206     gst_event_unref (event);
2207
2208     return ret;
2209   }
2210
2211   if (!gst_adaptive_demux_can_seek (demux)) {
2212     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2213
2214     GST_API_UNLOCK (demux);
2215     gst_event_unref (event);
2216     return FALSE;
2217   }
2218
2219   /* We can only accept flushing seeks from this point onward */
2220   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2221     GST_ERROR_OBJECT (demux,
2222         "Non-flushing non-instant-rate seeks are not possible");
2223
2224     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2225
2226     GST_API_UNLOCK (demux);
2227     gst_event_unref (event);
2228     return FALSE;
2229   }
2230
2231   if (gst_adaptive_demux_is_live (demux)) {
2232     gint64 range_start, range_stop;
2233     gboolean changed = FALSE;
2234     gboolean start_valid = TRUE, stop_valid = TRUE;
2235
2236     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2237             &range_stop)) {
2238       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2239       GST_API_UNLOCK (demux);
2240       gst_event_unref (event);
2241       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2242       return FALSE;
2243     }
2244
2245     GST_DEBUG_OBJECT (demux,
2246         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2247         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2248
2249     /* Handle relative positioning for live streams (relative to the range_stop) */
2250     if (start_type == GST_SEEK_TYPE_END) {
2251       start = range_stop + start;
2252       start_type = GST_SEEK_TYPE_SET;
2253       changed = TRUE;
2254     }
2255     if (stop_type == GST_SEEK_TYPE_END) {
2256       stop = range_stop + stop;
2257       stop_type = GST_SEEK_TYPE_SET;
2258       changed = TRUE;
2259     }
2260
2261     /* Adjust the requested start/stop position if it falls beyond the live
2262      * seek range.
2263      * The only case where we don't adjust is for the starting point of
2264      * an accurate seek (start if forward and stop if backwards)
2265      */
2266     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2267         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2268       GST_DEBUG_OBJECT (demux,
2269           "seek before live stream start, setting to range start: %"
2270           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2271       start = range_start;
2272       changed = TRUE;
2273     }
2274     /* truncate stop position also if set */
2275     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2276         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2277       GST_DEBUG_OBJECT (demux,
2278           "seek ending after live start, adjusting to: %"
2279           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2280       stop = range_stop;
2281       changed = TRUE;
2282     }
2283
2284     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2285         (start < range_start || start > range_stop)) {
2286       GST_WARNING_OBJECT (demux,
2287           "Seek to invalid position start:%" GST_STIME_FORMAT
2288           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2289           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2290           GST_STIME_ARGS (range_stop));
2291       start_valid = FALSE;
2292     }
2293     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2294         (stop < range_start || stop > range_stop)) {
2295       GST_WARNING_OBJECT (demux,
2296           "Seek to invalid position stop:%" GST_STIME_FORMAT
2297           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2298           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2299           GST_STIME_ARGS (range_stop));
2300       stop_valid = FALSE;
2301     }
2302
2303     /* If the seek position is still outside of the seekable range, refuse the seek */
2304     if (!start_valid || !stop_valid) {
2305       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2306       GST_API_UNLOCK (demux);
2307       gst_event_unref (event);
2308       return FALSE;
2309     }
2310
2311     /* Re-create seek event with changed/updated values */
2312     if (changed) {
2313       gst_event_unref (event);
2314       event =
2315           gst_event_new_seek (rate, format, flags,
2316           start_type, start, stop_type, stop);
2317       gst_event_set_seqnum (event, seqnum);
2318     }
2319   }
2320
2321   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2322
2323   /* have a backup in case seek fails */
2324   gst_segment_copy_into (&demux->segment, &oldsegment);
2325
2326   GST_DEBUG_OBJECT (demux, "sending flush start");
2327   flush_event = gst_event_new_flush_start ();
2328   gst_event_set_seqnum (flush_event, seqnum);
2329
2330   gst_adaptive_demux_push_src_event (demux, flush_event);
2331
2332   gst_adaptive_demux_stop_tasks (demux, FALSE);
2333   gst_adaptive_demux_reset_tracks (demux);
2334
2335   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2336
2337   /*
2338    * Handle snap seeks as follows:
2339    * 1) do the snap seeking a (random) active stream
2340    * 2) use the final position on this stream to seek
2341    *    on the other streams to the same position
2342    *
2343    * We can't snap at all streams at the same time as they might end in
2344    * different positions, so just pick one and align all others to that
2345    * position.
2346    */
2347   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek) {
2348     GstAdaptiveDemux2Stream *stream = NULL;
2349     GstClockTimeDiff ts;
2350     GstSeekFlags stream_seek_flags = flags;
2351     GList *iter;
2352
2353     /* snap-seek on the stream that received the event and then
2354      * use the resulting position to seek on all streams */
2355
2356     if (rate >= 0) {
2357       if (start_type != GST_SEEK_TYPE_NONE)
2358         ts = start;
2359       else {
2360         ts = gst_segment_position_from_running_time (&demux->segment,
2361             GST_FORMAT_TIME, demux->priv->global_output_position);
2362         start_type = GST_SEEK_TYPE_SET;
2363       }
2364     } else {
2365       if (stop_type != GST_SEEK_TYPE_NONE)
2366         ts = stop;
2367       else {
2368         stop_type = GST_SEEK_TYPE_SET;
2369         ts = gst_segment_position_from_running_time (&demux->segment,
2370             GST_FORMAT_TIME, demux->priv->global_output_position);
2371       }
2372     }
2373
2374     /* Pick a random active stream on which to do the stream seek */
2375     for (iter = demux->output_period->streams; iter; iter = iter->next) {
2376       GstAdaptiveDemux2Stream *cand = iter->data;
2377       if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2378         stream = cand;
2379         break;
2380       }
2381     }
2382     if (stream) {
2383       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2384     }
2385
2386     /* replace event with a new one without snapping to seek on all streams */
2387     gst_event_unref (event);
2388     if (rate >= 0) {
2389       start = ts;
2390     } else {
2391       stop = ts;
2392     }
2393     event =
2394         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2395         start_type, start, stop_type, stop);
2396     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2397   }
2398
2399   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2400       start, stop_type, stop, &update);
2401
2402   if (ret) {
2403     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2404
2405     ret = demux_class->seek (demux, event);
2406   }
2407
2408   if (!ret) {
2409     /* Is there anything else we can do if it fails? */
2410     gst_segment_copy_into (&oldsegment, &demux->segment);
2411   } else {
2412     demux->priv->segment_seqnum = seqnum;
2413   }
2414   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2415
2416   /* Resetting flow combiner */
2417   gst_flow_combiner_reset (demux->priv->flowcombiner);
2418
2419   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2420   flush_event = gst_event_new_flush_stop (TRUE);
2421   gst_event_set_seqnum (flush_event, seqnum);
2422   gst_adaptive_demux_push_src_event (demux, flush_event);
2423
2424   /* If the seek generated a new period, prepare it */
2425   if (!demux->input_period->prepared) {
2426     /* This can only happen on flushing seeks */
2427     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2428     gst_adaptive_demux_seek_to_input_period (demux);
2429   }
2430
2431   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2432   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2433       &demux->segment);
2434   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2435   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2436
2437   /* Reset the global output position (running time) for when the output loop restarts */
2438   demux->priv->global_output_position = 0;
2439
2440   /* After a flushing seek, any instant-rate override is undone */
2441   demux->instant_rate_multiplier = 1.0;
2442
2443   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2444
2445   /* Restart the demux */
2446   gst_adaptive_demux_start_tasks (demux);
2447
2448   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2449   GST_API_UNLOCK (demux);
2450   gst_event_unref (event);
2451
2452   return ret;
2453 }
2454
2455 /* Returns TRUE if the stream has at least one selected track */
2456 static gboolean
2457 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2458     stream)
2459 {
2460   GList *tmp;
2461
2462   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2463     GstAdaptiveDemuxTrack *track = tmp->data;
2464
2465     if (track->selected)
2466       return TRUE;
2467   }
2468
2469   return FALSE;
2470 }
2471
2472 static gboolean
2473 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2474     guint32 seqnum)
2475 {
2476   gboolean selection_handled = TRUE;
2477   GList *iter;
2478   GList *tracks = NULL;
2479
2480   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2481     return FALSE;
2482
2483   TRACKS_LOCK (demux);
2484   /* Validate the streams and fill:
2485    * tracks : list of tracks corresponding to requested streams
2486    */
2487   for (iter = streams; iter; iter = iter->next) {
2488     gchar *stream_id = (gchar *) iter->data;
2489     GstAdaptiveDemuxTrack *track;
2490
2491     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2492     track = find_track_for_stream_id (demux->output_period, stream_id);
2493     if (!track) {
2494       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2495       selection_handled = FALSE;
2496       goto select_streams_done;
2497     }
2498     tracks = g_list_append (tracks, track);
2499     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2500   }
2501
2502   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2503    * SCHEDULING THREAD */
2504
2505   /* FIXME: We want to iterate all streams, mark them as deselected,
2506    * then iterate tracks and mark any streams that have at least 1
2507    * active output track, then loop over all streams again and start/stop
2508    * them as needed */
2509
2510   /* Go over all tracks present and (de)select based on current selection */
2511   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2512     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2513
2514     if (track->selected && !g_list_find (tracks, track)) {
2515       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2516           track->stream_id, track->active);
2517       track->selected = FALSE;
2518       track->draining = TRUE;
2519     } else if (!track->selected && g_list_find (tracks, track)) {
2520       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2521
2522       track->selected = TRUE;
2523     }
2524   }
2525
2526   /* Start or stop streams based on the updated track selection */
2527   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2528     GstAdaptiveDemux2Stream *stream = iter->data;
2529     GList *trackiter;
2530
2531     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2532     gboolean should_be_running =
2533         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2534
2535     if (!is_running && should_be_running) {
2536       GstClockTime output_running_ts = demux->priv->global_output_position;
2537       GstClockTime start_position;
2538
2539       /* Calculate where we should start the stream, and then
2540        * start it. */
2541       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2542
2543       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2544           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2545           GST_TIME_ARGS (output_running_ts), &demux->segment);
2546
2547       start_position =
2548           gst_segment_position_from_running_time (&demux->segment,
2549           GST_FORMAT_TIME, output_running_ts);
2550
2551       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2552
2553       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2554           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2555
2556       stream->current_position = stream->start_position = start_position;
2557       stream->compute_segment = TRUE;
2558
2559       /* If output has already begun, ensure we seek this segment
2560        * to the correct restart position when the download loop begins */
2561       if (output_running_ts != 0)
2562         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2563
2564       /* Activate track pads for this stream */
2565       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2566         GstAdaptiveDemuxTrack *track =
2567             (GstAdaptiveDemuxTrack *) trackiter->data;
2568         gst_pad_set_active (track->sinkpad, TRUE);
2569       }
2570
2571       gst_adaptive_demux2_stream_start (stream);
2572     } else if (is_running && !should_be_running) {
2573       /* Stream should not be running and needs stopping */
2574       gst_adaptive_demux2_stream_stop (stream);
2575
2576       /* Set all track sinkpads to inactive for this stream */
2577       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2578         GstAdaptiveDemuxTrack *track =
2579             (GstAdaptiveDemuxTrack *) trackiter->data;
2580         gst_pad_set_active (track->sinkpad, FALSE);
2581       }
2582     }
2583   }
2584
2585   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2586
2587 select_streams_done:
2588   demux_update_buffering_locked (demux);
2589   demux_post_buffering_locked (demux);
2590
2591   TRACKS_UNLOCK (demux);
2592   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2593
2594   if (tracks)
2595     g_list_free (tracks);
2596   return selection_handled;
2597 }
2598
2599 static gboolean
2600 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2601     GstEvent * event)
2602 {
2603   GstAdaptiveDemux *demux;
2604
2605   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2606
2607   switch (event->type) {
2608     case GST_EVENT_SEEK:
2609     {
2610       guint32 seqnum = gst_event_get_seqnum (event);
2611       if (seqnum == demux->priv->segment_seqnum) {
2612         GST_LOG_OBJECT (pad,
2613             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2614         gst_event_unref (event);
2615         return TRUE;
2616       }
2617       return gst_adaptive_demux_handle_seek_event (demux, event);
2618     }
2619     case GST_EVENT_LATENCY:{
2620       /* Upstream and our internal source are irrelevant
2621        * for latency, and we should not fail here to
2622        * configure the latency */
2623       gst_event_unref (event);
2624       return TRUE;
2625     }
2626     case GST_EVENT_QOS:{
2627       GstClockTimeDiff diff;
2628       GstClockTime timestamp;
2629       GstClockTime earliest_time;
2630
2631       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2632       /* Only take into account lateness if late */
2633       if (diff > 0)
2634         earliest_time = timestamp + 2 * diff;
2635       else
2636         earliest_time = timestamp;
2637
2638       GST_OBJECT_LOCK (demux);
2639       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2640           earliest_time > demux->priv->qos_earliest_time) {
2641         demux->priv->qos_earliest_time = earliest_time;
2642         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2643             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2644       }
2645       GST_OBJECT_UNLOCK (demux);
2646       break;
2647     }
2648     case GST_EVENT_SELECT_STREAMS:
2649     {
2650       GList *streams;
2651       gboolean selection_handled;
2652
2653       if (GST_EVENT_SEQNUM (event) ==
2654           g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2655         GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2656             GST_EVENT_SEQNUM (event));
2657         return TRUE;
2658       }
2659
2660       gst_event_parse_select_streams (event, &streams);
2661       selection_handled =
2662           handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2663       g_list_free_full (streams, g_free);
2664       return selection_handled;
2665     }
2666     default:
2667       break;
2668   }
2669
2670   return gst_pad_event_default (pad, parent, event);
2671 }
2672
2673 static gboolean
2674 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2675     GstQuery * query)
2676 {
2677   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2678   gboolean ret = FALSE;
2679
2680   if (query == NULL)
2681     return FALSE;
2682
2683
2684   switch (query->type) {
2685     case GST_QUERY_DURATION:{
2686       GstFormat fmt;
2687       GstClockTime duration = GST_CLOCK_TIME_NONE;
2688
2689       gst_query_parse_duration (query, &fmt, NULL);
2690
2691       if (gst_adaptive_demux_is_live (demux)) {
2692         /* We are able to answer this query: the duration is unknown */
2693         gst_query_set_duration (query, fmt, -1);
2694         ret = TRUE;
2695         break;
2696       }
2697
2698       if (fmt == GST_FORMAT_TIME
2699           && g_atomic_int_get (&demux->priv->have_manifest)) {
2700
2701         GST_MANIFEST_LOCK (demux);
2702         duration = demux->priv->duration;
2703         GST_MANIFEST_UNLOCK (demux);
2704
2705         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2706           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2707           ret = TRUE;
2708         }
2709       }
2710
2711       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2712           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2713       break;
2714     }
2715     case GST_QUERY_LATENCY:{
2716       gst_query_set_latency (query, FALSE, 0, -1);
2717       ret = TRUE;
2718       break;
2719     }
2720     case GST_QUERY_SEEKING:{
2721       GstFormat fmt;
2722       gint64 stop = -1;
2723       gint64 start = 0;
2724
2725       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2726         GST_INFO_OBJECT (demux,
2727             "Don't have manifest yet, can't answer seeking query");
2728         return FALSE;           /* can't answer without manifest */
2729       }
2730
2731       GST_MANIFEST_LOCK (demux);
2732
2733       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2734       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2735       if (fmt == GST_FORMAT_TIME) {
2736         GstClockTime duration;
2737         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2738
2739         ret = TRUE;
2740         if (can_seek) {
2741           if (gst_adaptive_demux_is_live (demux)) {
2742             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2743             if (!ret) {
2744               GST_MANIFEST_UNLOCK (demux);
2745               GST_INFO_OBJECT (demux, "can't answer seeking query");
2746               return FALSE;
2747             }
2748           } else {
2749             duration = demux->priv->duration;
2750             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2751               stop = duration;
2752           }
2753         }
2754         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2755         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2756             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2757             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2758       }
2759       GST_MANIFEST_UNLOCK (demux);
2760       break;
2761     }
2762     case GST_QUERY_URI:
2763
2764       GST_MANIFEST_LOCK (demux);
2765
2766       /* TODO HLS can answer this differently it seems */
2767       if (demux->manifest_uri) {
2768         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2769          * playlist or the the uri of the last downlowaded fragment? */
2770         gst_query_set_uri (query, demux->manifest_uri);
2771         ret = TRUE;
2772       }
2773
2774       GST_MANIFEST_UNLOCK (demux);
2775       break;
2776     case GST_QUERY_SELECTABLE:
2777       gst_query_set_selectable (query, TRUE);
2778       ret = TRUE;
2779       break;
2780     default:
2781       /* Don't forward queries upstream because of the special nature of this
2782        *  "demuxer", which relies on the upstream element only to be fed
2783        *  the Manifest
2784        */
2785       break;
2786   }
2787
2788   return ret;
2789 }
2790
2791 gboolean
2792 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2793 {
2794   GstEvent *seek;
2795
2796   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2797
2798   seek =
2799       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2800       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2801       GST_SEEK_TYPE_NONE, 0);
2802   gst_adaptive_demux_handle_seek_event (demux, seek);
2803   return FALSE;
2804 }
2805
2806
2807 /* Called when the scheduler starts, to kick off manifest updates
2808  * and stream downloads */
2809 static gboolean
2810 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2811 {
2812   GList *iter;
2813
2814   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2815
2816   iter = demux->input_period->streams;
2817
2818   for (; iter; iter = g_list_next (iter)) {
2819     GstAdaptiveDemux2Stream *stream = iter->data;
2820
2821     /* If we need to process this stream to discover tracks *OR* it has any
2822      * tracks which are selected, start it now */
2823     if ((stream->pending_tracks == TRUE)
2824         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2825       gst_adaptive_demux2_stream_start (stream);
2826   }
2827
2828   return FALSE;
2829 }
2830
2831 /* must be called with manifest_lock taken */
2832 static void
2833 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2834 {
2835   if (!gst_adaptive_demux2_is_running (demux)) {
2836     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2837     return;
2838   }
2839
2840   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2841   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2842       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2843
2844   TRACKS_LOCK (demux);
2845   demux->priv->flushing = FALSE;
2846   GST_DEBUG_OBJECT (demux, "Starting the output task");
2847   gst_task_start (demux->priv->output_task);
2848   TRACKS_UNLOCK (demux);
2849 }
2850
2851 /* must be called with manifest_lock taken */
2852 static void
2853 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2854 {
2855   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2856   if (demux->priv->manifest_updates_cb != 0) {
2857     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2858         demux->priv->manifest_updates_cb);
2859     demux->priv->manifest_updates_cb = 0;
2860   }
2861 }
2862
2863 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2864
2865 /* must be called with manifest_lock taken */
2866 static void
2867 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2868 {
2869   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2870
2871   if (gst_adaptive_demux_is_live (demux)) {
2872     /* Task to periodically update the manifest */
2873     if (demux_class->requires_periodical_playlist_update (demux)) {
2874       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2875       if (demux->priv->manifest_updates_cb == 0) {
2876         demux->priv->manifest_updates_cb =
2877             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2878             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2879       }
2880     }
2881   }
2882 }
2883
2884 /* must be called with manifest_lock taken
2885  * This function will temporarily release manifest_lock in order to join the
2886  * download threads.
2887  * The api_lock will still protect it against other threads trying to modify
2888  * the demux element.
2889  */
2890 static void
2891 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2892 {
2893   GST_LOG_OBJECT (demux, "Stopping tasks");
2894
2895   if (stop_updates)
2896     gst_adaptive_demux_stop_manifest_update_task (demux);
2897
2898   TRACKS_LOCK (demux);
2899   demux->priv->flushing = TRUE;
2900   g_cond_signal (&demux->priv->tracks_add);
2901   gst_task_stop (demux->priv->output_task);
2902   TRACKS_UNLOCK (demux);
2903
2904   gst_task_join (demux->priv->output_task);
2905
2906   if (demux->input_period)
2907     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2908
2909   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2910 }
2911
2912 /* must be called with manifest_lock taken */
2913 static gboolean
2914 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2915 {
2916   GList *iter;
2917   gboolean ret = TRUE;
2918
2919   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2920
2921   TRACKS_LOCK (demux);
2922   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2923     OutputSlot *slot = (OutputSlot *) iter->data;
2924     gst_event_ref (event);
2925     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2926     ret = ret & gst_pad_push_event (slot->pad, event);
2927     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2928       slot->pushed_timed_data = FALSE;
2929   }
2930   TRACKS_UNLOCK (demux);
2931   gst_event_unref (event);
2932   return ret;
2933 }
2934
2935 /* must be called with manifest_lock taken */
2936 void
2937 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2938     GstCaps * caps)
2939 {
2940   GST_DEBUG_OBJECT (stream,
2941       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2942   gst_caps_replace (&stream->pending_caps, caps);
2943   gst_caps_unref (caps);
2944 }
2945
2946 /* must be called with manifest_lock taken */
2947 void
2948 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2949     GstTagList * tags)
2950 {
2951   GST_DEBUG_OBJECT (stream,
2952       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2953   if (stream->pending_tags) {
2954     gst_tag_list_unref (stream->pending_tags);
2955   }
2956   stream->pending_tags = tags;
2957 }
2958
2959 /* must be called with manifest_lock taken */
2960 void
2961 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2962     GstEvent * event)
2963 {
2964   stream->pending_events = g_list_append (stream->pending_events, event);
2965 }
2966
2967 static guint64
2968 _update_average_bitrate (GstAdaptiveDemux * demux,
2969     GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2970 {
2971   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2972
2973   stream->moving_bitrate -= stream->fragment_bitrates[index];
2974   stream->fragment_bitrates[index] = new_bitrate;
2975   stream->moving_bitrate += new_bitrate;
2976
2977   stream->moving_index += 1;
2978
2979   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2980     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2981   return stream->moving_bitrate / stream->moving_index;
2982 }
2983
2984 static guint64
2985 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2986     GstAdaptiveDemux2Stream * stream)
2987 {
2988   guint64 average_bitrate;
2989   guint64 fragment_bitrate;
2990   guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2991
2992   fragment_bitrate = stream->last_bitrate;
2993   GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2994       fragment_bitrate);
2995
2996   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2997
2998   GST_INFO_OBJECT (stream,
2999       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
3000   GST_INFO_OBJECT (stream,
3001       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
3002       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
3003
3004   /* Conservative approach, make sure we don't upgrade too fast */
3005   GST_OBJECT_LOCK (demux);
3006   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3007
3008   /* If this is the/a video stream update the overall demuxer
3009    * reported bitrate and notify, to give the application a
3010    * chance to choose a new connection-bitrate */
3011   if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3012     demux->current_download_rate = stream->current_download_rate;
3013     GST_OBJECT_UNLOCK (demux);
3014     g_object_notify (G_OBJECT (demux), "current-bandwidth");
3015     GST_OBJECT_LOCK (demux);
3016   }
3017
3018   connection_speed = demux->connection_speed;
3019   min_bitrate = demux->min_bitrate;
3020   max_bitrate = demux->max_bitrate;
3021   GST_OBJECT_UNLOCK (demux);
3022
3023   if (connection_speed) {
3024     GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3025         connection_speed / 1000);
3026     return connection_speed;
3027   }
3028
3029   /* No explicit connection_speed, so choose the new variant to use as a
3030    * fraction of the measured download rate */
3031   target_download_rate =
3032       CLAMP (stream->current_download_rate, 0,
3033       G_MAXUINT) * demux->bandwidth_target_ratio;
3034
3035   GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3036       demux->bandwidth_target_ratio, target_download_rate);
3037
3038 #if 0
3039   /* Debugging code, modulate the bitrate every few fragments */
3040   {
3041     static guint ctr = 0;
3042     if (ctr % 3 == 0) {
3043       GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3044       target_download_rate /= 2;
3045     }
3046     ctr++;
3047   }
3048 #endif
3049
3050   if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3051     target_download_rate = min_bitrate;
3052     GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3053         min_bitrate);
3054   }
3055
3056   if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3057     target_download_rate = max_bitrate;
3058     GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3059         max_bitrate);
3060   }
3061
3062   GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3063       target_download_rate);
3064
3065   return target_download_rate;
3066 }
3067
3068 /* must be called with manifest_lock taken */
3069 static GstFlowReturn
3070 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3071     GstAdaptiveDemux2Stream * stream)
3072 {
3073   /* No need to advance, this isn't a real fragment */
3074   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3075     return GST_FLOW_OK;
3076
3077   return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3078       stream->fragment.duration);
3079 }
3080
3081 /* must be called with manifest_lock taken.
3082  * Can temporarily release manifest_lock
3083  */
3084 static GstFlowReturn
3085 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3086     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3087 {
3088   return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
3089 }
3090
3091 static gboolean
3092 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3093     * demux)
3094 {
3095   return TRUE;
3096 }
3097
3098 /* Called when a stream needs waking after the manifest is updated */
3099 void
3100 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3101 {
3102   demux->priv->stream_waiting_for_manifest = TRUE;
3103 }
3104
3105 static gboolean
3106 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3107 {
3108   GstFlowReturn ret = GST_FLOW_OK;
3109   gboolean schedule_again = TRUE;
3110
3111   GST_MANIFEST_LOCK (demux);
3112   demux->priv->manifest_updates_cb = 0;
3113
3114   /* Updating playlist only needed for live playlists */
3115   if (!gst_adaptive_demux_is_live (demux)) {
3116     GST_MANIFEST_UNLOCK (demux);
3117     return G_SOURCE_REMOVE;
3118   }
3119
3120   GST_DEBUG_OBJECT (demux, "Updating playlist");
3121   ret = gst_adaptive_demux_update_manifest (demux);
3122
3123   if (ret == GST_FLOW_EOS) {
3124     GST_MANIFEST_UNLOCK (demux);
3125     return G_SOURCE_REMOVE;
3126   }
3127
3128   if (ret == GST_FLOW_OK) {
3129     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3130     demux->priv->update_failed_count = 0;
3131
3132     /* Wake up download tasks */
3133     if (demux->priv->stream_waiting_for_manifest) {
3134       GList *iter;
3135
3136       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3137         GstAdaptiveDemux2Stream *stream = iter->data;
3138         gst_adaptive_demux2_stream_on_manifest_update (stream);
3139       }
3140       demux->priv->stream_waiting_for_manifest = FALSE;
3141     }
3142   } else {
3143     demux->priv->update_failed_count++;
3144
3145     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3146       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3147           gst_flow_get_name (ret));
3148     } else {
3149       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3150           (_("Internal data stream error.")), ("Could not update playlist"));
3151       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3152       schedule_again = FALSE;
3153     }
3154   }
3155
3156   if (schedule_again) {
3157     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3158
3159     demux->priv->manifest_updates_cb =
3160         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3161         klass->get_manifest_update_interval (demux) * GST_USECOND,
3162         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3163   }
3164
3165   GST_MANIFEST_UNLOCK (demux);
3166
3167   return G_SOURCE_REMOVE;
3168 }
3169
3170 static gboolean
3171 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3172 {
3173   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3174
3175   /* Loop for updating of the playlist. This periodically checks if
3176    * the playlist is updated and does so, then signals the streaming
3177    * thread in case it can continue downloading now. */
3178
3179   /* block until the next scheduled update or the signal to quit this thread */
3180   GST_DEBUG_OBJECT (demux, "Started updates task");
3181   demux->priv->manifest_updates_cb =
3182       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3183       klass->get_manifest_update_interval (demux) * GST_USECOND,
3184       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3185
3186   return G_SOURCE_REMOVE;
3187 }
3188
3189 static OutputSlot *
3190 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3191     GstAdaptiveDemuxTrack * track)
3192 {
3193   GList *tmp;
3194
3195   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3196     OutputSlot *slot = (OutputSlot *) tmp->data;
3197     /* Incompatible output type */
3198     if (slot->type != track->type)
3199       continue;
3200
3201     /* Slot which is already assigned to this pending track */
3202     if (slot->pending_track == track)
3203       return slot;
3204
3205     /* slot already used for another pending track */
3206     if (slot->pending_track != NULL)
3207       continue;
3208
3209     /* Current output track is of the same type and is draining */
3210     if (slot->track && slot->track->draining)
3211       return slot;
3212   }
3213
3214   return NULL;
3215 }
3216
3217 /* TRACKS_LOCK taken */
3218 static OutputSlot *
3219 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3220 {
3221   GList *tmp;
3222
3223   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3224     OutputSlot *slot = (OutputSlot *) tmp->data;
3225
3226     if (slot->track == track)
3227       return slot;
3228   }
3229
3230   return NULL;
3231 }
3232
3233 /* TRACKS_LOCK held */
3234 static GstMessage *
3235 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3236 {
3237   GList *tmp;
3238   GstMessage *msg;
3239
3240   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3241     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3242
3243     if (track->selected && !track->active)
3244       return NULL;
3245   }
3246
3247   /* All selected tracks are active, created message */
3248   msg =
3249       gst_message_new_streams_selected (GST_OBJECT (demux),
3250       demux->output_period->collection);
3251   GST_MESSAGE_SEQNUM (msg) = seqnum;
3252   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3253     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3254     if (track->active) {
3255       gst_message_streams_selected_add (msg, track->stream_object);
3256     }
3257   }
3258
3259   return msg;
3260 }
3261
3262 static void
3263 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3264     OutputSlot * slot)
3265 {
3266   GstAdaptiveDemuxTrack *track = slot->track;
3267   GstEvent *event;
3268
3269   /* Send EVENT_STREAM_START */
3270   event = gst_event_new_stream_start (track->stream_id);
3271   if (demux->have_group_id)
3272     gst_event_set_group_id (event, demux->group_id);
3273   gst_event_set_stream_flags (event, track->flags);
3274   gst_event_set_stream (event, track->stream_object);
3275   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3276       track->stream_id);
3277   gst_pad_push_event (slot->pad, event);
3278
3279   /* Send EVENT_STREAM_COLLECTION */
3280   event = gst_event_new_stream_collection (demux->output_period->collection);
3281   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3282       track->stream_id);
3283   gst_pad_push_event (slot->pad, event);
3284
3285   /* Mark all sticky events for re-sending */
3286   gst_event_store_mark_all_undelivered (&track->sticky_events);
3287 }
3288
3289 /*
3290  * Called with TRACKS_LOCK taken
3291  */
3292 static void
3293 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3294 {
3295   GList *tmp;
3296   guint requested_selection_seqnum;
3297   GstMessage *msg;
3298
3299   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3300      output slots vs active/draining tracks */
3301   requested_selection_seqnum =
3302       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3303
3304   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3305     return;
3306
3307   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3308
3309   /* Go over all slots, and if they have a pending track that's no longer
3310    * selected, clear it so the slot can be reused */
3311   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3312     OutputSlot *slot = (OutputSlot *) tmp->data;
3313
3314     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3315       GST_DEBUG_OBJECT (demux,
3316           "Removing deselected track '%s' as pending from output of current track '%s'",
3317           slot->pending_track->stream_id, slot->track->stream_id);
3318       gst_adaptive_demux_track_unref (slot->pending_track);
3319       slot->pending_track = NULL;
3320     }
3321   }
3322
3323   /* Go over all tracks and create/re-assign/remove slots */
3324   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3325     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3326
3327     if (track->selected) {
3328       OutputSlot *slot = find_slot_for_track (demux, track);
3329
3330       /* 0. Track is selected and has a slot. Nothing to do */
3331       if (slot) {
3332         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3333             track->stream_id);
3334         continue;
3335       }
3336
3337       slot = find_replacement_slot_for_track (demux, track);
3338       if (slot) {
3339         /* 1. There is an existing slot of the same type which is currently
3340          *    draining, assign this track as a replacement for it */
3341         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3342         if (slot->pending_track == NULL) {
3343           slot->pending_track = gst_adaptive_demux_track_ref (track);
3344           GST_DEBUG_OBJECT (demux,
3345               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3346               track->stream_id, track->period_num,
3347               slot->track->stream_id, slot->track->period_num);
3348         }
3349       } else {
3350         /* 2. There is no compatible replacement slot, create a new one */
3351         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3352         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3353             track->stream_id);
3354         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3355
3356         track->update_next_segment = TRUE;
3357
3358         slot->track = gst_adaptive_demux_track_ref (track);
3359         track->active = TRUE;
3360         gst_adaptive_demux_send_initial_events (demux, slot);
3361       }
3362
3363       /* If we were draining this track, we no longer are */
3364       track->draining = FALSE;
3365     }
3366   }
3367
3368   /* Finally check all slots have a current/pending track. If not remove it */
3369   for (tmp = demux->priv->outputs; tmp;) {
3370     OutputSlot *slot = (OutputSlot *) tmp->data;
3371     /* We should never has slots without target tracks */
3372     g_assert (slot->track);
3373     if (slot->track->draining && !slot->pending_track) {
3374       GstAdaptiveDemux2Stream *stream;
3375
3376       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3377           slot->track->stream_id);
3378       slot->track->active = FALSE;
3379
3380       /* If the stream feeding this track is stopped, flush and clear
3381        * the track now that it's going inactive. If the stream was not
3382        * found, it means we advanced past that period already (and the
3383        * stream was stopped and discarded) */
3384       stream = find_stream_for_track_locked (demux, slot->track);
3385       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3386         gst_adaptive_demux_track_flush (slot->track);
3387
3388       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3389       gst_adaptive_demux_output_slot_free (demux, slot);
3390     } else
3391       tmp = tmp->next;
3392   }
3393
3394   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3395   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3396   if (msg) {
3397     TRACKS_UNLOCK (demux);
3398     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3399     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3400     TRACKS_LOCK (demux);
3401   }
3402 }
3403
3404 /* TRACKS_LOCK held */
3405 static gboolean
3406 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3407 {
3408   GList *iter;
3409   GstAdaptiveDemuxPeriod *previous_period;
3410   GstStreamCollection *collection;
3411
3412   /* Grab the next period, should be demux->periods->next->data */
3413   previous_period = g_queue_pop_head (demux->priv->periods);
3414
3415   /* Remove ref held by demux->output_period */
3416   gst_adaptive_demux_period_unref (previous_period);
3417   demux->output_period =
3418       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3419
3420   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3421       demux->output_period->period_num);
3422
3423   /* We can now post the collection of the new period */
3424   collection = demux->output_period->collection;
3425   TRACKS_UNLOCK (demux);
3426   gst_element_post_message (GST_ELEMENT_CAST (demux),
3427       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3428   TRACKS_LOCK (demux);
3429
3430   /* Unselect all tracks of the previous period */
3431   for (iter = previous_period->tracks; iter; iter = iter->next) {
3432     GstAdaptiveDemuxTrack *track = iter->data;
3433     if (track->selected) {
3434       track->selected = FALSE;
3435       track->draining = TRUE;
3436     }
3437   }
3438
3439   /* Force a selection re-check */
3440   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3441   check_and_handle_selection_update_locked (demux);
3442
3443   /* Remove the final ref on the previous period now that we have done the switch */
3444   gst_adaptive_demux_period_unref (previous_period);
3445
3446   return TRUE;
3447 }
3448
3449 /* Called with TRACKS_LOCK taken */
3450 static void
3451 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3452     OutputSlot * slot)
3453 {
3454   GstAdaptiveDemuxTrack *track = slot->track;
3455   GstMessage *msg;
3456   gboolean pending_is_ready;
3457   GstAdaptiveDemux2Stream *stream;
3458
3459   /* If we have a pending track for this slot, the current track should be
3460    * draining and no longer selected */
3461   g_assert (track->draining && !track->selected);
3462
3463   /* If we're draining, check if the pending track has enough data *or* that
3464      we've already drained out entirely */
3465   pending_is_ready =
3466       (slot->pending_track->level_time >=
3467       slot->pending_track->buffering_threshold);
3468   pending_is_ready |= slot->pending_track->eos;
3469
3470   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3471     GST_DEBUG_OBJECT (demux,
3472         "Replacement track '%s' doesn't have enough data for switching yet",
3473         slot->pending_track->stream_id);
3474     return;
3475   }
3476
3477   GST_DEBUG_OBJECT (demux,
3478       "Pending replacement track has enough data, switching");
3479   track->active = FALSE;
3480   track->draining = FALSE;
3481
3482   /* If the stream feeding this track is stopped, flush and clear
3483    * the track now that it's going inactive. If the stream was not
3484    * found, it means we advanced past that period already (and the
3485    * stream was stopped and discarded) */
3486   stream = find_stream_for_track_locked (demux, track);
3487   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3488     gst_adaptive_demux_track_flush (track);
3489
3490   gst_adaptive_demux_track_unref (track);
3491   /* We steal the reference of pending_track */
3492   track = slot->track = slot->pending_track;
3493   slot->pending_track = NULL;
3494   slot->track->active = TRUE;
3495
3496   /* Make sure the track segment will start at the current position */
3497   track->update_next_segment = TRUE;
3498
3499   /* Send stream start and collection, and schedule sticky events */
3500   gst_adaptive_demux_send_initial_events (demux, slot);
3501
3502   /* Can we emit the streams-selected message now ? */
3503   msg =
3504       all_selected_tracks_are_active (demux,
3505       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3506   if (msg) {
3507     TRACKS_UNLOCK (demux);
3508     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3509     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3510     TRACKS_LOCK (demux);
3511   }
3512
3513 }
3514
3515 static void
3516 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3517 {
3518   GList *tmp;
3519   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3520   gboolean wait_for_data = FALSE;
3521   GstFlowReturn ret;
3522
3523   GST_DEBUG_OBJECT (demux, "enter");
3524
3525   TRACKS_LOCK (demux);
3526
3527   /* Check if stopping */
3528   if (demux->priv->flushing) {
3529     ret = GST_FLOW_FLUSHING;
3530     goto pause;
3531   }
3532
3533   /* If the selection changed, handle it */
3534   check_and_handle_selection_update_locked (demux);
3535
3536 restart:
3537   ret = GST_FLOW_OK;
3538   global_output_position = GST_CLOCK_STIME_NONE;
3539   if (wait_for_data) {
3540     GST_DEBUG_OBJECT (demux, "Waiting for data");
3541     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3542     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3543     if (demux->priv->flushing) {
3544       ret = GST_FLOW_FLUSHING;
3545       goto pause;
3546     }
3547     wait_for_data = FALSE;
3548   }
3549
3550   /* Grab/Recalculate current global output position
3551    * This is the minimum pending output position of all tracks used for output
3552    *
3553    * If there is a track which is empty and not EOS, wait for it to receive data
3554    * then recalculate global output position.
3555    *
3556    * This also pushes downstream all non-timed data that might be present.
3557    *
3558    * IF all tracks are EOS : stop task
3559    */
3560   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3561   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3562     OutputSlot *slot = (OutputSlot *) tmp->data;
3563     GstAdaptiveDemuxTrack *track;
3564
3565     /* If there is a pending track, Check if it's time to switch to it */
3566     if (slot->pending_track)
3567       handle_slot_pending_track_switch_locked (demux, slot);
3568
3569     track = slot->track;
3570
3571     if (!track->active) {
3572       /* Note: Edward: I can't see in what cases we would end up with inactive
3573          tracks assigned to slots. */
3574       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3575       g_assert (track->active);
3576       continue;
3577     }
3578
3579     if (track->next_position == GST_CLOCK_STIME_NONE) {
3580       gst_adaptive_demux_track_update_next_position (track);
3581     }
3582
3583     if (track->next_position != GST_CLOCK_STIME_NONE) {
3584       if (global_output_position == GST_CLOCK_STIME_NONE)
3585         global_output_position = track->next_position;
3586       else
3587         global_output_position =
3588             MIN (global_output_position, track->next_position);
3589       track->waiting_add = FALSE;
3590     } else if (!track->eos) {
3591       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3592           track->stream_id, track->period_num);
3593       wait_for_data = track->waiting_add = TRUE;
3594     } else {
3595       GST_DEBUG_OBJECT (demux,
3596           "Track %s (period %u) is EOS, not waiting for timed data",
3597           track->stream_id, track->period_num);
3598     }
3599   }
3600
3601   if (wait_for_data)
3602     goto restart;
3603
3604   if (global_output_position == GST_CLOCK_STIME_NONE
3605       && demux->output_period->closed) {
3606     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3607         demux->output_period->period_num);
3608     if (!gst_adaptive_demux_advance_output_period (demux)) {
3609       /* Failed to move to next period, error out */
3610       ret = GST_FLOW_ERROR;
3611       goto pause;
3612     }
3613     /* Restart the loop */
3614     goto restart;
3615   }
3616
3617   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3618       GST_STIME_ARGS (global_output_position));
3619
3620   /* For each track:
3621    *
3622    * We know all active tracks have pending timed data
3623    * * while track next_position <= global output position
3624    *   * push pending data
3625    *   * Update track next_position
3626    *     * recalculate global output position
3627    *   * Pop next pending data from track and update pending position
3628    *
3629    */
3630   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3631     OutputSlot *slot = (OutputSlot *) tmp->data;
3632     GstAdaptiveDemuxTrack *track = slot->track;
3633
3634     GST_LOG_OBJECT (track->element,
3635         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3636         " global_output_position:%" GST_STIME_FORMAT, track->active,
3637         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3638         GST_STIME_ARGS (global_output_position));
3639
3640     if (!track->active)
3641       continue;
3642
3643     while (global_output_position == GST_CLOCK_STIME_NONE
3644         || !slot->pushed_timed_data
3645         || ((track->next_position != GST_CLOCK_STIME_NONE)
3646             && track->next_position <= global_output_position)) {
3647       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3648
3649       if (!mo) {
3650         GST_DEBUG_OBJECT (demux,
3651             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3652             track->stream_id, track->period_num, track->eos,
3653             slot->pushed_timed_data);
3654         /* This should only happen if the track is EOS, or exactly in between
3655          * the parser outputting segment/caps before buffers. */
3656         g_assert (track->eos || !slot->pushed_timed_data);
3657         break;
3658       }
3659
3660       demux_update_buffering_locked (demux);
3661       demux_post_buffering_locked (demux);
3662       TRACKS_UNLOCK (demux);
3663
3664       GST_DEBUG_OBJECT (demux,
3665           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3666           track->period_num, mo);
3667
3668       if (GST_IS_EVENT (mo)) {
3669         GstEvent *event = (GstEvent *) mo;
3670         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP)
3671           slot->pushed_timed_data = TRUE;
3672         gst_pad_push_event (slot->pad, event);
3673
3674         if (GST_EVENT_IS_STICKY (event))
3675           gst_event_store_mark_delivered (&track->sticky_events, event);
3676       } else if (GST_IS_BUFFER (mo)) {
3677         GstBuffer *buffer = (GstBuffer *) mo;
3678
3679         if (track->output_discont) {
3680           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3681             buffer = gst_buffer_make_writable (buffer);
3682             GST_DEBUG_OBJECT (slot->pad,
3683                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3684                 buffer);
3685             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3686           }
3687           track->output_discont = FALSE;
3688         }
3689         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3690         ret =
3691             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3692             slot->pad, slot->flow_ret);
3693         GST_DEBUG_OBJECT (slot->pad,
3694             "track %s (period %u) push returned %s (combined %s)",
3695             track->stream_id, track->period_num,
3696             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3697         slot->pushed_timed_data = TRUE;
3698       } else {
3699         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3700       }
3701
3702       TRACKS_LOCK (demux);
3703       gst_adaptive_demux_track_update_next_position (track);
3704
3705       if (ret != GST_FLOW_OK)
3706         goto pause;
3707     }
3708   }
3709
3710   /* Store global output position */
3711   if (global_output_position != GST_CLOCK_STIME_NONE)
3712     demux->priv->global_output_position = global_output_position;
3713
3714   if (global_output_position == GST_CLOCK_STIME_NONE) {
3715     if (!demux->priv->flushing) {
3716       GST_DEBUG_OBJECT (demux,
3717           "Pausing output task after reaching NONE global_output_position");
3718       gst_task_pause (demux->priv->output_task);
3719     }
3720   }
3721
3722   TRACKS_UNLOCK (demux);
3723   GST_DEBUG_OBJECT (demux, "leave");
3724   return;
3725
3726 pause:
3727   {
3728     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3729     /* If the flushing flag is set, then the task is being
3730      * externally stopped, so don't go to pause(), otherwise we
3731      * should so we don't keep spinning */
3732     if (!demux->priv->flushing) {
3733       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3734           gst_flow_get_name (ret));
3735       gst_task_pause (demux->priv->output_task);
3736     }
3737
3738     TRACKS_UNLOCK (demux);
3739
3740     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3741       GstEvent *eos = gst_event_new_eos ();
3742
3743       if (ret != GST_FLOW_EOS) {
3744         GST_ELEMENT_FLOW_ERROR (demux, ret);
3745       }
3746
3747       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3748       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3749         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3750       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3751
3752       gst_adaptive_demux_push_src_event (demux, eos);
3753     }
3754
3755     return;
3756   }
3757 }
3758
3759 /* must be called from the scheduler */
3760 gboolean
3761 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3762 {
3763   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3764
3765   if (klass->is_live)
3766     return klass->is_live (demux);
3767   return FALSE;
3768 }
3769
3770 /* must be called from the scheduler */
3771 GstFlowReturn
3772 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3773     GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3774     GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3775 {
3776   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3777
3778   if (klass->stream_seek)
3779     return klass->stream_seek (stream, forward, flags, ts, final_ts);
3780   return GST_FLOW_ERROR;
3781 }
3782
3783 /* must be called from the scheduler */
3784 gboolean
3785 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3786     GstAdaptiveDemux2Stream * stream)
3787 {
3788   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3789   gboolean ret = TRUE;
3790
3791   if (klass->stream_has_next_fragment)
3792     ret = klass->stream_has_next_fragment (stream);
3793
3794   return ret;
3795 }
3796
3797 /* must be called from the scheduler */
3798 /* Called from:
3799  *  the ::finish_fragment() handlers when an *actual* fragment is done
3800  *   */
3801 GstFlowReturn
3802 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3803     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3804 {
3805   if (stream->last_ret != GST_FLOW_OK)
3806     return stream->last_ret;
3807
3808   stream->last_ret =
3809       gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3810       duration);
3811
3812   return stream->last_ret;
3813 }
3814
3815 /* must be called with manifest_lock taken */
3816 static GstFlowReturn
3817 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3818     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3819 {
3820   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3821   GstFlowReturn ret;
3822
3823   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3824
3825   GST_LOG_OBJECT (stream,
3826       "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3827       GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
3828
3829   stream->download_error_count = 0;
3830   g_clear_error (&stream->last_error);
3831
3832 #if 0
3833   /* FIXME - url has no indication of byte ranges for subsegments */
3834   /* FIXME: Reenable statistics sending? */
3835   gst_element_post_message (GST_ELEMENT_CAST (demux),
3836       gst_message_new_element (GST_OBJECT_CAST (demux),
3837           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3838               "manifest-uri", G_TYPE_STRING,
3839               demux->manifest_uri, "uri", G_TYPE_STRING,
3840               stream->fragment.uri, "fragment-start-time",
3841               GST_TYPE_CLOCK_TIME, stream->download_start_time,
3842               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3843               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3844               stream->download_total_bytes, "fragment-download-time",
3845               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3846 #endif
3847
3848   /* Don't update to the end of the segment if in reverse playback */
3849   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3850   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3851     stream->parse_segment.position += duration;
3852     stream->current_position += duration;
3853
3854     GST_DEBUG_OBJECT (stream,
3855         "stream position now %" GST_TIME_FORMAT,
3856         GST_TIME_ARGS (stream->current_position));
3857   }
3858   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3859
3860   /* When advancing with a non 1.0 rate on live streams, we need to check
3861    * the live seeking range again to make sure we can still advance to
3862    * that position */
3863   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3864     if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3865       ret = GST_FLOW_EOS;
3866     else
3867       ret = klass->stream_advance_fragment (stream);
3868   } else if (gst_adaptive_demux_is_live (demux)
3869       || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3870     ret = klass->stream_advance_fragment (stream);
3871   } else {
3872     ret = GST_FLOW_EOS;
3873   }
3874
3875   stream->download_start_time =
3876       GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3877
3878   if (ret == GST_FLOW_OK) {
3879     GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3880     if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3881             gst_adaptive_demux2_stream_update_current_bitrate (demux,
3882                 stream))) {
3883       GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3884       stream->need_header = TRUE;
3885       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3886     }
3887   }
3888
3889   return ret;
3890 }
3891
3892 /* must be called with manifest_lock taken */
3893 static gboolean
3894 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3895     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3896 {
3897   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3898
3899   if (klass->stream_select_bitrate)
3900     return klass->stream_select_bitrate (stream, bitrate);
3901   return FALSE;
3902 }
3903
3904 /* must be called with manifest_lock taken */
3905 GstFlowReturn
3906 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3907     GstAdaptiveDemux2Stream * stream)
3908 {
3909   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3910   GstFlowReturn ret;
3911
3912   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3913       GST_FLOW_ERROR);
3914
3915   /* Make sure the sub-class will update bitrate, or else
3916    * we will later */
3917   stream->fragment.finished = FALSE;
3918
3919   GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3920       GST_TIME_ARGS (stream->current_position));
3921
3922   ret = klass->stream_update_fragment_info (stream);
3923
3924   GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3925       gst_flow_get_name (ret), stream->fragment.uri);
3926   if (ret == GST_FLOW_OK) {
3927     GST_LOG_OBJECT (stream,
3928         "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3929         GST_STIME_ARGS (stream->fragment.stream_time),
3930         GST_TIME_ARGS (stream->fragment.duration));
3931     GST_LOG_OBJECT (stream,
3932         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3933         stream->fragment.range_start, stream->fragment.range_end);
3934   }
3935
3936   return ret;
3937 }
3938
3939 /* must be called with manifest_lock taken */
3940 GstClockTime
3941 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3942     demux, GstAdaptiveDemux2Stream * stream)
3943 {
3944   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3945
3946   if (klass->stream_get_fragment_waiting_time)
3947     return klass->stream_get_fragment_waiting_time (stream);
3948   return 0;
3949 }
3950
3951 static void
3952 handle_manifest_download_complete (DownloadRequest * request,
3953     DownloadRequestState state, GstAdaptiveDemux * demux)
3954 {
3955   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3956   GstBuffer *buffer;
3957   GstFlowReturn result;
3958
3959   g_free (demux->manifest_base_uri);
3960   g_free (demux->manifest_uri);
3961
3962   if (request->redirect_permanent && request->redirect_uri) {
3963     demux->manifest_uri = g_strdup (request->redirect_uri);
3964     demux->manifest_base_uri = NULL;
3965   } else {
3966     demux->manifest_uri = g_strdup (request->uri);
3967     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3968   }
3969
3970   buffer = download_request_take_buffer (request);
3971
3972   /* We should always have a buffer since this function is the non-error
3973    * callback for the download */
3974   g_assert (buffer);
3975
3976   result = klass->update_manifest_data (demux, buffer);
3977   gst_buffer_unref (buffer);
3978
3979   /* FIXME: Should the manifest uri vars be reverted to original
3980    * values if updating fails? */
3981
3982   if (result == GST_FLOW_OK) {
3983     GstClockTime duration;
3984     /* Send an updated duration message */
3985     duration = klass->get_duration (demux);
3986     if (duration != GST_CLOCK_TIME_NONE) {
3987       GST_DEBUG_OBJECT (demux,
3988           "Sending duration message : %" GST_TIME_FORMAT,
3989           GST_TIME_ARGS (duration));
3990       gst_element_post_message (GST_ELEMENT (demux),
3991           gst_message_new_duration_changed (GST_OBJECT (demux)));
3992     } else {
3993       GST_DEBUG_OBJECT (demux,
3994           "Duration unknown, can not send the duration message");
3995     }
3996
3997     /* If a manifest changes it's liveness or periodic updateness, we need
3998      * to start/stop the manifest update task appropriately */
3999     /* Keep this condition in sync with the one in
4000      * gst_adaptive_demux_start_manifest_update_task()
4001      */
4002     if (gst_adaptive_demux_is_live (demux) &&
4003         klass->requires_periodical_playlist_update (demux)) {
4004       gst_adaptive_demux_start_manifest_update_task (demux);
4005     } else {
4006       gst_adaptive_demux_stop_manifest_update_task (demux);
4007     }
4008   }
4009 }
4010
4011 static void
4012 handle_manifest_download_failure (DownloadRequest * request,
4013     DownloadRequestState state, GstAdaptiveDemux * demux)
4014 {
4015   GST_FIXME_OBJECT (demux, "Manifest download failed.");
4016   /* Retry or error out here */
4017 }
4018
4019 static GstFlowReturn
4020 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4021 {
4022   DownloadRequest *request;
4023   GstFlowReturn ret = GST_FLOW_OK;
4024   GError *error = NULL;
4025
4026   request = download_request_new_uri (demux->manifest_uri);
4027
4028   download_request_set_callbacks (request,
4029       (DownloadRequestEventCallback) handle_manifest_download_complete,
4030       (DownloadRequestEventCallback) handle_manifest_download_failure,
4031       NULL, NULL, demux);
4032
4033   if (!downloadhelper_submit_request (demux->download_helper, NULL,
4034           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4035           &error)) {
4036     if (error) {
4037       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4038           ("Failed to download manifest: %s", error->message), (NULL));
4039       g_clear_error (&error);
4040     }
4041     ret = GST_FLOW_NOT_LINKED;
4042   }
4043
4044   return ret;
4045 }
4046
4047 /* must be called with manifest_lock taken */
4048 GstFlowReturn
4049 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4050 {
4051   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4052   GstFlowReturn ret;
4053
4054   ret = klass->update_manifest (demux);
4055
4056   return ret;
4057 }
4058
4059 void
4060 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4061 {
4062   g_free (f->uri);
4063   f->uri = NULL;
4064   f->range_start = 0;
4065   f->range_end = -1;
4066
4067   g_free (f->header_uri);
4068   f->header_uri = NULL;
4069   f->header_range_start = 0;
4070   f->header_range_end = -1;
4071
4072   g_free (f->index_uri);
4073   f->index_uri = NULL;
4074   f->index_range_start = 0;
4075   f->index_range_end = -1;
4076
4077   f->stream_time = GST_CLOCK_STIME_NONE;
4078   f->duration = GST_CLOCK_TIME_NONE;
4079   f->finished = FALSE;
4080 }
4081
4082 /* must be called with manifest_lock taken */
4083 gboolean
4084 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4085 {
4086   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4087   gboolean ret = FALSE;
4088
4089   if (klass->has_next_period)
4090     ret = klass->has_next_period (demux);
4091   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4092   return ret;
4093 }
4094
4095 /* must be called with manifest_lock taken */
4096 void
4097 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4098 {
4099   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4100   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4101
4102   g_return_if_fail (klass->advance_period != NULL);
4103
4104   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4105   /* FIXME : no return value ? What if it fails ? */
4106   klass->advance_period (demux);
4107
4108   if (previous_period == demux->input_period) {
4109     GST_ERROR_OBJECT (demux, "Advancing period failed");
4110     return;
4111   }
4112
4113   /* Stop the previous period stream tasks */
4114   gst_adaptive_demux_period_stop_tasks (previous_period);
4115
4116   gst_adaptive_demux_update_collection (demux, demux->input_period);
4117   /* Figure out a pre-emptive selection based on the output period selection */
4118   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4119       demux->output_period);
4120
4121   gst_adaptive_demux_prepare_streams (demux, FALSE);
4122   gst_adaptive_demux_start_tasks (demux);
4123 }
4124
4125 /**
4126  * gst_adaptive_demux_get_monotonic_time:
4127  * Returns: a monotonically increasing time, using the system realtime clock
4128  */
4129 GstClockTime
4130 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4131 {
4132   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4133   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
4134 }
4135
4136 /**
4137  * gst_adaptive_demux_get_client_now_utc:
4138  * @demux: #GstAdaptiveDemux
4139  * Returns: the client's estimate of UTC
4140  *
4141  * Used to find the client's estimate of UTC, using the system realtime clock.
4142  */
4143 GDateTime *
4144 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4145 {
4146   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
4147 }
4148
4149 /**
4150  * gst_adaptive_demux_is_running
4151  * @demux: #GstAdaptiveDemux
4152  * Returns: whether the demuxer is processing data
4153  *
4154  * Returns FALSE if shutdown has started (transitioning down from
4155  * PAUSED), otherwise TRUE.
4156  */
4157 gboolean
4158 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4159 {
4160   return g_atomic_int_get (&demux->running);
4161 }
4162
4163 /**
4164  * gst_adaptive_demux_get_qos_earliest_time:
4165  *
4166  * Returns: The QOS earliest time
4167  *
4168  * Since: 1.20
4169  */
4170 GstClockTime
4171 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4172 {
4173   GstClockTime earliest;
4174
4175   GST_OBJECT_LOCK (demux);
4176   earliest = demux->priv->qos_earliest_time;
4177   GST_OBJECT_UNLOCK (demux);
4178
4179   return earliest;
4180 }
4181
4182 gboolean
4183 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4184     GstAdaptiveDemux2Stream * stream)
4185 {
4186   g_return_val_if_fail (demux && stream, FALSE);
4187
4188   /* FIXME : Migrate to parent */
4189   g_return_val_if_fail (stream->demux == NULL, FALSE);
4190
4191   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4192
4193   TRACKS_LOCK (demux);
4194   if (demux->input_period->prepared) {
4195     GST_ERROR_OBJECT (demux,
4196         "Attempted to add streams but no new period was created");
4197     TRACKS_UNLOCK (demux);
4198     return FALSE;
4199   }
4200   stream->demux = demux;
4201   stream->period = demux->input_period;
4202   demux->input_period->streams =
4203       g_list_append (demux->input_period->streams, stream);
4204
4205   if (stream->tracks) {
4206     GList *iter;
4207     for (iter = stream->tracks; iter; iter = iter->next)
4208       if (!gst_adaptive_demux_period_add_track (demux->input_period,
4209               (GstAdaptiveDemuxTrack *) iter->data)) {
4210         GST_ERROR_OBJECT (demux, "Failed to add track elements");
4211         TRACKS_UNLOCK (demux);
4212         return FALSE;
4213       }
4214   }
4215   TRACKS_UNLOCK (demux);
4216   return TRUE;
4217 }
4218
4219 /* Return the current playback rate including any instant rate multiplier */
4220 gdouble
4221 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4222 {
4223   gdouble rate;
4224   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4225   rate = demux->segment.rate * demux->instant_rate_multiplier;
4226   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4227   return rate;
4228 }