a805012307e5fe53707dac6587716a836a590386
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include "gst/gst-i18n-plugin.h"
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (3 * GST_SECOND)
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
208
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
210     GstEvent * event);
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212     GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
214     GstQuery * query);
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
216     GstEvent * event);
217
218 static gboolean
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
220
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224     gboolean first_and_live);
225
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
228 static GstFlowReturn
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
230
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
232     demux);
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
234     demux);
235
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238     gboolean stop_updates);
239 static GstFlowReturn
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
242 static GstFlowReturn
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244     GstAdaptiveDemux2Stream * stream);
245 static GstFlowReturn
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247     GstAdaptiveDemux2Stream * stream, GstClockTime duration);
248
249 static void
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251     GstAdaptiveDemux2Stream * stream);
252
253 static gboolean
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
255     * demux);
256
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258  * method to get to the padtemplates */
259 GType
260 gst_adaptive_demux_ng_get_type (void)
261 {
262   static gsize type = 0;
263
264   if (g_once_init_enter (&type)) {
265     GType _type;
266     static const GTypeInfo info = {
267       sizeof (GstAdaptiveDemuxClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_adaptive_demux_class_init,
271       NULL,
272       NULL,
273       sizeof (GstAdaptiveDemux),
274       0,
275       (GInstanceInitFunc) gst_adaptive_demux_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_BIN,
279         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
280
281     private_offset =
282         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
283
284     g_once_init_leave (&type, _type);
285   }
286   return type;
287 }
288
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
291 {
292   return (G_STRUCT_MEMBER_P (self, private_offset));
293 }
294
295 static void
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297     const GValue * value, GParamSpec * pspec)
298 {
299   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
300
301   GST_OBJECT_LOCK (demux);
302
303   switch (prop_id) {
304     case PROP_CONNECTION_SPEED:
305       demux->connection_speed = g_value_get_uint (value) * 1000;
306       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307           demux->connection_speed);
308       break;
309     case PROP_BANDWIDTH_TARGET_RATIO:
310       demux->bandwidth_target_ratio = g_value_get_float (value);
311       break;
312     case PROP_MIN_BITRATE:
313       demux->min_bitrate = g_value_get_uint (value);
314       break;
315     case PROP_MAX_BITRATE:
316       demux->max_bitrate = g_value_get_uint (value);
317       break;
318     case PROP_CONNECTION_BITRATE:
319       demux->connection_speed = g_value_get_uint (value);
320       break;
321       /* FIXME: Recalculate track and buffering levels
322        * when watermarks change? */
323     case PROP_MAX_BUFFERING_TIME:
324       demux->max_buffering_time = g_value_get_uint64 (value);
325       break;
326     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
328       break;
329     case PROP_BUFFERING_LOW_WATERMARK_TIME:
330       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
331       break;
332     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333       demux->buffering_high_watermark_fragments = g_value_get_double (value);
334       break;
335     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336       demux->buffering_low_watermark_fragments = g_value_get_double (value);
337       break;
338     default:
339       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340       break;
341   }
342
343   GST_OBJECT_UNLOCK (demux);
344 }
345
346 static void
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348     GValue * value, GParamSpec * pspec)
349 {
350   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
351
352   GST_OBJECT_LOCK (demux);
353
354   switch (prop_id) {
355     case PROP_CONNECTION_SPEED:
356       g_value_set_uint (value, demux->connection_speed / 1000);
357       break;
358     case PROP_BANDWIDTH_TARGET_RATIO:
359       g_value_set_float (value, demux->bandwidth_target_ratio);
360       break;
361     case PROP_MIN_BITRATE:
362       g_value_set_uint (value, demux->min_bitrate);
363       break;
364     case PROP_MAX_BITRATE:
365       g_value_set_uint (value, demux->max_bitrate);
366       break;
367     case PROP_CONNECTION_BITRATE:
368       g_value_set_uint (value, demux->connection_speed);
369       break;
370     case PROP_CURRENT_BANDWIDTH:
371       g_value_set_uint (value, demux->current_download_rate);
372       break;
373     case PROP_MAX_BUFFERING_TIME:
374       g_value_set_uint64 (value, demux->max_buffering_time);
375       break;
376     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
378       break;
379     case PROP_BUFFERING_LOW_WATERMARK_TIME:
380       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
381       break;
382     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383       g_value_set_double (value, demux->buffering_high_watermark_fragments);
384       break;
385     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386       g_value_set_double (value, demux->buffering_low_watermark_fragments);
387       break;
388     case PROP_CURRENT_LEVEL_TIME_VIDEO:
389       g_value_set_uint64 (value, demux->current_level_time_video);
390       break;
391     case PROP_CURRENT_LEVEL_TIME_AUDIO:
392       g_value_set_uint64 (value, demux->current_level_time_audio);
393       break;
394     default:
395       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
396       break;
397   }
398
399   GST_OBJECT_UNLOCK (demux);
400 }
401
402 static void
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
404 {
405   GObjectClass *gobject_class;
406   GstElementClass *gstelement_class;
407   GstBinClass *gstbin_class;
408
409   gobject_class = G_OBJECT_CLASS (klass);
410   gstelement_class = GST_ELEMENT_CLASS (klass);
411   gstbin_class = GST_BIN_CLASS (klass);
412
413   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414       "Base Adaptive Demux (ng)");
415
416   parent_class = g_type_class_peek_parent (klass);
417
418   if (private_offset != 0)
419     g_type_class_adjust_private_offset (klass, &private_offset);
420
421   gobject_class->set_property = gst_adaptive_demux_set_property;
422   gobject_class->get_property = gst_adaptive_demux_get_property;
423   gobject_class->finalize = gst_adaptive_demux_finalize;
424
425   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426       g_param_spec_uint ("connection-speed", "Connection Speed",
427           "Network connection speed to use in kbps (0 = calculate from downloaded"
428           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
430
431   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432       g_param_spec_float ("bandwidth-target-ratio",
433           "Ratio of target bandwidth / available bandwidth",
434           "Limit of the available bitrate to use when switching to alternates",
435           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440           "Network connection speed to use (0 = automatic) (bits/s)",
441           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
443
444   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446           "Minimum bitrate to use when switching to alternates (bits/s)",
447           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449
450   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452           "Maximum bitrate to use when switching to alternates (bits/s)",
453           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455
456   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457       g_param_spec_uint ("current-bandwidth",
458           "Current download bandwidth (bits/s)",
459           "Report of current download bandwidth (based on arriving data) (bits/s)",
460           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
461
462   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463       g_param_spec_uint64 ("max-buffering-time",
464           "Buffering maximum size (ns)",
465           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_HIGH_WATERMARK_TIME,
472       g_param_spec_uint64 ("high-watermark-time",
473           "High buffering watermark size (ns)",
474           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_LOW_WATERMARK_TIME,
481       g_param_spec_uint64 ("low-watermark-time",
482           "Low buffering watermark size (ns)",
483           "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("high-watermark-fragments",
491           "High buffering watermark size (fragments)",
492           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class,
498       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499       g_param_spec_double ("low-watermark-fragments",
500           "Low buffering watermark size (fragments)",
501           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504           G_PARAM_STATIC_STRINGS));
505
506   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507       g_param_spec_uint64 ("current-level-time-video",
508           "Currently buffered level of video (ns)",
509           "Currently buffered level of video track(s) (ns)",
510           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512           G_PARAM_STATIC_STRINGS));
513
514   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515       g_param_spec_uint64 ("current-level-time-audio",
516           "Currently buffered level of audio (ns)",
517           "Currently buffered level of audio track(s) (ns)",
518           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520           G_PARAM_STATIC_STRINGS));
521
522   gst_element_class_add_static_pad_template (gstelement_class,
523       &gst_adaptive_demux_audiosrc_template);
524   gst_element_class_add_static_pad_template (gstelement_class,
525       &gst_adaptive_demux_videosrc_template);
526   gst_element_class_add_static_pad_template (gstelement_class,
527       &gst_adaptive_demux_subtitlesrc_template);
528
529
530   gstelement_class->change_state = gst_adaptive_demux_change_state;
531   gstelement_class->query = gst_adaptive_demux_query;
532
533   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
534
535   klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536   klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538   klass->requires_periodical_playlist_update =
539       gst_adaptive_demux_requires_periodical_playlist_update_default;
540   klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
542 }
543
544 static void
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546     GstAdaptiveDemuxClass * klass)
547 {
548   GstPadTemplate *pad_template;
549
550   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
551
552   demux->priv = gst_adaptive_demux_get_instance_private (demux);
553   demux->priv->input_adapter = gst_adapter_new ();
554   demux->realtime_clock = gst_adaptive_demux_clock_new ();
555
556   demux->download_helper = downloadhelper_new (demux->realtime_clock);
557   demux->priv->segment_seqnum = gst_util_seqnum_next ();
558   demux->have_group_id = FALSE;
559   demux->group_id = G_MAXUINT;
560
561   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562   demux->instant_rate_multiplier = 1.0;
563
564   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
566
567   g_rec_mutex_init (&demux->priv->manifest_lock);
568
569   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570   g_mutex_init (&demux->priv->scheduler_lock);
571
572   g_mutex_init (&demux->priv->api_lock);
573   g_mutex_init (&demux->priv->segment_lock);
574
575   g_mutex_init (&demux->priv->tracks_lock);
576   g_cond_init (&demux->priv->tracks_add);
577
578   g_mutex_init (&demux->priv->buffering_lock);
579
580   demux->priv->periods = g_queue_new ();
581
582   pad_template =
583       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584   g_return_if_fail (pad_template != NULL);
585
586   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587   gst_pad_set_event_function (demux->sinkpad,
588       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589   gst_pad_set_chain_function (demux->sinkpad,
590       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
591
592   /* Properties */
593   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595   demux->min_bitrate = DEFAULT_MIN_BITRATE;
596   demux->max_bitrate = DEFAULT_MAX_BITRATE;
597
598   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601   demux->buffering_high_watermark_fragments =
602       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603   demux->buffering_low_watermark_fragments =
604       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
605
606   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
608
609   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
610
611   demux->priv->duration = GST_CLOCK_TIME_NONE;
612
613   /* Output combiner */
614   demux->priv->flowcombiner = gst_flow_combiner_new ();
615
616   /* Output task */
617   g_rec_mutex_init (&demux->priv->output_lock);
618   demux->priv->output_task =
619       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
620       NULL);
621   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
622 }
623
624 static void
625 gst_adaptive_demux_finalize (GObject * object)
626 {
627   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628   GstAdaptiveDemuxPrivate *priv = demux->priv;
629
630   GST_DEBUG_OBJECT (object, "finalize");
631
632   g_object_unref (priv->input_adapter);
633
634   downloadhelper_free (demux->download_helper);
635
636   g_rec_mutex_clear (&demux->priv->manifest_lock);
637   g_mutex_clear (&demux->priv->api_lock);
638   g_mutex_clear (&demux->priv->segment_lock);
639
640   g_mutex_clear (&demux->priv->buffering_lock);
641
642   g_mutex_clear (&demux->priv->scheduler_lock);
643   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
644
645   /* The input period is present after a reset, clear it now */
646   if (demux->input_period)
647     gst_adaptive_demux_period_unref (demux->input_period);
648
649   if (demux->realtime_clock) {
650     gst_adaptive_demux_clock_unref (demux->realtime_clock);
651     demux->realtime_clock = NULL;
652   }
653   g_object_unref (priv->output_task);
654   g_rec_mutex_clear (&priv->output_lock);
655
656   gst_flow_combiner_free (priv->flowcombiner);
657
658   g_queue_free (priv->periods);
659
660   G_OBJECT_CLASS (parent_class)->finalize (object);
661 }
662
663 static gboolean
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
665 {
666   gboolean ret = FALSE;
667   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
668
669   ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
670
671   return ret;
672 }
673
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676     GstStateChange transition)
677 {
678   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
680
681   switch (transition) {
682     case GST_STATE_CHANGE_NULL_TO_READY:
683       if (!gst_adaptive_demux_check_streams_aware (demux)) {
684         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685             (_("Element requires a streams-aware context.")), (NULL));
686         goto fail_out;
687       }
688       break;
689     case GST_STATE_CHANGE_PAUSED_TO_READY:
690       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
692
693       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694       downloadhelper_stop (demux->download_helper);
695
696       TRACKS_LOCK (demux);
697       demux->priv->flushing = TRUE;
698       g_cond_signal (&demux->priv->tracks_add);
699       gst_task_stop (demux->priv->output_task);
700       TRACKS_UNLOCK (demux);
701
702       gst_task_join (demux->priv->output_task);
703
704       GST_API_LOCK (demux);
705       gst_adaptive_demux_reset (demux);
706       GST_API_UNLOCK (demux);
707       break;
708     case GST_STATE_CHANGE_READY_TO_PAUSED:
709       GST_API_LOCK (demux);
710       gst_adaptive_demux_reset (demux);
711
712       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713       if (g_atomic_int_get (&demux->priv->have_manifest))
714         gst_adaptive_demux_start_manifest_update_task (demux);
715       GST_API_UNLOCK (demux);
716       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717         GST_DEBUG_OBJECT (demux, "demuxer has started running");
718       /* gst_task_start (demux->priv->output_task); */
719       break;
720     default:
721       break;
722   }
723
724   /* this must be run with the scheduler and output tasks stopped. */
725   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
726
727   switch (transition) {
728     case GST_STATE_CHANGE_READY_TO_PAUSED:
729       /* Start download task */
730       downloadhelper_start (demux->download_helper);
731       break;
732     default:
733       break;
734   }
735
736 fail_out:
737   return result;
738 }
739
740 static void
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
742     OutputSlot * slot)
743 {
744   GstEvent *eos = gst_event_new_eos ();
745   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
746
747   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
748   gst_pad_push_event (slot->pad, eos);
749   gst_pad_set_active (slot->pad, FALSE);
750   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
751   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
752   if (slot->track)
753     gst_adaptive_demux_track_unref (slot->track);
754   if (slot->pending_track)
755     gst_adaptive_demux_track_unref (slot->pending_track);
756
757   g_slice_free (OutputSlot, slot);
758 }
759
760 static OutputSlot *
761 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
762     GstStreamType streamtype)
763 {
764   OutputSlot *slot;
765   GstPadTemplate *tmpl;
766   gchar *name;
767
768   switch (streamtype) {
769     case GST_STREAM_TYPE_AUDIO:
770       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
771       tmpl =
772           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
773       break;
774     case GST_STREAM_TYPE_VIDEO:
775       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
776       tmpl =
777           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
778       break;
779     case GST_STREAM_TYPE_TEXT:
780       name =
781           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
782       tmpl =
783           gst_static_pad_template_get
784           (&gst_adaptive_demux_subtitlesrc_template);
785       break;
786     default:
787       g_assert_not_reached ();
788       return NULL;
789   }
790
791   slot = g_slice_new0 (OutputSlot);
792   slot->type = streamtype;
793   slot->pushed_timed_data = FALSE;
794
795   /* Create and activate new pads */
796   slot->pad = gst_pad_new_from_template (tmpl, name);
797   g_free (name);
798   gst_object_unref (tmpl);
799
800   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
801   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
802   gst_pad_set_active (slot->pad, TRUE);
803
804   gst_pad_set_query_function (slot->pad,
805       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
806   gst_pad_set_event_function (slot->pad,
807       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
808
809   gst_pad_set_element_private (slot->pad, slot);
810
811   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
812       GST_DEBUG_PAD_NAME (slot->pad));
813   return slot;
814 }
815
816 /* Called:
817  * * After `process_manifest` or when a period starts
818  * * Or when all tracks have been created
819  *
820  * Goes over tracks and creates the collection
821  *
822  * Returns TRUE if the collection was fully created.
823  *
824  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
825  * */
826 static gboolean
827 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
828     GstAdaptiveDemuxPeriod * period)
829 {
830   GstStreamCollection *collection;
831   GList *iter;
832
833   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
834
835   if (!period->tracks_changed) {
836     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
837     return TRUE;
838   }
839
840   if (!period->tracks) {
841     GST_WARNING_OBJECT (demux, "No tracks registered/present");
842     return FALSE;
843   }
844
845   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
846     GST_DEBUG_OBJECT (demux,
847         "Streams still have pending tracks, not creating/updating collection");
848     return FALSE;
849   }
850
851   /* Update collection */
852   collection = gst_stream_collection_new ("adaptivedemux");
853
854   for (iter = period->tracks; iter; iter = iter->next) {
855     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
856
857     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
858     gst_stream_collection_add_stream (collection,
859         gst_object_ref (track->stream_object));
860   }
861
862   if (period->collection)
863     gst_object_unref (period->collection);
864   period->collection = collection;
865
866   return TRUE;
867 }
868
869 /*
870  * Called for the output period:
871  * * after `update_collection()` if the input period is the same as the output period
872  * * When the output period changes
873  *
874  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
875  */
876 static gboolean
877 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
878 {
879   GstStreamCollection *collection;
880   GstAdaptiveDemuxPeriod *period = demux->output_period;
881   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
882
883   g_return_val_if_fail (period, FALSE);
884   if (!period->collection) {
885     GST_DEBUG_OBJECT (demux, "No collection available yet");
886     return TRUE;
887   }
888
889   collection = period->collection;
890
891   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
892       period->period_num);
893
894   /* Post collection */
895   TRACKS_UNLOCK (demux);
896   GST_MANIFEST_UNLOCK (demux);
897
898   gst_element_post_message (GST_ELEMENT_CAST (demux),
899       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
900
901   GST_MANIFEST_LOCK (demux);
902   TRACKS_LOCK (demux);
903
904   /* If no stream selection was handled, make a default selection */
905   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
906     gst_adaptive_demux_period_select_default_tracks (demux,
907         demux->output_period);
908   }
909
910   /* Make sure the output task is running */
911   if (gst_adaptive_demux2_is_running (demux)) {
912     demux->priv->flushing = FALSE;
913     GST_DEBUG_OBJECT (demux, "Starting the output task");
914     gst_task_start (demux->priv->output_task);
915   }
916
917   return TRUE;
918 }
919
920 static gboolean
921 handle_incoming_manifest (GstAdaptiveDemux * demux)
922 {
923   GstAdaptiveDemuxClass *demux_class;
924   GstQuery *query;
925   gboolean query_res;
926   gboolean ret = TRUE;
927   gsize available;
928   GstBuffer *manifest_buffer;
929
930   GST_API_LOCK (demux);
931   GST_MANIFEST_LOCK (demux);
932
933   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
934
935   available = gst_adapter_available (demux->priv->input_adapter);
936
937   if (available == 0)
938     goto eos_without_data;
939
940   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
941
942   /* Need to get the URI to use it as a base to generate the fragment's
943    * uris */
944   query = gst_query_new_uri ();
945   query_res = gst_pad_peer_query (demux->sinkpad, query);
946   if (query_res) {
947     gchar *uri, *redirect_uri;
948     gboolean permanent;
949
950     gst_query_parse_uri (query, &uri);
951     gst_query_parse_uri_redirection (query, &redirect_uri);
952     gst_query_parse_uri_redirection_permanent (query, &permanent);
953
954     if (permanent && redirect_uri) {
955       demux->manifest_uri = redirect_uri;
956       demux->manifest_base_uri = NULL;
957       g_free (uri);
958     } else {
959       demux->manifest_uri = uri;
960       demux->manifest_base_uri = redirect_uri;
961     }
962
963     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
964         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
965   } else {
966     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
967   }
968   gst_query_unref (query);
969
970   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
971   if (!demux->have_group_id) {
972     demux->have_group_id = TRUE;
973     demux->group_id = gst_util_group_id_next ();
974   }
975
976   /* Let the subclass parse the manifest */
977   manifest_buffer =
978       gst_adapter_take_buffer (demux->priv->input_adapter, available);
979   ret = demux_class->process_manifest (demux, manifest_buffer);
980   gst_buffer_unref (manifest_buffer);
981
982   gst_element_post_message (GST_ELEMENT_CAST (demux),
983       gst_message_new_element (GST_OBJECT_CAST (demux),
984           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
985               "manifest-uri", G_TYPE_STRING,
986               demux->manifest_uri, "uri", G_TYPE_STRING,
987               demux->manifest_uri,
988               "manifest-download-start", GST_TYPE_CLOCK_TIME,
989               GST_CLOCK_TIME_NONE,
990               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
991               gst_util_get_timestamp (), NULL)));
992
993   if (!ret)
994     goto invalid_manifest;
995
996   /* Streams should have been added to the input period if the manifest parsing
997    * succeeded */
998   if (!demux->input_period->streams)
999     goto no_streams;
1000
1001   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1002
1003   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1004   /* Send duration message */
1005   if (!gst_adaptive_demux_is_live (demux)) {
1006     GstClockTime duration = demux_class->get_duration (demux);
1007
1008     demux->priv->duration = duration;
1009     if (duration != GST_CLOCK_TIME_NONE) {
1010       GST_DEBUG_OBJECT (demux,
1011           "Sending duration message : %" GST_TIME_FORMAT,
1012           GST_TIME_ARGS (duration));
1013       gst_element_post_message (GST_ELEMENT (demux),
1014           gst_message_new_duration_changed (GST_OBJECT (demux)));
1015     } else {
1016       GST_DEBUG_OBJECT (demux,
1017           "media duration unknown, can not send the duration message");
1018     }
1019   }
1020
1021   TRACKS_LOCK (demux);
1022   /* New streams/tracks will have been added to the input period */
1023   /* The input period has streams, make it the active output period */
1024   /* FIXME : Factorize this into a function to make a period active */
1025   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1026   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1027       gst_adaptive_demux_post_collection (demux);
1028   TRACKS_UNLOCK (demux);
1029
1030   gst_adaptive_demux_prepare_streams (demux,
1031       gst_adaptive_demux_is_live (demux));
1032   gst_adaptive_demux_start_tasks (demux);
1033   gst_adaptive_demux_start_manifest_update_task (demux);
1034
1035 unlock_out:
1036   GST_MANIFEST_UNLOCK (demux);
1037   GST_API_UNLOCK (demux);
1038
1039   return ret;
1040
1041   /* ERRORS */
1042 eos_without_data:
1043   {
1044     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1045     ret = FALSE;
1046     goto unlock_out;
1047   }
1048
1049 no_streams:
1050   {
1051     /* no streams */
1052     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1053     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1054         (_("This file contains no playable streams.")),
1055         ("No known stream formats found at the Manifest"));
1056     ret = FALSE;
1057     goto unlock_out;
1058   }
1059
1060 invalid_manifest:
1061   {
1062     GST_MANIFEST_UNLOCK (demux);
1063     GST_API_UNLOCK (demux);
1064
1065     /* In most cases, this will happen if we set a wrong url in the
1066      * source element and we have received the 404 HTML response instead of
1067      * the manifest */
1068     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1069     return FALSE;
1070   }
1071 }
1072
1073 struct http_headers_collector
1074 {
1075   GstAdaptiveDemux *demux;
1076   gchar **cookies;
1077 };
1078
1079 static gboolean
1080 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1081     const GValue * value, gpointer userdata)
1082 {
1083   struct http_headers_collector *hdr_data = userdata;
1084   GstAdaptiveDemux *demux = hdr_data->demux;
1085   const gchar *field_name = g_quark_to_string (field_id);
1086
1087   if (G_UNLIKELY (value == NULL))
1088     return TRUE;                /* This should not happen */
1089
1090   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1091     const gchar *user_agent = g_value_get_string (value);
1092
1093     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1094     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1095   }
1096
1097   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1098       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1099     guint i = 0, prev_len = 0, total_len = 0;
1100     gchar **cookies = NULL;
1101
1102     if (hdr_data->cookies != NULL)
1103       prev_len = g_strv_length (hdr_data->cookies);
1104
1105     if (GST_VALUE_HOLDS_ARRAY (value)) {
1106       total_len = gst_value_array_get_size (value) + prev_len;
1107       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1108
1109       for (i = 0; i < gst_value_array_get_size (value); i++) {
1110         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1111             g_value_get_string (gst_value_array_get_value (value, i)));
1112         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1113       }
1114     } else if (G_VALUE_HOLDS_STRING (value)) {
1115       total_len = 1 + prev_len;
1116       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1117
1118       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1119           g_value_get_string (value));
1120       cookies[0] = g_value_dup_string (value);
1121     } else {
1122       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1123           g_quark_to_string (field_id));
1124     }
1125
1126     if (cookies) {
1127       if (prev_len) {
1128         guint j;
1129         for (j = 0; j < prev_len; j++) {
1130           GST_DEBUG_OBJECT (demux,
1131               "Append existing cookie %s", hdr_data->cookies[j]);
1132           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1133         }
1134       }
1135       cookies[total_len] = NULL;
1136
1137       g_strfreev (hdr_data->cookies);
1138       hdr_data->cookies = cookies;
1139     }
1140   }
1141
1142   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1143     const gchar *referer = g_value_get_string (value);
1144     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1145
1146     downloadhelper_set_referer (demux->download_helper, referer);
1147   }
1148
1149   /* Date header can be used to estimate server offset */
1150   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1151     const gchar *http_date = g_value_get_string (value);
1152
1153     if (http_date) {
1154       GstDateTime *datetime =
1155           gst_adaptive_demux_util_parse_http_head_date (http_date);
1156
1157       if (datetime) {
1158         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1159         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1160
1161         GST_INFO_OBJECT (demux,
1162             "HTTP response Date %s", GST_STR_NULL (date_string));
1163         g_free (date_string);
1164
1165         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1166       }
1167     }
1168   }
1169
1170   return TRUE;
1171 }
1172
1173 static gboolean
1174 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1175     GstEvent * event)
1176 {
1177   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1178   gboolean ret;
1179
1180   switch (event->type) {
1181     case GST_EVENT_FLUSH_STOP:{
1182       GST_API_LOCK (demux);
1183       GST_MANIFEST_LOCK (demux);
1184
1185       gst_adaptive_demux_reset (demux);
1186
1187       ret = gst_pad_event_default (pad, parent, event);
1188
1189       GST_MANIFEST_UNLOCK (demux);
1190       GST_API_UNLOCK (demux);
1191
1192       return ret;
1193     }
1194     case GST_EVENT_EOS:
1195     {
1196       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1197         if (!handle_incoming_manifest (demux)) {
1198           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1199           return gst_pad_event_default (pad, parent, event);
1200         }
1201         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1202       } else {
1203         GST_ERROR_OBJECT (demux,
1204             "Failed to acquire scheduler to handle manifest");
1205         return gst_pad_event_default (pad, parent, event);
1206       }
1207       gst_event_unref (event);
1208       return TRUE;
1209     }
1210     case GST_EVENT_STREAM_START:
1211       if (gst_event_parse_group_id (event, &demux->group_id))
1212         demux->have_group_id = TRUE;
1213       else
1214         demux->have_group_id = FALSE;
1215       /* Swallow stream-start, we'll push our own */
1216       gst_event_unref (event);
1217       return TRUE;
1218     case GST_EVENT_SEGMENT:
1219       /* Swallow newsegments, we'll push our own */
1220       gst_event_unref (event);
1221       return TRUE;
1222     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1223       const GstStructure *structure = gst_event_get_structure (event);
1224       struct http_headers_collector c = { demux, NULL };
1225
1226       if (gst_structure_has_name (structure, "http-headers")) {
1227         if (gst_structure_has_field (structure, "request-headers")) {
1228           GstStructure *req_headers = NULL;
1229           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1230               &req_headers, NULL);
1231           if (req_headers) {
1232             gst_structure_foreach (req_headers,
1233                 gst_adaptive_demux_handle_upstream_http_header, &c);
1234             gst_structure_free (req_headers);
1235           }
1236         }
1237         if (gst_structure_has_field (structure, "response-headers")) {
1238           GstStructure *res_headers = NULL;
1239           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1240               &res_headers, NULL);
1241           if (res_headers) {
1242             gst_structure_foreach (res_headers,
1243                 gst_adaptive_demux_handle_upstream_http_header, &c);
1244             gst_structure_free (res_headers);
1245           }
1246         }
1247
1248         if (c.cookies)
1249           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1250       }
1251       break;
1252     }
1253     default:
1254       break;
1255   }
1256
1257   return gst_pad_event_default (pad, parent, event);
1258 }
1259
1260 static GstFlowReturn
1261 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1262     GstBuffer * buffer)
1263 {
1264   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1265
1266   GST_MANIFEST_LOCK (demux);
1267
1268   gst_adapter_push (demux->priv->input_adapter, buffer);
1269
1270   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1271       (gint) gst_adapter_available (demux->priv->input_adapter));
1272
1273   GST_MANIFEST_UNLOCK (demux);
1274   return GST_FLOW_OK;
1275 }
1276
1277
1278 /* Called with TRACKS_LOCK taken */
1279 static void
1280 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1281 {
1282   GList *tmp;
1283
1284   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1285     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1286
1287     gst_adaptive_demux_track_flush (track);
1288     if (gst_pad_is_active (track->sinkpad)) {
1289       gst_pad_set_active (track->sinkpad, FALSE);
1290       gst_pad_set_active (track->sinkpad, TRUE);
1291     }
1292   }
1293 }
1294
1295 /* Resets all tracks to their initial state, ready to receive new data. */
1296 static void
1297 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1298 {
1299   TRACKS_LOCK (demux);
1300   g_queue_foreach (demux->priv->periods,
1301       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1302   TRACKS_UNLOCK (demux);
1303 }
1304
1305 /* Subclasses will call this function to ensure that a new input period is
1306  * available to receive new streams and tracks */
1307 gboolean
1308 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1309 {
1310   if (demux->input_period && !demux->input_period->prepared) {
1311     GST_DEBUG_OBJECT (demux, "Using existing input period");
1312     return TRUE;
1313   }
1314
1315   if (demux->input_period) {
1316     GST_DEBUG_OBJECT (demux, "Closing previous period");
1317     demux->input_period->closed = TRUE;
1318   }
1319   GST_DEBUG_OBJECT (demux, "Setting up new period");
1320
1321   demux->input_period = gst_adaptive_demux_period_new (demux);
1322
1323   return TRUE;
1324 }
1325
1326 /* must be called with manifest_lock taken */
1327 static void
1328 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1329 {
1330   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1331   GList *iter;
1332
1333   gst_adaptive_demux_stop_tasks (demux, TRUE);
1334
1335   if (klass->reset)
1336     klass->reset (demux);
1337
1338   /* Disable and remove all outputs */
1339   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1340   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1341     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1342   }
1343   g_list_free (demux->priv->outputs);
1344   demux->priv->outputs = NULL;
1345
1346   g_queue_clear_full (demux->priv->periods,
1347       (GDestroyNotify) gst_adaptive_demux_period_unref);
1348
1349   /* The output period always has an extra ref taken on it */
1350   if (demux->output_period)
1351     gst_adaptive_demux_period_unref (demux->output_period);
1352   demux->output_period = NULL;
1353   /* The input period doesn't have an extra ref taken on it */
1354   demux->input_period = NULL;
1355
1356   gst_adaptive_demux_start_new_period (demux);
1357
1358   g_free (demux->manifest_uri);
1359   g_free (demux->manifest_base_uri);
1360   demux->manifest_uri = NULL;
1361   demux->manifest_base_uri = NULL;
1362
1363   gst_adapter_clear (demux->priv->input_adapter);
1364   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1365
1366   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1367   demux->instant_rate_multiplier = 1.0;
1368
1369   demux->priv->duration = GST_CLOCK_TIME_NONE;
1370
1371   demux->priv->percent = -1;
1372   demux->priv->is_buffering = TRUE;
1373
1374   demux->have_group_id = FALSE;
1375   demux->group_id = G_MAXUINT;
1376   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1377
1378   demux->priv->global_output_position = 0;
1379
1380   demux->priv->n_audio_streams = 0;
1381   demux->priv->n_video_streams = 0;
1382   demux->priv->n_subtitle_streams = 0;
1383
1384   gst_flow_combiner_reset (demux->priv->flowcombiner);
1385 }
1386
1387 static gboolean
1388 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1389 {
1390   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1391
1392   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1393
1394   switch (GST_QUERY_TYPE (query)) {
1395     case GST_QUERY_BUFFERING:
1396     {
1397       GstFormat format;
1398       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1399
1400       if (!demux->output_period) {
1401         if (format != GST_FORMAT_TIME) {
1402           GST_DEBUG_OBJECT (demux,
1403               "No period setup yet, can't answer non-TIME buffering queries");
1404           return FALSE;
1405         }
1406
1407         GST_DEBUG_OBJECT (demux,
1408             "No period setup yet, but still answering buffering query");
1409         return TRUE;
1410       }
1411     }
1412     default:
1413       break;
1414   }
1415
1416   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1417 }
1418
1419 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1420 static GstAdaptiveDemux2Stream *
1421 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1422 {
1423   GList *iter;
1424
1425   /* We only look in the streams of the input period (i.e. with active streams) */
1426   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1427     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1428     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1429       return stream;
1430     }
1431   }
1432
1433   return NULL;
1434 }
1435
1436 /* TRACKS_LOCK held */
1437 static GstAdaptiveDemuxTrack *
1438 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1439     GstStreamType stream_type)
1440 {
1441   GList *iter;
1442
1443   for (iter = stream->tracks; iter; iter = iter->next) {
1444     GstAdaptiveDemuxTrack *track = iter->data;
1445
1446     if (track->type == stream_type)
1447       return track;
1448   }
1449
1450   return NULL;
1451 }
1452
1453 /* MANIFEST and TRACKS lock held */
1454 static void
1455 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1456     GstAdaptiveDemux2Stream * stream)
1457 {
1458   guint i;
1459
1460   GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1461
1462   for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1463       i++) {
1464     GstStream *gst_stream =
1465         gst_stream_collection_get_stream (stream->stream_collection, i);
1466     GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1467     GstAdaptiveDemuxTrack *track;
1468
1469     if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1470       continue;
1471     track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1472     if (!track) {
1473       GST_DEBUG_OBJECT (stream,
1474           "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
1475           gst_stream);
1476       continue;
1477     }
1478
1479     if (track->upstream_stream_id)
1480       g_free (track->upstream_stream_id);
1481     track->upstream_stream_id =
1482         g_strdup (gst_stream_get_stream_id (gst_stream));
1483   }
1484
1485 }
1486
1487 static gboolean
1488 tags_have_language_info (GstTagList * tags)
1489 {
1490   const gchar *language = NULL;
1491
1492   if (tags == NULL)
1493     return FALSE;
1494
1495   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1496           &language))
1497     return TRUE;
1498   if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
1499           &language))
1500     return TRUE;
1501
1502   return FALSE;
1503 }
1504
1505 static gboolean
1506 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1507     GstStreamCollection * collection)
1508 {
1509   guint i;
1510   guint nb_audio, nb_video, nb_text;
1511   gboolean have_audio_languages = TRUE;
1512   gboolean have_text_languages = TRUE;
1513
1514   nb_audio = nb_video = nb_text = 0;
1515
1516   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1517     GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1518     GstTagList *tags = gst_stream_get_tags (gst_stream);
1519
1520     GST_DEBUG_OBJECT (stream,
1521         "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1522     switch (gst_stream_get_stream_type (gst_stream)) {
1523       case GST_STREAM_TYPE_AUDIO:
1524         have_audio_languages &= tags_have_language_info (tags);
1525         nb_audio++;
1526         break;
1527       case GST_STREAM_TYPE_VIDEO:
1528         nb_video++;
1529         break;
1530       case GST_STREAM_TYPE_TEXT:
1531         have_text_languages &= tags_have_language_info (tags);
1532         nb_text++;
1533         break;
1534       default:
1535         break;
1536     }
1537   }
1538
1539   /* Check that we either have at most 1 of each track type, or that
1540    * we have language tags for each to tell which is which */
1541   if (nb_video > 1 ||
1542       (nb_audio > 1 && !have_audio_languages) ||
1543       (nb_text > 1 && !have_text_languages)) {
1544     GST_WARNING
1545         ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1546         nb_audio, nb_video, nb_text);
1547     return FALSE;
1548   }
1549
1550   return TRUE;
1551 }
1552
1553 static void
1554 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1555     GstMessage * msg)
1556 {
1557   GstAdaptiveDemux2Stream *stream;
1558   GstStreamCollection *collection = NULL;
1559   gboolean pending_tracks_activated = FALSE;
1560
1561   GST_MANIFEST_LOCK (demux);
1562
1563   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1564   if (stream == NULL) {
1565     GST_WARNING_OBJECT (demux,
1566         "Failed to locate stream for collection message");
1567     goto beach;
1568   }
1569
1570   gst_message_parse_stream_collection (msg, &collection);
1571   if (!collection)
1572     goto beach;
1573
1574   /* Check whether the collection is "sane" or not.
1575    *
1576    * In the context of adaptive streaming, we can only handle multiplexed
1577    * content that provides at most one stream of valid types (audio, video,
1578    * text). Without this we cannot reliably match the output of this multiplex
1579    * to the various tracks.
1580    *
1581    * FIXME : In the future and *IF* we encounter such streams, we could envision
1582    * supporting multiple streams of the same type if, and only if, they have
1583    * tags that allow differentiating them (ex: languages).
1584    */
1585   if (!can_handle_collection (stream, collection)) {
1586     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1587         (_("Stream format can't be handled")),
1588         ("The streams provided by the multiplex are ambiguous"));
1589     goto beach;
1590   }
1591
1592   /* Store the collection on the stream */
1593   gst_object_replace ((GstObject **) & stream->stream_collection,
1594       (GstObject *) collection);
1595
1596   /* IF there are pending tracks, ask the subclass to handle that */
1597   if (stream->pending_tracks) {
1598     GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1599     g_assert (demux_class->stream_update_tracks);
1600     demux_class->stream_update_tracks (demux, stream);
1601     TRACKS_LOCK (demux);
1602     stream->pending_tracks = FALSE;
1603     pending_tracks_activated = TRUE;
1604     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1605         demux->input_period == demux->output_period)
1606       gst_adaptive_demux_post_collection (demux);
1607   } else {
1608     g_assert (stream->tracks);
1609     TRACKS_LOCK (demux);
1610     /* If we already have assigned tracks, update the pending upstream stream_id
1611      * for each of them based on the collection information. */
1612     gst_adaptive_demux2_stream_update_tracks (demux, stream);
1613   }
1614
1615   /* If we discovered pending tracks and we no longer have any, we can ensure
1616    * selected tracks are started */
1617   if (pending_tracks_activated
1618       && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1619     GList *iter = demux->input_period->streams;
1620     for (; iter; iter = iter->next) {
1621       GstAdaptiveDemux2Stream *new_stream = iter->data;
1622
1623       /* The stream that posted this collection was already started. If a
1624        * different stream is now selected, start it */
1625       if (stream != new_stream
1626           && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1627         gst_adaptive_demux2_stream_start (new_stream);
1628     }
1629   }
1630   TRACKS_UNLOCK (demux);
1631
1632 beach:
1633   GST_MANIFEST_UNLOCK (demux);
1634
1635   gst_message_unref (msg);
1636   msg = NULL;
1637 }
1638
1639 static void
1640 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1641 {
1642   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1643
1644   switch (GST_MESSAGE_TYPE (msg)) {
1645     case GST_MESSAGE_STREAM_COLLECTION:
1646     {
1647       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1648       return;
1649     }
1650     case GST_MESSAGE_ERROR:{
1651       GstAdaptiveDemux2Stream *stream = NULL;
1652       GError *err = NULL;
1653       gchar *debug = NULL;
1654       gchar *new_error = NULL;
1655       const GstStructure *details = NULL;
1656
1657       GST_MANIFEST_LOCK (demux);
1658
1659       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1660       if (stream == NULL) {
1661         GST_WARNING_OBJECT (demux,
1662             "Failed to locate stream for errored element");
1663         GST_MANIFEST_UNLOCK (demux);
1664         break;
1665       }
1666
1667       gst_message_parse_error (msg, &err, &debug);
1668
1669       GST_WARNING_OBJECT (demux,
1670           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1671           err->message, debug);
1672
1673       if (debug)
1674         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1675       if (new_error) {
1676         g_free (err->message);
1677         err->message = new_error;
1678       }
1679
1680       gst_message_parse_error_details (msg, &details);
1681       if (details) {
1682         gst_structure_get_uint (details, "http-status-code",
1683             &stream->last_status_code);
1684       }
1685
1686       /* error, but ask to retry */
1687       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1688         gst_adaptive_demux2_stream_parse_error (stream, err);
1689         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1690       }
1691
1692       g_error_free (err);
1693       g_free (debug);
1694
1695       GST_MANIFEST_UNLOCK (demux);
1696
1697       gst_message_unref (msg);
1698       msg = NULL;
1699     }
1700       break;
1701     default:
1702       break;
1703   }
1704
1705   if (msg)
1706     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1707 }
1708
1709 /* must be called with manifest_lock taken */
1710 GstClockTime
1711 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1712     GstAdaptiveDemux2Stream * stream)
1713 {
1714   GstAdaptiveDemuxClass *klass;
1715
1716   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1717
1718   if (klass->get_presentation_offset == NULL)
1719     return 0;
1720
1721   return klass->get_presentation_offset (demux, stream);
1722 }
1723
1724 /* must be called with manifest_lock taken */
1725 GstClockTime
1726 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1727 {
1728   GstAdaptiveDemuxClass *klass;
1729
1730   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1731
1732   if (klass->get_period_start_time == NULL)
1733     return 0;
1734
1735   return klass->get_period_start_time (demux);
1736 }
1737
1738 /* must be called with manifest_lock taken */
1739 static gboolean
1740 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1741     gboolean first_and_live)
1742 {
1743   GList *iter;
1744   GstClockTime period_start;
1745   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1746   GList *new_streams;
1747
1748   g_return_val_if_fail (demux->input_period->streams, FALSE);
1749   g_assert (demux->input_period->prepared == FALSE);
1750
1751   new_streams = demux->input_period->streams;
1752
1753   if (!gst_adaptive_demux2_is_running (demux)) {
1754     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1755     return TRUE;
1756   }
1757
1758   GST_DEBUG_OBJECT (demux,
1759       "Preparing %d streams for period %d , first_and_live:%d",
1760       g_list_length (new_streams), demux->input_period->period_num,
1761       first_and_live);
1762
1763   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1764     GstAdaptiveDemux2Stream *stream = iter->data;
1765
1766     GST_DEBUG_OBJECT (stream, "Preparing stream");
1767
1768     stream->need_header = TRUE;
1769     stream->discont = TRUE;
1770
1771     /* Grab the first stream time for live streams
1772      * * If the stream is selected
1773      * * Or it provides dynamic tracks (in which case we need to force an update)
1774      */
1775     if (first_and_live
1776         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1777             || stream->pending_tracks)) {
1778       /* TODO we only need the first timestamp, maybe create a simple function to
1779        * get the current PTS of a fragment ? */
1780       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1781       gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1782
1783       GST_DEBUG_OBJECT (stream,
1784           "Got stream time %" GST_STIME_FORMAT,
1785           GST_STIME_ARGS (stream->fragment.stream_time));
1786
1787       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1788         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1789       } else {
1790         min_stream_time = stream->fragment.stream_time;
1791       }
1792     }
1793   }
1794
1795   period_start = gst_adaptive_demux_get_period_start_time (demux);
1796
1797   /* For live streams, the subclass is supposed to seek to the current fragment
1798    * and then tell us its stream time in stream->fragment.stream_time.  We now
1799    * also have to seek our demuxer segment to reflect this.
1800    *
1801    * FIXME: This needs some refactoring at some point.
1802    */
1803   if (first_and_live) {
1804     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1805         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1806         GST_SEEK_TYPE_NONE, -1, NULL);
1807   }
1808
1809   GST_DEBUG_OBJECT (demux,
1810       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1811       " demux segment %" GST_SEGMENT_FORMAT,
1812       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1813       &demux->segment);
1814
1815   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1816     GstAdaptiveDemux2Stream *stream = iter->data;
1817     stream->compute_segment = TRUE;
1818     stream->first_and_live = first_and_live;
1819   }
1820   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1821   demux->input_period->prepared = TRUE;
1822
1823   return TRUE;
1824 }
1825
1826 static GstAdaptiveDemuxTrack *
1827 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1828 {
1829   GList *tmp;
1830
1831   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1832     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1833     if (!g_strcmp0 (track->stream_id, stream_id))
1834       return track;
1835   }
1836
1837   return NULL;
1838 }
1839
1840 /* TRACKS_LOCK hold */
1841 void
1842 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1843 {
1844   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1845   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1846   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1847   GList *tmp;
1848   gint min_percent = -1, percent;
1849   gboolean all_eos = TRUE;
1850
1851   /* Go over all active tracks of the output period and update level */
1852
1853   /* Check that all tracks are above their respective low thresholds (different
1854    * tracks may have different fragment durations yielding different buffering
1855    * percentages) Overall buffering percent is the lowest. */
1856   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1857     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1858
1859     GST_LOG_OBJECT (demux,
1860         "Checking track '%s' active:%d selected:%d eos:%d level:%"
1861         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1862         track->stream_id, track->active, track->selected, track->eos,
1863         GST_TIME_ARGS (track->level_time),
1864         GST_TIME_ARGS (track->buffering_threshold));
1865
1866     if (track->active && track->selected) {
1867       if (!track->eos) {
1868         gint cur_percent;
1869
1870         all_eos = FALSE;
1871         if (min_level_time == GST_CLOCK_TIME_NONE) {
1872           min_level_time = track->level_time;
1873         } else if (track->level_time < min_level_time) {
1874           min_level_time = track->level_time;
1875         }
1876
1877         if (track->type & GST_STREAM_TYPE_VIDEO
1878             && video_level_time > track->level_time)
1879           video_level_time = track->level_time;
1880
1881         if (track->type & GST_STREAM_TYPE_AUDIO
1882             && audio_level_time > track->level_time)
1883           audio_level_time = track->level_time;
1884
1885         if (track->level_time != GST_CLOCK_TIME_NONE
1886             && track->buffering_threshold != 0) {
1887           cur_percent =
1888               gst_util_uint64_scale (track->level_time, 100,
1889               track->buffering_threshold);
1890           if (min_percent < 0 || cur_percent < min_percent)
1891             min_percent = cur_percent;
1892         }
1893       }
1894     }
1895   }
1896
1897   GST_DEBUG_OBJECT (demux,
1898       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1899       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1900
1901   /* Update demuxer video/audio level properties */
1902   GST_OBJECT_LOCK (demux);
1903   demux->current_level_time_video = video_level_time;
1904   demux->current_level_time_audio = audio_level_time;
1905   GST_OBJECT_UNLOCK (demux);
1906
1907   if (min_percent < 0 && !all_eos)
1908     return;
1909
1910   if (min_percent > 100 || all_eos)
1911     percent = 100;
1912   else
1913     percent = MAX (0, min_percent);
1914
1915   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1916
1917   if (demux->priv->is_buffering) {
1918     if (percent >= 100)
1919       demux->priv->is_buffering = FALSE;
1920     if (demux->priv->percent != percent) {
1921       demux->priv->percent = percent;
1922       demux->priv->percent_changed = TRUE;
1923     }
1924   } else if (percent < 1) {
1925     demux->priv->is_buffering = TRUE;
1926     if (demux->priv->percent != percent) {
1927       demux->priv->percent = percent;
1928       demux->priv->percent_changed = TRUE;
1929     }
1930   }
1931
1932   if (demux->priv->percent_changed)
1933     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1934         demux->priv->is_buffering);
1935 }
1936
1937 /* With TRACKS_LOCK held */
1938 void
1939 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1940 {
1941   gint percent;
1942   GstMessage *msg;
1943
1944   if (!demux->priv->percent_changed)
1945     return;
1946
1947   BUFFERING_LOCK (demux);
1948   percent = demux->priv->percent;
1949   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1950   TRACKS_UNLOCK (demux);
1951   gst_element_post_message ((GstElement *) demux, msg);
1952
1953   BUFFERING_UNLOCK (demux);
1954   TRACKS_LOCK (demux);
1955   if (percent == demux->priv->percent)
1956     demux->priv->percent_changed = FALSE;
1957 }
1958
1959 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1960 GstAdaptiveDemux2Stream *
1961 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1962     GstAdaptiveDemuxTrack * track)
1963 {
1964   GList *iter;
1965
1966   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1967     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1968     if (g_list_find (stream->tracks, track))
1969       return stream;
1970   }
1971
1972   return NULL;
1973 }
1974
1975 /* Scheduler context held, takes TRACKS_LOCK */
1976 static GstAdaptiveDemux2Stream *
1977 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1978 {
1979   GList *iter;
1980   GstAdaptiveDemuxTrack *track = NULL;
1981   GstAdaptiveDemux2Stream *stream = NULL;
1982
1983   TRACKS_LOCK (demux);
1984   for (iter = demux->output_period->tracks; iter; iter = g_list_next (iter)) {
1985     OutputSlot *cand = iter->data;
1986     if (cand->pad == pad) {
1987       track = cand->track;
1988       break;
1989     }
1990   }
1991
1992   if (track)
1993     stream = find_stream_for_track_locked (demux, track);
1994
1995   TRACKS_UNLOCK (demux);
1996
1997   return stream;
1998 }
1999
2000 /* Called from seek handler
2001  *
2002  * This function is used when a (flushing) seek caused a new period to be activated.
2003  *
2004  * This will ensure that:
2005  * * the current output period is marked as finished (EOS)
2006  * * Any potential intermediate (non-input/non-output) periods are removed
2007  * * That the new input period is prepared and ready
2008  */
2009 static void
2010 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
2011 {
2012   GList *iter;
2013
2014   GST_DEBUG_OBJECT (demux,
2015       "Preparing new input period %u", demux->input_period->period_num);
2016
2017   /* Prepare the new input period */
2018   gst_adaptive_demux_update_collection (demux, demux->input_period);
2019
2020   /* Transfer the previous selection to the new input period */
2021   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2022       demux->output_period);
2023   gst_adaptive_demux_prepare_streams (demux, FALSE);
2024
2025   /* Remove all periods except for the input (last) and output (first) period */
2026   while (demux->priv->periods->length > 2) {
2027     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2028     /* Mark all tracks of the removed period as not selected and EOS so they
2029      * will be skipped / ignored */
2030     for (iter = period->tracks; iter; iter = iter->next) {
2031       GstAdaptiveDemuxTrack *track = iter->data;
2032       track->selected = FALSE;
2033       track->eos = TRUE;
2034     }
2035     gst_adaptive_demux_period_unref (period);
2036   }
2037
2038   /* Mark all tracks of the output period as EOS so that the output loop
2039    * will immediately move to the new period */
2040   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2041     GstAdaptiveDemuxTrack *track = iter->data;
2042     track->eos = TRUE;
2043   }
2044
2045   /* Go over all slots, and clear any pending track */
2046   for (iter = demux->priv->outputs; iter; iter = iter->next) {
2047     OutputSlot *slot = (OutputSlot *) iter->data;
2048
2049     if (slot->pending_track != NULL) {
2050       GST_DEBUG_OBJECT (demux,
2051           "Removing track '%s' as pending from output of current track '%s'",
2052           slot->pending_track->stream_id, slot->track->stream_id);
2053       gst_adaptive_demux_track_unref (slot->pending_track);
2054       slot->pending_track = NULL;
2055     }
2056   }
2057 }
2058
2059 /* must be called with manifest_lock taken */
2060 gboolean
2061 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2062     gint64 * range_start, gint64 * range_stop)
2063 {
2064   GstAdaptiveDemuxClass *klass;
2065
2066   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2067
2068   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2069
2070   return klass->get_live_seek_range (demux, range_start, range_stop);
2071 }
2072
2073 /* must be called with manifest_lock taken */
2074 gboolean
2075 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2076     GstAdaptiveDemux2Stream * stream)
2077 {
2078   gint64 range_start, range_stop;
2079   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2080     GST_LOG_OBJECT (stream,
2081         "stream position %" GST_TIME_FORMAT "  live seek range %"
2082         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2083         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2084         GST_STIME_ARGS (range_stop));
2085     return (stream->current_position >= range_start
2086         && stream->current_position <= range_stop);
2087   }
2088
2089   return FALSE;
2090 }
2091
2092 /* must be called with manifest_lock taken */
2093 static gboolean
2094 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2095 {
2096   GstAdaptiveDemuxClass *klass;
2097
2098   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2099   if (gst_adaptive_demux_is_live (demux)) {
2100     return klass->get_live_seek_range != NULL;
2101   }
2102
2103   return klass->seek != NULL;
2104 }
2105
2106 static void
2107 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2108     GstSeekType start_type, GstSeekType stop_type)
2109 {
2110   GList *iter;
2111
2112   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2113     GstAdaptiveDemux2Stream *stream = iter->data;
2114
2115     /* Make sure the download loop clears and restarts on the next start,
2116      * which will recompute the stream segment */
2117     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2118         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2119     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2120     stream->start_position = 0;
2121
2122     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2123       stream->start_position = demux->segment.start;
2124     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2125       stream->start_position = demux->segment.stop;
2126   }
2127 }
2128
2129 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
2130                               GST_SEEK_FLAG_SNAP_AFTER |          \
2131                               GST_SEEK_FLAG_SNAP_NEAREST |        \
2132                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2133                               GST_SEEK_FLAG_KEY_UNIT))
2134 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2135                               GST_SEEK_FLAG_SNAP_AFTER | \
2136                               GST_SEEK_FLAG_SNAP_NEAREST))
2137
2138 static gboolean
2139 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
2140     GstEvent * event)
2141 {
2142   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2143   gdouble rate;
2144   GstFormat format;
2145   GstSeekFlags flags;
2146   GstSeekType start_type, stop_type;
2147   gint64 start, stop;
2148   guint32 seqnum;
2149   gboolean update;
2150   gboolean ret;
2151   GstSegment oldsegment;
2152   GstAdaptiveDemux2Stream *stream = NULL;
2153   GstEvent *flush_event;
2154
2155   GST_INFO_OBJECT (demux, "Received seek event");
2156
2157   GST_API_LOCK (demux);
2158
2159   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2160       &stop_type, &stop);
2161
2162   if (format != GST_FORMAT_TIME) {
2163     GST_API_UNLOCK (demux);
2164     GST_WARNING_OBJECT (demux,
2165         "Adaptive demuxers only support TIME-based seeking");
2166     gst_event_unref (event);
2167     return FALSE;
2168   }
2169
2170   if (flags & GST_SEEK_FLAG_SEGMENT) {
2171     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2172     GST_API_UNLOCK (demux);
2173     gst_event_unref (event);
2174     return FALSE;
2175   }
2176
2177   seqnum = gst_event_get_seqnum (event);
2178
2179   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2180     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2181     return FALSE;
2182   }
2183
2184   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2185     /* For instant rate seeks, reply directly and update
2186      * our segment so the new rate is reflected in any future
2187      * fragments */
2188     GstEvent *ev;
2189     gdouble rate_multiplier;
2190
2191     /* instant rate change only supported if direction does not change. All
2192      * other requirements are already checked before creating the seek event
2193      * but let's double-check here to be sure */
2194     if ((demux->segment.rate > 0 && rate < 0) ||
2195         (demux->segment.rate < 0 && rate > 0) ||
2196         start_type != GST_SEEK_TYPE_NONE ||
2197         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2198       GST_ERROR_OBJECT (demux,
2199           "Instant rate change seeks only supported in the "
2200           "same direction, without flushing and position change");
2201       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2202       GST_API_UNLOCK (demux);
2203       return FALSE;
2204     }
2205
2206     rate_multiplier = rate / demux->segment.rate;
2207
2208     ev = gst_event_new_instant_rate_change (rate_multiplier,
2209         (GstSegmentFlags) flags);
2210     gst_event_set_seqnum (ev, seqnum);
2211
2212     ret = gst_adaptive_demux_push_src_event (demux, ev);
2213
2214     if (ret) {
2215       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2216       demux->instant_rate_multiplier = rate_multiplier;
2217       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2218     }
2219
2220     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2221     GST_API_UNLOCK (demux);
2222     gst_event_unref (event);
2223
2224     return ret;
2225   }
2226
2227   if (!gst_adaptive_demux_can_seek (demux)) {
2228     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2229
2230     GST_API_UNLOCK (demux);
2231     gst_event_unref (event);
2232     return FALSE;
2233   }
2234
2235   /* We can only accept flushing seeks from this point onward */
2236   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2237     GST_ERROR_OBJECT (demux,
2238         "Non-flushing non-instant-rate seeks are not possible");
2239
2240     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2241
2242     GST_API_UNLOCK (demux);
2243     gst_event_unref (event);
2244     return FALSE;
2245   }
2246
2247   if (gst_adaptive_demux_is_live (demux)) {
2248     gint64 range_start, range_stop;
2249     gboolean changed = FALSE;
2250     gboolean start_valid = TRUE, stop_valid = TRUE;
2251
2252     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2253             &range_stop)) {
2254       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2255       GST_API_UNLOCK (demux);
2256       gst_event_unref (event);
2257       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2258       return FALSE;
2259     }
2260
2261     GST_DEBUG_OBJECT (demux,
2262         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2263         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2264
2265     /* Handle relative positioning for live streams (relative to the range_stop) */
2266     if (start_type == GST_SEEK_TYPE_END) {
2267       start = range_stop + start;
2268       start_type = GST_SEEK_TYPE_SET;
2269       changed = TRUE;
2270     }
2271     if (stop_type == GST_SEEK_TYPE_END) {
2272       stop = range_stop + stop;
2273       stop_type = GST_SEEK_TYPE_SET;
2274       changed = TRUE;
2275     }
2276
2277     /* Adjust the requested start/stop position if it falls beyond the live
2278      * seek range.
2279      * The only case where we don't adjust is for the starting point of
2280      * an accurate seek (start if forward and stop if backwards)
2281      */
2282     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2283         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2284       GST_DEBUG_OBJECT (demux,
2285           "seek before live stream start, setting to range start: %"
2286           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2287       start = range_start;
2288       changed = TRUE;
2289     }
2290     /* truncate stop position also if set */
2291     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2292         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2293       GST_DEBUG_OBJECT (demux,
2294           "seek ending after live start, adjusting to: %"
2295           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2296       stop = range_stop;
2297       changed = TRUE;
2298     }
2299
2300     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2301         (start < range_start || start > range_stop)) {
2302       GST_WARNING_OBJECT (demux,
2303           "Seek to invalid position start:%" GST_STIME_FORMAT
2304           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2305           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2306           GST_STIME_ARGS (range_stop));
2307       start_valid = FALSE;
2308     }
2309     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2310         (stop < range_start || stop > range_stop)) {
2311       GST_WARNING_OBJECT (demux,
2312           "Seek to invalid position stop:%" GST_STIME_FORMAT
2313           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2314           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2315           GST_STIME_ARGS (range_stop));
2316       stop_valid = FALSE;
2317     }
2318
2319     /* If the seek position is still outside of the seekable range, refuse the seek */
2320     if (!start_valid || !stop_valid) {
2321       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2322       GST_API_UNLOCK (demux);
2323       gst_event_unref (event);
2324       return FALSE;
2325     }
2326
2327     /* Re-create seek event with changed/updated values */
2328     if (changed) {
2329       gst_event_unref (event);
2330       event =
2331           gst_event_new_seek (rate, format, flags,
2332           start_type, start, stop_type, stop);
2333       gst_event_set_seqnum (event, seqnum);
2334     }
2335   }
2336
2337   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2338
2339   /* have a backup in case seek fails */
2340   gst_segment_copy_into (&demux->segment, &oldsegment);
2341
2342   GST_DEBUG_OBJECT (demux, "sending flush start");
2343   flush_event = gst_event_new_flush_start ();
2344   gst_event_set_seqnum (flush_event, seqnum);
2345
2346   gst_adaptive_demux_push_src_event (demux, flush_event);
2347
2348   gst_adaptive_demux_stop_tasks (demux, FALSE);
2349   gst_adaptive_demux_reset_tracks (demux);
2350
2351   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2352
2353   /*
2354    * Handle snap seeks as follows:
2355    * 1) do the snap seeking on the stream that received
2356    *    the event
2357    * 2) use the final position on this stream to seek
2358    *    on the other streams to the same position
2359    *
2360    * We can't snap at all streams at the same time as
2361    * they might end in different positions, so just
2362    * use the one that received the event as the 'leading'
2363    * one to do the snap seek.
2364    *
2365    * FIXME: Could use the global_output_position (running time)
2366    * as the snap reference
2367    */
2368   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2369           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2370     GstClockTimeDiff ts;
2371     GstSeekFlags stream_seek_flags = flags;
2372
2373     /* snap-seek on the stream that received the event and then
2374      * use the resulting position to seek on all streams */
2375
2376     if (rate >= 0) {
2377       if (start_type != GST_SEEK_TYPE_NONE)
2378         ts = start;
2379       else {
2380         ts = stream->current_position;
2381         start_type = GST_SEEK_TYPE_SET;
2382       }
2383     } else {
2384       if (stop_type != GST_SEEK_TYPE_NONE)
2385         ts = stop;
2386       else {
2387         stop_type = GST_SEEK_TYPE_SET;
2388         ts = stream->current_position;
2389       }
2390     }
2391
2392     if (stream) {
2393       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2394     }
2395
2396     /* replace event with a new one without snapping to seek on all streams */
2397     gst_event_unref (event);
2398     if (rate >= 0) {
2399       start = ts;
2400     } else {
2401       stop = ts;
2402     }
2403     event =
2404         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2405         start_type, start, stop_type, stop);
2406     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2407   }
2408   stream = NULL;
2409
2410   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2411       start, stop_type, stop, &update);
2412
2413   if (ret) {
2414     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2415
2416     ret = demux_class->seek (demux, event);
2417   }
2418
2419   if (!ret) {
2420     /* Is there anything else we can do if it fails? */
2421     gst_segment_copy_into (&oldsegment, &demux->segment);
2422   } else {
2423     demux->priv->segment_seqnum = seqnum;
2424   }
2425   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2426
2427   /* Resetting flow combiner */
2428   gst_flow_combiner_reset (demux->priv->flowcombiner);
2429
2430   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2431   flush_event = gst_event_new_flush_stop (TRUE);
2432   gst_event_set_seqnum (flush_event, seqnum);
2433   gst_adaptive_demux_push_src_event (demux, flush_event);
2434
2435   /* If the seek generated a new period, prepare it */
2436   if (!demux->input_period->prepared) {
2437     /* This can only happen on flushing seeks */
2438     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2439     gst_adaptive_demux_seek_to_input_period (demux);
2440   }
2441
2442   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2443   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2444       &demux->segment);
2445   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2446   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2447
2448   /* Reset the global output position (running time) for when the output loop restarts */
2449   demux->priv->global_output_position = 0;
2450
2451   /* After a flushing seek, any instant-rate override is undone */
2452   demux->instant_rate_multiplier = 1.0;
2453
2454   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2455
2456   /* Restart the demux */
2457   gst_adaptive_demux_start_tasks (demux);
2458
2459   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2460   GST_API_UNLOCK (demux);
2461   gst_event_unref (event);
2462
2463   return ret;
2464 }
2465
2466 /* Returns TRUE if the stream has at least one selected track */
2467 static gboolean
2468 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2469     stream)
2470 {
2471   GList *tmp;
2472
2473   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2474     GstAdaptiveDemuxTrack *track = tmp->data;
2475
2476     if (track->selected)
2477       return TRUE;
2478   }
2479
2480   return FALSE;
2481 }
2482
2483 static gboolean
2484 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2485     guint32 seqnum)
2486 {
2487   gboolean selection_handled = TRUE;
2488   GList *iter;
2489   GList *tracks = NULL;
2490
2491   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2492     return FALSE;
2493
2494   TRACKS_LOCK (demux);
2495   /* Validate the streams and fill:
2496    * tracks : list of tracks corresponding to requested streams
2497    */
2498   for (iter = streams; iter; iter = iter->next) {
2499     gchar *stream_id = (gchar *) iter->data;
2500     GstAdaptiveDemuxTrack *track;
2501
2502     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2503     track = find_track_for_stream_id (demux->output_period, stream_id);
2504     if (!track) {
2505       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2506       selection_handled = FALSE;
2507       goto select_streams_done;
2508     }
2509     tracks = g_list_append (tracks, track);
2510     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2511   }
2512
2513   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2514    * SCHEDULING THREAD */
2515
2516   /* FIXME: We want to iterate all streams, mark them as deselected,
2517    * then iterate tracks and mark any streams that have at least 1
2518    * active output track, then loop over all streams again and start/stop
2519    * them as needed */
2520
2521   /* Go over all tracks present and (de)select based on current selection */
2522   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2523     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2524
2525     if (track->selected && !g_list_find (tracks, track)) {
2526       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2527           track->stream_id, track->active);
2528       track->selected = FALSE;
2529       track->draining = TRUE;
2530     } else if (!track->selected && g_list_find (tracks, track)) {
2531       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2532
2533       track->selected = TRUE;
2534     }
2535   }
2536
2537   /* Start or stop streams based on the updated track selection */
2538   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2539     GstAdaptiveDemux2Stream *stream = iter->data;
2540     GList *trackiter;
2541
2542     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2543     gboolean should_be_running =
2544         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2545
2546     if (!is_running && should_be_running) {
2547       GstClockTime output_running_ts = demux->priv->global_output_position;
2548       GstClockTime start_position;
2549
2550       /* Calculate where we should start the stream, and then
2551        * start it. */
2552       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2553
2554       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2555           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2556           GST_TIME_ARGS (output_running_ts), &demux->segment);
2557
2558       start_position =
2559           gst_segment_position_from_running_time (&demux->segment,
2560           GST_FORMAT_TIME, output_running_ts);
2561
2562       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2563
2564       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2565           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2566
2567       stream->current_position = stream->start_position = start_position;
2568       stream->compute_segment = TRUE;
2569
2570       /* If output has already begun, ensure we seek this segment
2571        * to the correct restart position when the download loop begins */
2572       if (output_running_ts != 0)
2573         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2574
2575       /* Activate track pads for this stream */
2576       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2577         GstAdaptiveDemuxTrack *track =
2578             (GstAdaptiveDemuxTrack *) trackiter->data;
2579         gst_pad_set_active (track->sinkpad, TRUE);
2580       }
2581
2582       gst_adaptive_demux2_stream_start (stream);
2583     } else if (is_running && !should_be_running) {
2584       /* Stream should not be running and needs stopping */
2585       gst_adaptive_demux2_stream_stop (stream);
2586
2587       /* Set all track sinkpads to inactive for this stream */
2588       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2589         GstAdaptiveDemuxTrack *track =
2590             (GstAdaptiveDemuxTrack *) trackiter->data;
2591         gst_pad_set_active (track->sinkpad, FALSE);
2592       }
2593     }
2594   }
2595
2596   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2597
2598 select_streams_done:
2599   demux_update_buffering_locked (demux);
2600   demux_post_buffering_locked (demux);
2601
2602   TRACKS_UNLOCK (demux);
2603   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2604
2605   if (tracks)
2606     g_list_free (tracks);
2607   return selection_handled;
2608 }
2609
2610 static gboolean
2611 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2612     GstEvent * event)
2613 {
2614   GstAdaptiveDemux *demux;
2615
2616   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2617
2618   switch (event->type) {
2619     case GST_EVENT_SEEK:
2620     {
2621       guint32 seqnum = gst_event_get_seqnum (event);
2622       if (seqnum == demux->priv->segment_seqnum) {
2623         GST_LOG_OBJECT (pad,
2624             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2625         gst_event_unref (event);
2626         return TRUE;
2627       }
2628       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2629     }
2630     case GST_EVENT_LATENCY:{
2631       /* Upstream and our internal source are irrelevant
2632        * for latency, and we should not fail here to
2633        * configure the latency */
2634       gst_event_unref (event);
2635       return TRUE;
2636     }
2637     case GST_EVENT_QOS:{
2638       GstClockTimeDiff diff;
2639       GstClockTime timestamp;
2640       GstClockTime earliest_time;
2641
2642       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2643       /* Only take into account lateness if late */
2644       if (diff > 0)
2645         earliest_time = timestamp + 2 * diff;
2646       else
2647         earliest_time = timestamp;
2648
2649       GST_OBJECT_LOCK (demux);
2650       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2651           earliest_time > demux->priv->qos_earliest_time) {
2652         demux->priv->qos_earliest_time = earliest_time;
2653         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2654             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2655       }
2656       GST_OBJECT_UNLOCK (demux);
2657       break;
2658     }
2659     case GST_EVENT_SELECT_STREAMS:
2660     {
2661       GList *streams;
2662       gboolean selection_handled;
2663
2664       if (GST_EVENT_SEQNUM (event) ==
2665           g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2666         GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2667             GST_EVENT_SEQNUM (event));
2668         return TRUE;
2669       }
2670
2671       gst_event_parse_select_streams (event, &streams);
2672       selection_handled =
2673           handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2674       g_list_free_full (streams, g_free);
2675       return selection_handled;
2676     }
2677     default:
2678       break;
2679   }
2680
2681   return gst_pad_event_default (pad, parent, event);
2682 }
2683
2684 static gboolean
2685 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2686     GstQuery * query)
2687 {
2688   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2689   gboolean ret = FALSE;
2690
2691   if (query == NULL)
2692     return FALSE;
2693
2694
2695   switch (query->type) {
2696     case GST_QUERY_DURATION:{
2697       GstFormat fmt;
2698       GstClockTime duration = GST_CLOCK_TIME_NONE;
2699
2700       gst_query_parse_duration (query, &fmt, NULL);
2701
2702       if (gst_adaptive_demux_is_live (demux)) {
2703         /* We are able to answer this query: the duration is unknown */
2704         gst_query_set_duration (query, fmt, -1);
2705         ret = TRUE;
2706         break;
2707       }
2708
2709       if (fmt == GST_FORMAT_TIME
2710           && g_atomic_int_get (&demux->priv->have_manifest)) {
2711
2712         GST_MANIFEST_LOCK (demux);
2713         duration = demux->priv->duration;
2714         GST_MANIFEST_UNLOCK (demux);
2715
2716         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2717           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2718           ret = TRUE;
2719         }
2720       }
2721
2722       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2723           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2724       break;
2725     }
2726     case GST_QUERY_LATENCY:{
2727       gst_query_set_latency (query, FALSE, 0, -1);
2728       ret = TRUE;
2729       break;
2730     }
2731     case GST_QUERY_SEEKING:{
2732       GstFormat fmt;
2733       gint64 stop = -1;
2734       gint64 start = 0;
2735
2736       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2737         GST_INFO_OBJECT (demux,
2738             "Don't have manifest yet, can't answer seeking query");
2739         return FALSE;           /* can't answer without manifest */
2740       }
2741
2742       GST_MANIFEST_LOCK (demux);
2743
2744       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2745       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2746       if (fmt == GST_FORMAT_TIME) {
2747         GstClockTime duration;
2748         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2749
2750         ret = TRUE;
2751         if (can_seek) {
2752           if (gst_adaptive_demux_is_live (demux)) {
2753             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2754             if (!ret) {
2755               GST_MANIFEST_UNLOCK (demux);
2756               GST_INFO_OBJECT (demux, "can't answer seeking query");
2757               return FALSE;
2758             }
2759           } else {
2760             duration = demux->priv->duration;
2761             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2762               stop = duration;
2763           }
2764         }
2765         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2766         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2767             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2768             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2769       }
2770       GST_MANIFEST_UNLOCK (demux);
2771       break;
2772     }
2773     case GST_QUERY_URI:
2774
2775       GST_MANIFEST_LOCK (demux);
2776
2777       /* TODO HLS can answer this differently it seems */
2778       if (demux->manifest_uri) {
2779         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2780          * playlist or the the uri of the last downlowaded fragment? */
2781         gst_query_set_uri (query, demux->manifest_uri);
2782         ret = TRUE;
2783       }
2784
2785       GST_MANIFEST_UNLOCK (demux);
2786       break;
2787     case GST_QUERY_SELECTABLE:
2788       gst_query_set_selectable (query, TRUE);
2789       ret = TRUE;
2790       break;
2791     default:
2792       /* Don't forward queries upstream because of the special nature of this
2793        *  "demuxer", which relies on the upstream element only to be fed
2794        *  the Manifest
2795        */
2796       break;
2797   }
2798
2799   return ret;
2800 }
2801
2802 /* Called when the scheduler starts, to kick off manifest updates
2803  * and stream downloads */
2804 static gboolean
2805 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2806 {
2807   GList *iter;
2808
2809   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2810
2811   iter = demux->input_period->streams;
2812
2813   for (; iter; iter = g_list_next (iter)) {
2814     GstAdaptiveDemux2Stream *stream = iter->data;
2815
2816     /* If we need to process this stream to discover tracks *OR* it has any
2817      * tracks which are selected, start it now */
2818     if ((stream->pending_tracks == TRUE)
2819         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2820       gst_adaptive_demux2_stream_start (stream);
2821   }
2822
2823   return FALSE;
2824 }
2825
2826 /* must be called with manifest_lock taken */
2827 static void
2828 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2829 {
2830   if (!gst_adaptive_demux2_is_running (demux)) {
2831     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2832     return;
2833   }
2834
2835   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2836   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2837       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2838
2839   TRACKS_LOCK (demux);
2840   demux->priv->flushing = FALSE;
2841   GST_DEBUG_OBJECT (demux, "Starting the output task");
2842   gst_task_start (demux->priv->output_task);
2843   TRACKS_UNLOCK (demux);
2844 }
2845
2846 /* must be called with manifest_lock taken */
2847 static void
2848 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2849 {
2850   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2851   if (demux->priv->manifest_updates_cb != 0) {
2852     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2853         demux->priv->manifest_updates_cb);
2854     demux->priv->manifest_updates_cb = 0;
2855   }
2856 }
2857
2858 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2859
2860 /* must be called with manifest_lock taken */
2861 static void
2862 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2863 {
2864   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2865
2866   if (gst_adaptive_demux_is_live (demux)) {
2867     /* Task to periodically update the manifest */
2868     if (demux_class->requires_periodical_playlist_update (demux)) {
2869       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2870       if (demux->priv->manifest_updates_cb == 0) {
2871         demux->priv->manifest_updates_cb =
2872             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2873             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2874       }
2875     }
2876   }
2877 }
2878
2879 /* must be called with manifest_lock taken
2880  * This function will temporarily release manifest_lock in order to join the
2881  * download threads.
2882  * The api_lock will still protect it against other threads trying to modify
2883  * the demux element.
2884  */
2885 static void
2886 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2887 {
2888   GST_LOG_OBJECT (demux, "Stopping tasks");
2889
2890   if (stop_updates)
2891     gst_adaptive_demux_stop_manifest_update_task (demux);
2892
2893   TRACKS_LOCK (demux);
2894   demux->priv->flushing = TRUE;
2895   g_cond_signal (&demux->priv->tracks_add);
2896   gst_task_stop (demux->priv->output_task);
2897   TRACKS_UNLOCK (demux);
2898
2899   gst_task_join (demux->priv->output_task);
2900
2901   if (demux->input_period)
2902     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2903
2904   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2905 }
2906
2907 /* must be called with manifest_lock taken */
2908 static gboolean
2909 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2910 {
2911   GList *iter;
2912   gboolean ret = TRUE;
2913
2914   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2915
2916   TRACKS_LOCK (demux);
2917   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2918     OutputSlot *slot = (OutputSlot *) iter->data;
2919     gst_event_ref (event);
2920     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2921     ret = ret & gst_pad_push_event (slot->pad, event);
2922     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2923       slot->pushed_timed_data = FALSE;
2924   }
2925   TRACKS_UNLOCK (demux);
2926   gst_event_unref (event);
2927   return ret;
2928 }
2929
2930 /* must be called with manifest_lock taken */
2931 void
2932 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2933     GstCaps * caps)
2934 {
2935   GST_DEBUG_OBJECT (stream,
2936       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2937   gst_caps_replace (&stream->pending_caps, caps);
2938   gst_caps_unref (caps);
2939 }
2940
2941 /* must be called with manifest_lock taken */
2942 void
2943 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2944     GstTagList * tags)
2945 {
2946   GST_DEBUG_OBJECT (stream,
2947       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2948   if (stream->pending_tags) {
2949     gst_tag_list_unref (stream->pending_tags);
2950   }
2951   stream->pending_tags = tags;
2952 }
2953
2954 /* must be called with manifest_lock taken */
2955 void
2956 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2957     GstEvent * event)
2958 {
2959   stream->pending_events = g_list_append (stream->pending_events, event);
2960 }
2961
2962 static guint64
2963 _update_average_bitrate (GstAdaptiveDemux * demux,
2964     GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2965 {
2966   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2967
2968   stream->moving_bitrate -= stream->fragment_bitrates[index];
2969   stream->fragment_bitrates[index] = new_bitrate;
2970   stream->moving_bitrate += new_bitrate;
2971
2972   stream->moving_index += 1;
2973
2974   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2975     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2976   return stream->moving_bitrate / stream->moving_index;
2977 }
2978
2979 static guint64
2980 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2981     GstAdaptiveDemux2Stream * stream)
2982 {
2983   guint64 average_bitrate;
2984   guint64 fragment_bitrate;
2985   guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2986
2987   fragment_bitrate = stream->last_bitrate;
2988   GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2989       fragment_bitrate);
2990
2991   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2992
2993   GST_INFO_OBJECT (stream,
2994       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2995   GST_INFO_OBJECT (stream,
2996       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2997       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2998
2999   /* Conservative approach, make sure we don't upgrade too fast */
3000   GST_OBJECT_LOCK (demux);
3001   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3002
3003   /* If this is the/a video stream update the overall demuxer
3004    * reported bitrate and notify, to give the application a
3005    * chance to choose a new connection-bitrate */
3006   if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3007     demux->current_download_rate = stream->current_download_rate;
3008     GST_OBJECT_UNLOCK (demux);
3009     g_object_notify (G_OBJECT (demux), "current-bandwidth");
3010     GST_OBJECT_LOCK (demux);
3011   }
3012
3013   connection_speed = demux->connection_speed;
3014   min_bitrate = demux->min_bitrate;
3015   max_bitrate = demux->max_bitrate;
3016   GST_OBJECT_UNLOCK (demux);
3017
3018   if (connection_speed) {
3019     GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3020         connection_speed / 1000);
3021     return connection_speed;
3022   }
3023
3024   /* No explicit connection_speed, so choose the new variant to use as a
3025    * fraction of the measured download rate */
3026   target_download_rate =
3027       CLAMP (stream->current_download_rate, 0,
3028       G_MAXUINT) * demux->bandwidth_target_ratio;
3029
3030   GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3031       demux->bandwidth_target_ratio, target_download_rate);
3032
3033 #if 0
3034   /* Debugging code, modulate the bitrate every few fragments */
3035   {
3036     static guint ctr = 0;
3037     if (ctr % 3 == 0) {
3038       GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3039       target_download_rate /= 2;
3040     }
3041     ctr++;
3042   }
3043 #endif
3044
3045   if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3046     target_download_rate = min_bitrate;
3047     GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3048         min_bitrate);
3049   }
3050
3051   if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3052     target_download_rate = max_bitrate;
3053     GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3054         max_bitrate);
3055   }
3056
3057   GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3058       target_download_rate);
3059
3060   return target_download_rate;
3061 }
3062
3063 /* must be called with manifest_lock taken */
3064 static GstFlowReturn
3065 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3066     GstAdaptiveDemux2Stream * stream)
3067 {
3068   /* No need to advance, this isn't a real fragment */
3069   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3070     return GST_FLOW_OK;
3071
3072   return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3073       stream->fragment.duration);
3074 }
3075
3076 /* must be called with manifest_lock taken.
3077  * Can temporarily release manifest_lock
3078  */
3079 static GstFlowReturn
3080 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3081     GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3082 {
3083   return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
3084 }
3085
3086 static gboolean
3087 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3088     * demux)
3089 {
3090   return TRUE;
3091 }
3092
3093 /* Called when a stream needs waking after the manifest is updated */
3094 void
3095 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3096 {
3097   demux->priv->stream_waiting_for_manifest = TRUE;
3098 }
3099
3100 static gboolean
3101 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3102 {
3103   GstFlowReturn ret = GST_FLOW_OK;
3104   gboolean schedule_again = TRUE;
3105
3106   GST_MANIFEST_LOCK (demux);
3107   demux->priv->manifest_updates_cb = 0;
3108
3109   /* Updating playlist only needed for live playlists */
3110   if (!gst_adaptive_demux_is_live (demux)) {
3111     GST_MANIFEST_UNLOCK (demux);
3112     return G_SOURCE_REMOVE;
3113   }
3114
3115   GST_DEBUG_OBJECT (demux, "Updating playlist");
3116   ret = gst_adaptive_demux_update_manifest (demux);
3117
3118   if (ret == GST_FLOW_EOS) {
3119     GST_MANIFEST_UNLOCK (demux);
3120     return G_SOURCE_REMOVE;
3121   }
3122
3123   if (ret == GST_FLOW_OK) {
3124     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3125     demux->priv->update_failed_count = 0;
3126
3127     /* Wake up download tasks */
3128     if (demux->priv->stream_waiting_for_manifest) {
3129       GList *iter;
3130
3131       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3132         GstAdaptiveDemux2Stream *stream = iter->data;
3133         gst_adaptive_demux2_stream_on_manifest_update (stream);
3134       }
3135       demux->priv->stream_waiting_for_manifest = FALSE;
3136     }
3137   } else {
3138     demux->priv->update_failed_count++;
3139
3140     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3141       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3142           gst_flow_get_name (ret));
3143     } else {
3144       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3145           (_("Internal data stream error.")), ("Could not update playlist"));
3146       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3147       schedule_again = FALSE;
3148     }
3149   }
3150
3151   if (schedule_again) {
3152     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3153
3154     demux->priv->manifest_updates_cb =
3155         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3156         klass->get_manifest_update_interval (demux) * GST_USECOND,
3157         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3158   }
3159
3160   GST_MANIFEST_UNLOCK (demux);
3161
3162   return G_SOURCE_REMOVE;
3163 }
3164
3165 static gboolean
3166 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3167 {
3168   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3169
3170   /* Loop for updating of the playlist. This periodically checks if
3171    * the playlist is updated and does so, then signals the streaming
3172    * thread in case it can continue downloading now. */
3173
3174   /* block until the next scheduled update or the signal to quit this thread */
3175   GST_DEBUG_OBJECT (demux, "Started updates task");
3176   demux->priv->manifest_updates_cb =
3177       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3178       klass->get_manifest_update_interval (demux) * GST_USECOND,
3179       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3180
3181   return G_SOURCE_REMOVE;
3182 }
3183
3184 static OutputSlot *
3185 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3186     GstAdaptiveDemuxTrack * track)
3187 {
3188   GList *tmp;
3189
3190   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3191     OutputSlot *slot = (OutputSlot *) tmp->data;
3192     /* Incompatible output type */
3193     if (slot->type != track->type)
3194       continue;
3195
3196     /* Slot which is already assigned to this pending track */
3197     if (slot->pending_track == track)
3198       return slot;
3199
3200     /* slot already used for another pending track */
3201     if (slot->pending_track != NULL)
3202       continue;
3203
3204     /* Current output track is of the same type and is draining */
3205     if (slot->track && slot->track->draining)
3206       return slot;
3207   }
3208
3209   return NULL;
3210 }
3211
3212 /* TRACKS_LOCK taken */
3213 static OutputSlot *
3214 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3215 {
3216   GList *tmp;
3217
3218   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3219     OutputSlot *slot = (OutputSlot *) tmp->data;
3220
3221     if (slot->track == track)
3222       return slot;
3223   }
3224
3225   return NULL;
3226 }
3227
3228 /* TRACKS_LOCK held */
3229 static GstMessage *
3230 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3231 {
3232   GList *tmp;
3233   GstMessage *msg;
3234
3235   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3236     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3237
3238     if (track->selected && !track->active)
3239       return NULL;
3240   }
3241
3242   /* All selected tracks are active, created message */
3243   msg =
3244       gst_message_new_streams_selected (GST_OBJECT (demux),
3245       demux->output_period->collection);
3246   GST_MESSAGE_SEQNUM (msg) = seqnum;
3247   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3248     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3249     if (track->active) {
3250       gst_message_streams_selected_add (msg, track->stream_object);
3251     }
3252   }
3253
3254   return msg;
3255 }
3256
3257 static void
3258 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3259     OutputSlot * slot)
3260 {
3261   GstAdaptiveDemuxTrack *track = slot->track;
3262   GstEvent *event;
3263
3264   /* Send EVENT_STREAM_START */
3265   event = gst_event_new_stream_start (track->stream_id);
3266   if (demux->have_group_id)
3267     gst_event_set_group_id (event, demux->group_id);
3268   gst_event_set_stream_flags (event, track->flags);
3269   gst_event_set_stream (event, track->stream_object);
3270   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3271       track->stream_id);
3272   gst_pad_push_event (slot->pad, event);
3273
3274   /* Send EVENT_STREAM_COLLECTION */
3275   event = gst_event_new_stream_collection (demux->output_period->collection);
3276   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3277       track->stream_id);
3278   gst_pad_push_event (slot->pad, event);
3279
3280   /* Mark all sticky events for re-sending */
3281   gst_event_store_mark_all_undelivered (&track->sticky_events);
3282 }
3283
3284 /*
3285  * Called with TRACKS_LOCK taken
3286  */
3287 static void
3288 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3289 {
3290   GList *tmp;
3291   guint requested_selection_seqnum;
3292   GstMessage *msg;
3293
3294   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3295      output slots vs active/draining tracks */
3296   requested_selection_seqnum =
3297       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3298
3299   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3300     return;
3301
3302   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3303
3304   /* Go over all slots, and if they have a pending track that's no longer
3305    * selected, clear it so the slot can be reused */
3306   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3307     OutputSlot *slot = (OutputSlot *) tmp->data;
3308
3309     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3310       GST_DEBUG_OBJECT (demux,
3311           "Removing deselected track '%s' as pending from output of current track '%s'",
3312           slot->pending_track->stream_id, slot->track->stream_id);
3313       gst_adaptive_demux_track_unref (slot->pending_track);
3314       slot->pending_track = NULL;
3315     }
3316   }
3317
3318   /* Go over all tracks and create/re-assign/remove slots */
3319   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3320     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3321
3322     if (track->selected) {
3323       OutputSlot *slot = find_slot_for_track (demux, track);
3324
3325       /* 0. Track is selected and has a slot. Nothing to do */
3326       if (slot) {
3327         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3328             track->stream_id);
3329         continue;
3330       }
3331
3332       slot = find_replacement_slot_for_track (demux, track);
3333       if (slot) {
3334         /* 1. There is an existing slot of the same type which is currently
3335          *    draining, assign this track as a replacement for it */
3336         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3337         if (slot->pending_track == NULL) {
3338           slot->pending_track = gst_adaptive_demux_track_ref (track);
3339           GST_DEBUG_OBJECT (demux,
3340               "Track '%s' will be used on output of track '%s'",
3341               track->stream_id, slot->track->stream_id);
3342         }
3343       } else {
3344         /* 2. There is no compatible replacement slot, create a new one */
3345         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3346         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3347             track->stream_id);
3348         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3349
3350         track->update_next_segment = TRUE;
3351
3352         slot->track = gst_adaptive_demux_track_ref (track);
3353         track->active = TRUE;
3354         gst_adaptive_demux_send_initial_events (demux, slot);
3355       }
3356
3357       /* If we were draining this track, we no longer are */
3358       track->draining = FALSE;
3359     }
3360   }
3361
3362   /* Finally check all slots have a current/pending track. If not remove it */
3363   for (tmp = demux->priv->outputs; tmp;) {
3364     OutputSlot *slot = (OutputSlot *) tmp->data;
3365     /* We should never has slots without target tracks */
3366     g_assert (slot->track);
3367     if (slot->track->draining && !slot->pending_track) {
3368       GstAdaptiveDemux2Stream *stream;
3369
3370       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3371           slot->track->stream_id);
3372       slot->track->active = FALSE;
3373
3374       /* If the stream feeding this track is stopped, flush and clear
3375        * the track now that it's going inactive. If the stream was not
3376        * found, it means we advanced past that period already (and the
3377        * stream was stopped and discarded) */
3378       stream = find_stream_for_track_locked (demux, slot->track);
3379       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3380         gst_adaptive_demux_track_flush (slot->track);
3381
3382       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3383       gst_adaptive_demux_output_slot_free (demux, slot);
3384     } else
3385       tmp = tmp->next;
3386   }
3387
3388   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3389   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3390   if (msg) {
3391     TRACKS_UNLOCK (demux);
3392     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3393     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3394     TRACKS_LOCK (demux);
3395   }
3396 }
3397
3398 /* TRACKS_LOCK held */
3399 static gboolean
3400 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3401 {
3402   GList *iter;
3403   GstAdaptiveDemuxPeriod *previous_period;
3404   GstStreamCollection *collection;
3405
3406   /* Grab the next period, should be demux->periods->next->data */
3407   previous_period = g_queue_pop_head (demux->priv->periods);
3408
3409   /* Remove ref held by demux->output_period */
3410   gst_adaptive_demux_period_unref (previous_period);
3411   demux->output_period =
3412       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3413
3414   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3415       demux->output_period->period_num);
3416
3417   /* We can now post the collection of the new period */
3418   collection = demux->output_period->collection;
3419   TRACKS_UNLOCK (demux);
3420   gst_element_post_message (GST_ELEMENT_CAST (demux),
3421       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3422   TRACKS_LOCK (demux);
3423
3424   /* Unselect all tracks of the previous period */
3425   for (iter = previous_period->tracks; iter; iter = iter->next) {
3426     GstAdaptiveDemuxTrack *track = iter->data;
3427     if (track->selected) {
3428       track->selected = FALSE;
3429       track->draining = TRUE;
3430     }
3431   }
3432
3433   /* Force a selection re-check */
3434   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3435   check_and_handle_selection_update_locked (demux);
3436
3437   /* Remove the final ref on the previous period now that we have done the switch */
3438   gst_adaptive_demux_period_unref (previous_period);
3439
3440   return TRUE;
3441 }
3442
3443 /* Called with TRACKS_LOCK taken */
3444 static void
3445 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3446     OutputSlot * slot)
3447 {
3448   GstAdaptiveDemuxTrack *track = slot->track;
3449   GstMessage *msg;
3450   gboolean pending_is_ready;
3451   GstAdaptiveDemux2Stream *stream;
3452
3453   /* If we have a pending track for this slot, the current track should be
3454    * draining and no longer selected */
3455   g_assert (track->draining && !track->selected);
3456
3457   /* If we're draining, check if the pending track has enough data *or* that
3458      we've already drained out entirely */
3459   pending_is_ready =
3460       (slot->pending_track->level_time >=
3461       slot->pending_track->buffering_threshold);
3462   pending_is_ready |= slot->pending_track->eos;
3463
3464   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3465     GST_DEBUG_OBJECT (demux,
3466         "Replacement track '%s' doesn't have enough data for switching yet",
3467         slot->pending_track->stream_id);
3468     return;
3469   }
3470
3471   GST_DEBUG_OBJECT (demux,
3472       "Pending replacement track has enough data, switching");
3473   track->active = FALSE;
3474   track->draining = FALSE;
3475
3476   /* If the stream feeding this track is stopped, flush and clear
3477    * the track now that it's going inactive. If the stream was not
3478    * found, it means we advanced past that period already (and the
3479    * stream was stopped and discarded) */
3480   stream = find_stream_for_track_locked (demux, track);
3481   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3482     gst_adaptive_demux_track_flush (track);
3483
3484   gst_adaptive_demux_track_unref (track);
3485   /* We steal the reference of pending_track */
3486   track = slot->track = slot->pending_track;
3487   slot->pending_track = NULL;
3488   slot->track->active = TRUE;
3489
3490   /* Make sure the track segment will start at the current position */
3491   track->update_next_segment = TRUE;
3492
3493   /* Send stream start and collection, and schedule sticky events */
3494   gst_adaptive_demux_send_initial_events (demux, slot);
3495
3496   /* Can we emit the streams-selected message now ? */
3497   msg =
3498       all_selected_tracks_are_active (demux,
3499       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3500   if (msg) {
3501     TRACKS_UNLOCK (demux);
3502     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3503     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3504     TRACKS_LOCK (demux);
3505   }
3506
3507 }
3508
3509 static void
3510 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3511 {
3512   GList *tmp;
3513   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3514   gboolean wait_for_data = FALSE;
3515   GstFlowReturn ret;
3516
3517   GST_DEBUG_OBJECT (demux, "enter");
3518
3519   TRACKS_LOCK (demux);
3520
3521   /* Check if stopping */
3522   if (demux->priv->flushing) {
3523     ret = GST_FLOW_FLUSHING;
3524     goto pause;
3525   }
3526
3527   /* If the selection changed, handle it */
3528   check_and_handle_selection_update_locked (demux);
3529
3530 restart:
3531   ret = GST_FLOW_OK;
3532   global_output_position = GST_CLOCK_STIME_NONE;
3533   if (wait_for_data) {
3534     GST_DEBUG_OBJECT (demux, "Waiting for data");
3535     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3536     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3537     if (demux->priv->flushing) {
3538       ret = GST_FLOW_FLUSHING;
3539       goto pause;
3540     }
3541     wait_for_data = FALSE;
3542   }
3543
3544   /* Grab/Recalculate current global output position
3545    * This is the minimum pending output position of all tracks used for output
3546    *
3547    * If there is a track which is empty and not EOS, wait for it to receive data
3548    * then recalculate global output position.
3549    *
3550    * This also pushes downstream all non-timed data that might be present.
3551    *
3552    * IF all tracks are EOS : stop task
3553    */
3554   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3555   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3556     OutputSlot *slot = (OutputSlot *) tmp->data;
3557     GstAdaptiveDemuxTrack *track;
3558
3559     /* If there is a pending track, Check if it's time to switch to it */
3560     if (slot->pending_track)
3561       handle_slot_pending_track_switch_locked (demux, slot);
3562
3563     track = slot->track;
3564
3565     if (!track->active) {
3566       /* Note: Edward: I can't see in what cases we would end up with inactive
3567          tracks assigned to slots. */
3568       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3569       g_assert (track->active);
3570       continue;
3571     }
3572
3573     if (track->next_position == GST_CLOCK_STIME_NONE) {
3574       gst_adaptive_demux_track_update_next_position (track);
3575     }
3576
3577     if (track->next_position != GST_CLOCK_STIME_NONE) {
3578       if (global_output_position == GST_CLOCK_STIME_NONE)
3579         global_output_position = track->next_position;
3580       else
3581         global_output_position =
3582             MIN (global_output_position, track->next_position);
3583       track->waiting_add = FALSE;
3584     } else if (!track->eos) {
3585       GST_DEBUG_OBJECT (demux, "Need timed data on track %s", track->stream_id);
3586       wait_for_data = track->waiting_add = TRUE;
3587     } else {
3588       GST_DEBUG_OBJECT (demux, "Track %s is EOS, not waiting for timed data",
3589           track->stream_id);
3590     }
3591   }
3592
3593   if (wait_for_data)
3594     goto restart;
3595
3596   if (global_output_position == GST_CLOCK_STIME_NONE
3597       && demux->output_period->closed) {
3598     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3599         demux->output_period->period_num);
3600     if (!gst_adaptive_demux_advance_output_period (demux)) {
3601       /* Failed to move to next period, error out */
3602       ret = GST_FLOW_ERROR;
3603       goto pause;
3604     }
3605     /* Restart the loop */
3606     goto restart;
3607   }
3608
3609   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3610       GST_STIME_ARGS (global_output_position));
3611
3612   /* For each track:
3613    *
3614    * We know all active tracks have pending timed data
3615    * * while track next_position <= global output position
3616    *   * push pending data
3617    *   * Update track next_position
3618    *     * recalculate global output position
3619    *   * Pop next pending data from track and update pending position
3620    *
3621    */
3622   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3623     OutputSlot *slot = (OutputSlot *) tmp->data;
3624     GstAdaptiveDemuxTrack *track = slot->track;
3625
3626     GST_LOG_OBJECT (track->element,
3627         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3628         " global_output_position:%" GST_STIME_FORMAT, track->active,
3629         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3630         GST_STIME_ARGS (global_output_position));
3631
3632     if (!track->active)
3633       continue;
3634
3635     while (global_output_position == GST_CLOCK_STIME_NONE
3636         || !slot->pushed_timed_data
3637         || ((track->next_position != GST_CLOCK_STIME_NONE)
3638             && track->next_position <= global_output_position)) {
3639       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3640
3641       if (!mo) {
3642         GST_DEBUG_OBJECT (demux,
3643             "Track '%s' doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3644             track->stream_id, track->eos, slot->pushed_timed_data);
3645         /* This should only happen if the track is EOS, or exactly in between
3646          * the parser outputting segment/caps before buffers. */
3647         g_assert (track->eos || !slot->pushed_timed_data);
3648         break;
3649       }
3650
3651       demux_update_buffering_locked (demux);
3652       demux_post_buffering_locked (demux);
3653       TRACKS_UNLOCK (demux);
3654
3655       GST_DEBUG_OBJECT (demux, "Track '%s' dequeued %" GST_PTR_FORMAT,
3656           track->stream_id, mo);
3657
3658       if (GST_IS_EVENT (mo)) {
3659         GstEvent *event = (GstEvent *) mo;
3660         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP)
3661           slot->pushed_timed_data = TRUE;
3662         gst_pad_push_event (slot->pad, event);
3663
3664         if (GST_EVENT_IS_STICKY (event))
3665           gst_event_store_mark_delivered (&track->sticky_events, event);
3666       } else if (GST_IS_BUFFER (mo)) {
3667         GstBuffer *buffer = (GstBuffer *) mo;
3668
3669         if (track->output_discont) {
3670           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3671             buffer = gst_buffer_make_writable (buffer);
3672             GST_DEBUG_OBJECT (slot->pad,
3673                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3674                 buffer);
3675             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3676           }
3677           track->output_discont = FALSE;
3678         }
3679         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3680         ret =
3681             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3682             slot->pad, slot->flow_ret);
3683         GST_DEBUG_OBJECT (slot->pad,
3684             "track %s push returned %s (combined %s)", track->stream_id,
3685             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3686         slot->pushed_timed_data = TRUE;
3687       } else {
3688         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3689       }
3690
3691       TRACKS_LOCK (demux);
3692       gst_adaptive_demux_track_update_next_position (track);
3693
3694       if (ret != GST_FLOW_OK)
3695         goto pause;
3696     }
3697   }
3698
3699   /* Store global output position */
3700   if (global_output_position != GST_CLOCK_STIME_NONE)
3701     demux->priv->global_output_position = global_output_position;
3702
3703   if (global_output_position == GST_CLOCK_STIME_NONE) {
3704     if (!demux->priv->flushing) {
3705       GST_DEBUG_OBJECT (demux,
3706           "Pausing output task after reaching NONE global_output_position");
3707       gst_task_pause (demux->priv->output_task);
3708     }
3709   }
3710
3711   TRACKS_UNLOCK (demux);
3712   GST_DEBUG_OBJECT (demux, "leave");
3713   return;
3714
3715 pause:
3716   {
3717     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3718     /* If the flushing flag is set, then the task is being
3719      * externally stopped, so don't go to pause(), otherwise we
3720      * should so we don't keep spinning */
3721     if (!demux->priv->flushing) {
3722       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3723           gst_flow_get_name (ret));
3724       gst_task_pause (demux->priv->output_task);
3725     }
3726
3727     TRACKS_UNLOCK (demux);
3728
3729     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3730       GstEvent *eos = gst_event_new_eos ();
3731
3732       if (ret != GST_FLOW_EOS) {
3733         GST_ELEMENT_FLOW_ERROR (demux, ret);
3734       }
3735
3736       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3737       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3738         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3739       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3740
3741       gst_adaptive_demux_push_src_event (demux, eos);
3742     }
3743
3744     return;
3745   }
3746 }
3747
3748 /* must be called from the scheduler */
3749 gboolean
3750 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3751 {
3752   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3753
3754   if (klass->is_live)
3755     return klass->is_live (demux);
3756   return FALSE;
3757 }
3758
3759 /* must be called from the scheduler */
3760 GstFlowReturn
3761 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3762     GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3763     GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3764 {
3765   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3766
3767   if (klass->stream_seek)
3768     return klass->stream_seek (stream, forward, flags, ts, final_ts);
3769   return GST_FLOW_ERROR;
3770 }
3771
3772 /* must be called from the scheduler */
3773 gboolean
3774 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3775     GstAdaptiveDemux2Stream * stream)
3776 {
3777   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3778   gboolean ret = TRUE;
3779
3780   if (klass->stream_has_next_fragment)
3781     ret = klass->stream_has_next_fragment (stream);
3782
3783   return ret;
3784 }
3785
3786 /* must be called from the scheduler */
3787 /* Called from:
3788  *  the ::finish_fragment() handlers when an *actual* fragment is done
3789  *   */
3790 GstFlowReturn
3791 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3792     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3793 {
3794   if (stream->last_ret != GST_FLOW_OK)
3795     return stream->last_ret;
3796
3797   stream->last_ret =
3798       gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3799       duration);
3800
3801   return stream->last_ret;
3802 }
3803
3804 /* must be called with manifest_lock taken */
3805 static GstFlowReturn
3806 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3807     GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3808 {
3809   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3810   GstFlowReturn ret;
3811
3812   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3813
3814   GST_LOG_OBJECT (stream,
3815       "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3816       GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
3817
3818   stream->download_error_count = 0;
3819   g_clear_error (&stream->last_error);
3820
3821 #if 0
3822   /* FIXME - url has no indication of byte ranges for subsegments */
3823   /* FIXME: Reenable statistics sending? */
3824   gst_element_post_message (GST_ELEMENT_CAST (demux),
3825       gst_message_new_element (GST_OBJECT_CAST (demux),
3826           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3827               "manifest-uri", G_TYPE_STRING,
3828               demux->manifest_uri, "uri", G_TYPE_STRING,
3829               stream->fragment.uri, "fragment-start-time",
3830               GST_TYPE_CLOCK_TIME, stream->download_start_time,
3831               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3832               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3833               stream->download_total_bytes, "fragment-download-time",
3834               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3835 #endif
3836
3837   /* Don't update to the end of the segment if in reverse playback */
3838   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3839   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3840     stream->parse_segment.position += duration;
3841     stream->current_position += duration;
3842
3843     GST_DEBUG_OBJECT (stream,
3844         "stream position now %" GST_TIME_FORMAT,
3845         GST_TIME_ARGS (stream->current_position));
3846   }
3847   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3848
3849   /* When advancing with a non 1.0 rate on live streams, we need to check
3850    * the live seeking range again to make sure we can still advance to
3851    * that position */
3852   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3853     if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3854       ret = GST_FLOW_EOS;
3855     else
3856       ret = klass->stream_advance_fragment (stream);
3857   } else if (gst_adaptive_demux_is_live (demux)
3858       || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3859     ret = klass->stream_advance_fragment (stream);
3860   } else {
3861     ret = GST_FLOW_EOS;
3862   }
3863
3864   stream->download_start_time =
3865       GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3866
3867   if (ret == GST_FLOW_OK) {
3868     GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3869     if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3870             gst_adaptive_demux2_stream_update_current_bitrate (demux,
3871                 stream))) {
3872       GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3873       stream->need_header = TRUE;
3874       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3875     }
3876   }
3877
3878   return ret;
3879 }
3880
3881 /* must be called with manifest_lock taken */
3882 static gboolean
3883 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3884     demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3885 {
3886   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3887
3888   if (klass->stream_select_bitrate)
3889     return klass->stream_select_bitrate (stream, bitrate);
3890   return FALSE;
3891 }
3892
3893 /* must be called with manifest_lock taken */
3894 GstFlowReturn
3895 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3896     GstAdaptiveDemux2Stream * stream)
3897 {
3898   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3899   GstFlowReturn ret;
3900
3901   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3902       GST_FLOW_ERROR);
3903
3904   /* Make sure the sub-class will update bitrate, or else
3905    * we will later */
3906   stream->fragment.finished = FALSE;
3907
3908   GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3909       GST_TIME_ARGS (stream->current_position));
3910
3911   ret = klass->stream_update_fragment_info (stream);
3912
3913   GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3914       gst_flow_get_name (ret), stream->fragment.uri);
3915   if (ret == GST_FLOW_OK) {
3916     GST_LOG_OBJECT (stream,
3917         "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3918         GST_STIME_ARGS (stream->fragment.stream_time),
3919         GST_TIME_ARGS (stream->fragment.duration));
3920     GST_LOG_OBJECT (stream,
3921         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3922         stream->fragment.range_start, stream->fragment.range_end);
3923   }
3924
3925   return ret;
3926 }
3927
3928 /* must be called with manifest_lock taken */
3929 GstClockTime
3930 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3931     demux, GstAdaptiveDemux2Stream * stream)
3932 {
3933   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3934
3935   if (klass->stream_get_fragment_waiting_time)
3936     return klass->stream_get_fragment_waiting_time (stream);
3937   return 0;
3938 }
3939
3940 static void
3941 handle_manifest_download_complete (DownloadRequest * request,
3942     DownloadRequestState state, GstAdaptiveDemux * demux)
3943 {
3944   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3945   GstBuffer *buffer;
3946   GstFlowReturn result;
3947
3948   g_free (demux->manifest_base_uri);
3949   g_free (demux->manifest_uri);
3950
3951   if (request->redirect_permanent && request->redirect_uri) {
3952     demux->manifest_uri = g_strdup (request->redirect_uri);
3953     demux->manifest_base_uri = NULL;
3954   } else {
3955     demux->manifest_uri = g_strdup (request->uri);
3956     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3957   }
3958
3959   buffer = download_request_take_buffer (request);
3960
3961   /* We should always have a buffer since this function is the non-error
3962    * callback for the download */
3963   g_assert (buffer);
3964
3965   result = klass->update_manifest_data (demux, buffer);
3966   gst_buffer_unref (buffer);
3967
3968   /* FIXME: Should the manifest uri vars be reverted to original
3969    * values if updating fails? */
3970
3971   if (result == GST_FLOW_OK) {
3972     GstClockTime duration;
3973     /* Send an updated duration message */
3974     duration = klass->get_duration (demux);
3975     if (duration != GST_CLOCK_TIME_NONE) {
3976       GST_DEBUG_OBJECT (demux,
3977           "Sending duration message : %" GST_TIME_FORMAT,
3978           GST_TIME_ARGS (duration));
3979       gst_element_post_message (GST_ELEMENT (demux),
3980           gst_message_new_duration_changed (GST_OBJECT (demux)));
3981     } else {
3982       GST_DEBUG_OBJECT (demux,
3983           "Duration unknown, can not send the duration message");
3984     }
3985
3986     /* If a manifest changes it's liveness or periodic updateness, we need
3987      * to start/stop the manifest update task appropriately */
3988     /* Keep this condition in sync with the one in
3989      * gst_adaptive_demux_start_manifest_update_task()
3990      */
3991     if (gst_adaptive_demux_is_live (demux) &&
3992         klass->requires_periodical_playlist_update (demux)) {
3993       gst_adaptive_demux_start_manifest_update_task (demux);
3994     } else {
3995       gst_adaptive_demux_stop_manifest_update_task (demux);
3996     }
3997   }
3998 }
3999
4000 static void
4001 handle_manifest_download_failure (DownloadRequest * request,
4002     DownloadRequestState state, GstAdaptiveDemux * demux)
4003 {
4004   GST_FIXME_OBJECT (demux, "Manifest download failed.");
4005   /* Retry or error out here */
4006 }
4007
4008 static GstFlowReturn
4009 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4010 {
4011   DownloadRequest *request;
4012   GstFlowReturn ret = GST_FLOW_OK;
4013   GError *error = NULL;
4014
4015   request = download_request_new_uri (demux->manifest_uri);
4016
4017   download_request_set_callbacks (request,
4018       (DownloadRequestEventCallback) handle_manifest_download_complete,
4019       (DownloadRequestEventCallback) handle_manifest_download_failure,
4020       NULL, NULL, demux);
4021
4022   if (!downloadhelper_submit_request (demux->download_helper, NULL,
4023           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4024           &error)) {
4025     if (error) {
4026       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4027           ("Failed to download manifest: %s", error->message), (NULL));
4028       g_clear_error (&error);
4029     }
4030     ret = GST_FLOW_NOT_LINKED;
4031   }
4032
4033   return ret;
4034 }
4035
4036 /* must be called with manifest_lock taken */
4037 GstFlowReturn
4038 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4039 {
4040   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4041   GstFlowReturn ret;
4042
4043   ret = klass->update_manifest (demux);
4044
4045   return ret;
4046 }
4047
4048 void
4049 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4050 {
4051   g_free (f->uri);
4052   f->uri = NULL;
4053   f->range_start = 0;
4054   f->range_end = -1;
4055
4056   g_free (f->header_uri);
4057   f->header_uri = NULL;
4058   f->header_range_start = 0;
4059   f->header_range_end = -1;
4060
4061   g_free (f->index_uri);
4062   f->index_uri = NULL;
4063   f->index_range_start = 0;
4064   f->index_range_end = -1;
4065
4066   f->stream_time = GST_CLOCK_STIME_NONE;
4067   f->duration = GST_CLOCK_TIME_NONE;
4068   f->finished = FALSE;
4069 }
4070
4071 /* must be called with manifest_lock taken */
4072 gboolean
4073 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4074 {
4075   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4076   gboolean ret = FALSE;
4077
4078   if (klass->has_next_period)
4079     ret = klass->has_next_period (demux);
4080   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4081   return ret;
4082 }
4083
4084 /* must be called with manifest_lock taken */
4085 void
4086 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4087 {
4088   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4089   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4090
4091   g_return_if_fail (klass->advance_period != NULL);
4092
4093   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4094   /* FIXME : no return value ? What if it fails ? */
4095   klass->advance_period (demux);
4096
4097   if (previous_period == demux->input_period) {
4098     GST_ERROR_OBJECT (demux, "Advancing period failed");
4099     return;
4100   }
4101
4102   /* Stop the previous period stream tasks */
4103   gst_adaptive_demux_period_stop_tasks (previous_period);
4104
4105   gst_adaptive_demux_update_collection (demux, demux->input_period);
4106   /* Figure out a pre-emptive selection based on the output period selection */
4107   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4108       demux->output_period);
4109
4110   gst_adaptive_demux_prepare_streams (demux, FALSE);
4111   gst_adaptive_demux_start_tasks (demux);
4112 }
4113
4114 /**
4115  * gst_adaptive_demux_get_monotonic_time:
4116  * Returns: a monotonically increasing time, using the system realtime clock
4117  */
4118 GstClockTime
4119 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4120 {
4121   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4122   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
4123 }
4124
4125 /**
4126  * gst_adaptive_demux_get_client_now_utc:
4127  * @demux: #GstAdaptiveDemux
4128  * Returns: the client's estimate of UTC
4129  *
4130  * Used to find the client's estimate of UTC, using the system realtime clock.
4131  */
4132 GDateTime *
4133 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4134 {
4135   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
4136 }
4137
4138 /**
4139  * gst_adaptive_demux_is_running
4140  * @demux: #GstAdaptiveDemux
4141  * Returns: whether the demuxer is processing data
4142  *
4143  * Returns FALSE if shutdown has started (transitioning down from
4144  * PAUSED), otherwise TRUE.
4145  */
4146 gboolean
4147 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4148 {
4149   return g_atomic_int_get (&demux->running);
4150 }
4151
4152 /**
4153  * gst_adaptive_demux_get_qos_earliest_time:
4154  *
4155  * Returns: The QOS earliest time
4156  *
4157  * Since: 1.20
4158  */
4159 GstClockTime
4160 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4161 {
4162   GstClockTime earliest;
4163
4164   GST_OBJECT_LOCK (demux);
4165   earliest = demux->priv->qos_earliest_time;
4166   GST_OBJECT_UNLOCK (demux);
4167
4168   return earliest;
4169 }
4170
4171 gboolean
4172 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4173     GstAdaptiveDemux2Stream * stream)
4174 {
4175   g_return_val_if_fail (demux && stream, FALSE);
4176
4177   /* FIXME : Migrate to parent */
4178   g_return_val_if_fail (stream->demux == NULL, FALSE);
4179
4180   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4181
4182   TRACKS_LOCK (demux);
4183   if (demux->input_period->prepared) {
4184     GST_ERROR_OBJECT (demux,
4185         "Attempted to add streams but no new period was created");
4186     TRACKS_UNLOCK (demux);
4187     return FALSE;
4188   }
4189   stream->demux = demux;
4190   stream->period = demux->input_period;
4191   demux->input_period->streams =
4192       g_list_append (demux->input_period->streams, stream);
4193
4194   if (stream->tracks) {
4195     GList *iter;
4196     for (iter = stream->tracks; iter; iter = iter->next)
4197       if (!gst_adaptive_demux_period_add_track (demux->input_period,
4198               (GstAdaptiveDemuxTrack *) iter->data)) {
4199         GST_ERROR_OBJECT (demux, "Failed to add track elements");
4200         TRACKS_UNLOCK (demux);
4201         return FALSE;
4202       }
4203   }
4204   TRACKS_UNLOCK (demux);
4205   return TRUE;
4206 }
4207
4208 /* Return the current playback rate including any instant rate multiplier */
4209 gdouble
4210 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4211 {
4212   gdouble rate;
4213   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4214   rate = demux->segment.rate * demux->instant_rate_multiplier;
4215   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4216   return rate;
4217 }