gstadaptivedemux: fix memory leak
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0  /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
207     GstEvent * event);
208
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
210
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
212     GstEvent * event);
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214     GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
216     GstQuery * query);
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
218     GstEvent * event);
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
220     GstEvent * event);
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222     * demux, GstEvent * event);
223
224 static gboolean
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
226
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230     gboolean first_and_live);
231
232 static GstFlowReturn
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
234
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
236     demux);
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
238     demux);
239
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242     gboolean stop_updates);
243
244 static gboolean
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
246     * demux);
247
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249  * method to get to the padtemplates */
250 GType
251 gst_adaptive_demux_ng_get_type (void)
252 {
253   static gsize type = 0;
254
255   if (g_once_init_enter (&type)) {
256     GType _type;
257     static const GTypeInfo info = {
258       sizeof (GstAdaptiveDemuxClass),
259       NULL,
260       NULL,
261       (GClassInitFunc) gst_adaptive_demux_class_init,
262       NULL,
263       NULL,
264       sizeof (GstAdaptiveDemux),
265       0,
266       (GInstanceInitFunc) gst_adaptive_demux_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_BIN,
270         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
271
272     private_offset =
273         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
274
275     g_once_init_leave (&type, _type);
276   }
277   return type;
278 }
279
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
282 {
283   return (G_STRUCT_MEMBER_P (self, private_offset));
284 }
285
286 static void
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec)
289 {
290   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
291
292   GST_OBJECT_LOCK (demux);
293
294   switch (prop_id) {
295     case PROP_CONNECTION_SPEED:
296       demux->connection_speed = g_value_get_uint (value) * 1000;
297       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298           demux->connection_speed);
299       break;
300     case PROP_BANDWIDTH_TARGET_RATIO:
301       demux->bandwidth_target_ratio = g_value_get_float (value);
302       break;
303     case PROP_MIN_BITRATE:
304       demux->min_bitrate = g_value_get_uint (value);
305       break;
306     case PROP_MAX_BITRATE:
307       demux->max_bitrate = g_value_get_uint (value);
308       break;
309     case PROP_CONNECTION_BITRATE:
310       demux->connection_speed = g_value_get_uint (value);
311       break;
312       /* FIXME: Recalculate track and buffering levels
313        * when watermarks change? */
314     case PROP_MAX_BUFFERING_TIME:
315       demux->max_buffering_time = g_value_get_uint64 (value);
316       break;
317     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
319       break;
320     case PROP_BUFFERING_LOW_WATERMARK_TIME:
321       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
322       break;
323     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324       demux->buffering_high_watermark_fragments = g_value_get_double (value);
325       break;
326     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327       demux->buffering_low_watermark_fragments = g_value_get_double (value);
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331       break;
332   }
333
334   GST_OBJECT_UNLOCK (demux);
335 }
336
337 static void
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339     GValue * value, GParamSpec * pspec)
340 {
341   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
342
343   GST_OBJECT_LOCK (demux);
344
345   switch (prop_id) {
346     case PROP_CONNECTION_SPEED:
347       g_value_set_uint (value, demux->connection_speed / 1000);
348       break;
349     case PROP_BANDWIDTH_TARGET_RATIO:
350       g_value_set_float (value, demux->bandwidth_target_ratio);
351       break;
352     case PROP_MIN_BITRATE:
353       g_value_set_uint (value, demux->min_bitrate);
354       break;
355     case PROP_MAX_BITRATE:
356       g_value_set_uint (value, demux->max_bitrate);
357       break;
358     case PROP_CONNECTION_BITRATE:
359       g_value_set_uint (value, demux->connection_speed);
360       break;
361     case PROP_CURRENT_BANDWIDTH:
362       g_value_set_uint (value, demux->current_download_rate);
363       break;
364     case PROP_MAX_BUFFERING_TIME:
365       g_value_set_uint64 (value, demux->max_buffering_time);
366       break;
367     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
369       break;
370     case PROP_BUFFERING_LOW_WATERMARK_TIME:
371       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
372       break;
373     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374       g_value_set_double (value, demux->buffering_high_watermark_fragments);
375       break;
376     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377       g_value_set_double (value, demux->buffering_low_watermark_fragments);
378       break;
379     case PROP_CURRENT_LEVEL_TIME_VIDEO:
380       g_value_set_uint64 (value, demux->current_level_time_video);
381       break;
382     case PROP_CURRENT_LEVEL_TIME_AUDIO:
383       g_value_set_uint64 (value, demux->current_level_time_audio);
384       break;
385     default:
386       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
387       break;
388   }
389
390   GST_OBJECT_UNLOCK (demux);
391 }
392
393 static void
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
395 {
396   GObjectClass *gobject_class;
397   GstElementClass *gstelement_class;
398   GstBinClass *gstbin_class;
399
400   gobject_class = G_OBJECT_CLASS (klass);
401   gstelement_class = GST_ELEMENT_CLASS (klass);
402   gstbin_class = GST_BIN_CLASS (klass);
403
404   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405       "Base Adaptive Demux (ng)");
406
407   parent_class = g_type_class_peek_parent (klass);
408
409   if (private_offset != 0)
410     g_type_class_adjust_private_offset (klass, &private_offset);
411
412   gobject_class->set_property = gst_adaptive_demux_set_property;
413   gobject_class->get_property = gst_adaptive_demux_get_property;
414   gobject_class->finalize = gst_adaptive_demux_finalize;
415
416   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417       g_param_spec_uint ("connection-speed", "Connection Speed",
418           "Network connection speed to use in kbps (0 = calculate from downloaded"
419           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421
422   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423       g_param_spec_float ("bandwidth-target-ratio",
424           "Ratio of target bandwidth / available bandwidth",
425           "Limit of the available bitrate to use when switching to alternates",
426           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428
429   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431           "Network connection speed to use (0 = automatic) (bits/s)",
432           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434
435   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437           "Minimum bitrate to use when switching to alternates (bits/s)",
438           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443           "Maximum bitrate to use when switching to alternates (bits/s)",
444           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448       g_param_spec_uint ("current-bandwidth",
449           "Current download bandwidth (bits/s)",
450           "Report of current download bandwidth (based on arriving data) (bits/s)",
451           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454       g_param_spec_uint64 ("max-buffering-time",
455           "Buffering maximum size (ns)",
456           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459           G_PARAM_STATIC_STRINGS));
460
461   g_object_class_install_property (gobject_class,
462       PROP_BUFFERING_HIGH_WATERMARK_TIME,
463       g_param_spec_uint64 ("high-watermark-time",
464           "High buffering watermark size (ns)",
465           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_LOW_WATERMARK_TIME,
472       g_param_spec_uint64 ("low-watermark-time",
473           "Low buffering watermark size (ns)",
474           "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481       g_param_spec_double ("high-watermark-fragments",
482           "High buffering watermark size (fragments)",
483           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("low-watermark-fragments",
491           "Low buffering watermark size (fragments)",
492           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498       g_param_spec_uint64 ("current-level-time-video",
499           "Currently buffered level of video (ns)",
500           "Currently buffered level of video track(s) (ns)",
501           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503           G_PARAM_STATIC_STRINGS));
504
505   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506       g_param_spec_uint64 ("current-level-time-audio",
507           "Currently buffered level of audio (ns)",
508           "Currently buffered level of audio track(s) (ns)",
509           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511           G_PARAM_STATIC_STRINGS));
512
513   gst_element_class_add_static_pad_template (gstelement_class,
514       &gst_adaptive_demux_audiosrc_template);
515   gst_element_class_add_static_pad_template (gstelement_class,
516       &gst_adaptive_demux_videosrc_template);
517   gst_element_class_add_static_pad_template (gstelement_class,
518       &gst_adaptive_demux_subtitlesrc_template);
519
520   gstelement_class->change_state = gst_adaptive_demux_change_state;
521   gstelement_class->query = gst_adaptive_demux_query;
522   gstelement_class->send_event = gst_adaptive_demux_send_event;
523
524   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
525
526   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527   klass->requires_periodical_playlist_update =
528       gst_adaptive_demux_requires_periodical_playlist_update_default;
529   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
530 }
531
532 static void
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534     GstAdaptiveDemuxClass * klass)
535 {
536   GstPadTemplate *pad_template;
537
538   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
539
540   demux->priv = gst_adaptive_demux_get_instance_private (demux);
541   demux->priv->input_adapter = gst_adapter_new ();
542   demux->realtime_clock = gst_adaptive_demux_clock_new ();
543
544   demux->download_helper = downloadhelper_new (demux->realtime_clock);
545   demux->priv->segment_seqnum = gst_util_seqnum_next ();
546   demux->have_group_id = FALSE;
547   demux->group_id = G_MAXUINT;
548
549   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550   demux->instant_rate_multiplier = 1.0;
551
552   GST_OBJECT_FLAG_SET (demux, GST_BIN_FLAG_STREAMS_AWARE);
553   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
554       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
555
556   g_rec_mutex_init (&demux->priv->manifest_lock);
557
558   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
559
560   g_mutex_init (&demux->priv->api_lock);
561   g_mutex_init (&demux->priv->segment_lock);
562
563   g_mutex_init (&demux->priv->tracks_lock);
564   g_cond_init (&demux->priv->tracks_add);
565
566   g_mutex_init (&demux->priv->buffering_lock);
567
568   demux->priv->periods = g_queue_new ();
569
570   pad_template =
571       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
572   g_return_if_fail (pad_template != NULL);
573
574   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
575   gst_pad_set_event_function (demux->sinkpad,
576       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
577   gst_pad_set_chain_function (demux->sinkpad,
578       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
579
580   /* Properties */
581   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
582   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
583   demux->min_bitrate = DEFAULT_MIN_BITRATE;
584   demux->max_bitrate = DEFAULT_MAX_BITRATE;
585
586   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
587   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
588   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
589   demux->buffering_high_watermark_fragments =
590       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
591   demux->buffering_low_watermark_fragments =
592       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
593
594   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
595   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
596
597   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
598
599   demux->priv->duration = GST_CLOCK_TIME_NONE;
600
601   /* Output combiner */
602   demux->priv->flowcombiner = gst_flow_combiner_new ();
603
604   /* Output task */
605   g_rec_mutex_init (&demux->priv->output_lock);
606   demux->priv->output_task =
607       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
608       NULL);
609   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
610 }
611
612 static void
613 gst_adaptive_demux_finalize (GObject * object)
614 {
615   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
616   GstAdaptiveDemuxPrivate *priv = demux->priv;
617
618   GST_DEBUG_OBJECT (object, "finalize");
619
620   g_object_unref (priv->input_adapter);
621
622   downloadhelper_free (demux->download_helper);
623
624   g_rec_mutex_clear (&demux->priv->manifest_lock);
625   g_mutex_clear (&demux->priv->api_lock);
626   g_mutex_clear (&demux->priv->segment_lock);
627
628   g_mutex_clear (&demux->priv->buffering_lock);
629
630   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
631
632   /* The input period is present after a reset, clear it now */
633   if (demux->input_period)
634     gst_adaptive_demux_period_unref (demux->input_period);
635
636   if (demux->realtime_clock) {
637     gst_adaptive_demux_clock_unref (demux->realtime_clock);
638     demux->realtime_clock = NULL;
639   }
640   g_object_unref (priv->output_task);
641   g_rec_mutex_clear (&priv->output_lock);
642
643   gst_flow_combiner_free (priv->flowcombiner);
644
645   g_queue_free (priv->periods);
646
647   G_OBJECT_CLASS (parent_class)->finalize (object);
648 }
649
650 static gboolean
651 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
652 {
653   gboolean ret = FALSE;
654   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
655
656   if (parent) {
657     ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
658     gst_object_unref (parent);
659   }
660
661   return ret;
662 }
663
664 static GstStateChangeReturn
665 gst_adaptive_demux_change_state (GstElement * element,
666     GstStateChange transition)
667 {
668   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
669   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
670
671   switch (transition) {
672     case GST_STATE_CHANGE_NULL_TO_READY:
673       if (!gst_adaptive_demux_check_streams_aware (demux)) {
674         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
675             (_("Element requires a streams-aware context.")), (NULL));
676         goto fail_out;
677       }
678       break;
679     case GST_STATE_CHANGE_PAUSED_TO_READY:
680       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
681         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
682
683       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
684       downloadhelper_stop (demux->download_helper);
685
686       TRACKS_LOCK (demux);
687       demux->priv->flushing = TRUE;
688       g_cond_signal (&demux->priv->tracks_add);
689       gst_task_stop (demux->priv->output_task);
690       TRACKS_UNLOCK (demux);
691
692       gst_task_join (demux->priv->output_task);
693
694       GST_API_LOCK (demux);
695       gst_adaptive_demux_reset (demux);
696       GST_API_UNLOCK (demux);
697       break;
698     case GST_STATE_CHANGE_READY_TO_PAUSED:
699       GST_API_LOCK (demux);
700       gst_adaptive_demux_reset (demux);
701
702       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
703       if (g_atomic_int_get (&demux->priv->have_manifest))
704         gst_adaptive_demux_start_manifest_update_task (demux);
705       GST_API_UNLOCK (demux);
706       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
707         GST_DEBUG_OBJECT (demux, "demuxer has started running");
708       /* gst_task_start (demux->priv->output_task); */
709       break;
710     default:
711       break;
712   }
713
714   /* this must be run with the scheduler and output tasks stopped. */
715   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
716
717   switch (transition) {
718     case GST_STATE_CHANGE_READY_TO_PAUSED:
719       /* Start download task */
720       downloadhelper_start (demux->download_helper);
721       break;
722     default:
723       break;
724   }
725
726 fail_out:
727   return result;
728 }
729
730 static void
731 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
732     OutputSlot * slot)
733 {
734   GstEvent *eos = gst_event_new_eos ();
735   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
736
737   /* FIXME: The slot might not have output any data, caps or segment yet */
738   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
739   gst_pad_push_event (slot->pad, eos);
740   gst_pad_set_active (slot->pad, FALSE);
741   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
742   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
743   if (slot->track)
744     gst_adaptive_demux_track_unref (slot->track);
745   if (slot->pending_track)
746     gst_adaptive_demux_track_unref (slot->pending_track);
747
748   g_slice_free (OutputSlot, slot);
749 }
750
751 static OutputSlot *
752 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
753     GstStreamType streamtype)
754 {
755   OutputSlot *slot;
756   GstPadTemplate *tmpl;
757   gchar *name;
758
759   switch (streamtype) {
760     case GST_STREAM_TYPE_AUDIO:
761       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
762       tmpl =
763           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
764       break;
765     case GST_STREAM_TYPE_VIDEO:
766       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
767       tmpl =
768           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
769       break;
770     case GST_STREAM_TYPE_TEXT:
771       name =
772           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
773       tmpl =
774           gst_static_pad_template_get
775           (&gst_adaptive_demux_subtitlesrc_template);
776       break;
777     default:
778       g_assert_not_reached ();
779       return NULL;
780   }
781
782   slot = g_slice_new0 (OutputSlot);
783   slot->type = streamtype;
784   slot->pushed_timed_data = FALSE;
785
786   /* Create and activate new pads */
787   slot->pad = gst_pad_new_from_template (tmpl, name);
788   g_free (name);
789   gst_object_unref (tmpl);
790
791   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
792   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
793   gst_pad_set_active (slot->pad, TRUE);
794
795   gst_pad_set_query_function (slot->pad,
796       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
797   gst_pad_set_event_function (slot->pad,
798       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
799
800   gst_pad_set_element_private (slot->pad, slot);
801
802   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
803       GST_DEBUG_PAD_NAME (slot->pad));
804   return slot;
805 }
806
807 /* Called:
808  * * After `process_manifest` or when a period starts
809  * * Or when all tracks have been created
810  *
811  * Goes over tracks and creates the collection
812  *
813  * Returns TRUE if the collection was fully created.
814  *
815  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
816  * */
817 static gboolean
818 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
819     GstAdaptiveDemuxPeriod * period)
820 {
821   GstStreamCollection *collection;
822   GList *iter;
823
824   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
825
826   if (!period->tracks_changed) {
827     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
828     return TRUE;
829   }
830
831   if (!period->tracks) {
832     GST_WARNING_OBJECT (demux, "No tracks registered/present");
833     return FALSE;
834   }
835
836   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
837     GST_DEBUG_OBJECT (demux,
838         "Streams still have pending tracks, not creating/updating collection");
839     return FALSE;
840   }
841
842   /* Update collection */
843   collection = gst_stream_collection_new ("adaptivedemux");
844
845   for (iter = period->tracks; iter; iter = iter->next) {
846     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
847
848     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
849     gst_stream_collection_add_stream (collection,
850         gst_object_ref (track->stream_object));
851   }
852
853   if (period->collection)
854     gst_object_unref (period->collection);
855   period->collection = collection;
856
857   return TRUE;
858 }
859
860 /*
861  * Called for the output period:
862  * * after `update_collection()` if the input period is the same as the output period
863  * * When the output period changes
864  *
865  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
866  */
867 static gboolean
868 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
869 {
870   GstStreamCollection *collection;
871   GstAdaptiveDemuxPeriod *period = demux->output_period;
872   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
873
874   g_return_val_if_fail (period, FALSE);
875   if (!period->collection) {
876     GST_DEBUG_OBJECT (demux, "No collection available yet");
877     return TRUE;
878   }
879
880   collection = period->collection;
881
882   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
883       period->period_num);
884
885   /* Post collection */
886   TRACKS_UNLOCK (demux);
887   GST_MANIFEST_UNLOCK (demux);
888
889   gst_element_post_message (GST_ELEMENT_CAST (demux),
890       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
891
892   GST_MANIFEST_LOCK (demux);
893   TRACKS_LOCK (demux);
894
895   /* If no stream selection was handled, make a default selection */
896   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
897     gst_adaptive_demux_period_select_default_tracks (demux,
898         demux->output_period);
899   }
900
901   /* Make sure the output task is running */
902   if (gst_adaptive_demux2_is_running (demux)) {
903     demux->priv->flushing = FALSE;
904     GST_DEBUG_OBJECT (demux, "Starting the output task");
905     gst_task_start (demux->priv->output_task);
906   }
907
908   return TRUE;
909 }
910
911 static gboolean
912 handle_incoming_manifest (GstAdaptiveDemux * demux)
913 {
914   GstAdaptiveDemuxClass *demux_class;
915   GstQuery *query;
916   gboolean query_res;
917   gboolean ret = TRUE;
918   gsize available;
919   GstBuffer *manifest_buffer;
920
921   GST_API_LOCK (demux);
922   GST_MANIFEST_LOCK (demux);
923
924   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
925
926   available = gst_adapter_available (demux->priv->input_adapter);
927
928   if (available == 0)
929     goto eos_without_data;
930
931   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
932
933   /* Need to get the URI to use it as a base to generate the fragment's
934    * uris */
935   query = gst_query_new_uri ();
936   query_res = gst_pad_peer_query (demux->sinkpad, query);
937   if (query_res) {
938     gchar *uri, *redirect_uri;
939     gboolean permanent;
940
941     gst_query_parse_uri (query, &uri);
942     gst_query_parse_uri_redirection (query, &redirect_uri);
943     gst_query_parse_uri_redirection_permanent (query, &permanent);
944
945     if (permanent && redirect_uri) {
946       demux->manifest_uri = redirect_uri;
947       demux->manifest_base_uri = NULL;
948       g_free (uri);
949     } else {
950       demux->manifest_uri = uri;
951       demux->manifest_base_uri = redirect_uri;
952     }
953
954     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
955         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
956
957     if (!g_str_has_prefix (demux->manifest_uri, "data:")
958         && !g_str_has_prefix (demux->manifest_uri, "http://")
959         && !g_str_has_prefix (demux->manifest_uri, "https://")) {
960       GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
961           (_("Invalid manifest URI")),
962           ("Manifest URI needs to use either data:, http:// or https://"));
963       gst_query_unref (query);
964       ret = FALSE;
965       goto unlock_out;
966     }
967   } else {
968     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
969   }
970   gst_query_unref (query);
971
972   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
973   if (!demux->have_group_id) {
974     demux->have_group_id = TRUE;
975     demux->group_id = gst_util_group_id_next ();
976   }
977
978   /* Let the subclass parse the manifest */
979   manifest_buffer =
980       gst_adapter_take_buffer (demux->priv->input_adapter, available);
981   ret = demux_class->process_manifest (demux, manifest_buffer);
982   gst_buffer_unref (manifest_buffer);
983
984   gst_element_post_message (GST_ELEMENT_CAST (demux),
985       gst_message_new_element (GST_OBJECT_CAST (demux),
986           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
987               "manifest-uri", G_TYPE_STRING,
988               demux->manifest_uri, "uri", G_TYPE_STRING,
989               demux->manifest_uri,
990               "manifest-download-start", GST_TYPE_CLOCK_TIME,
991               GST_CLOCK_TIME_NONE,
992               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
993               gst_util_get_timestamp (), NULL)));
994
995   if (!ret)
996     goto invalid_manifest;
997
998   /* Streams should have been added to the input period if the manifest parsing
999    * succeeded */
1000   if (!demux->input_period->streams)
1001     goto no_streams;
1002
1003   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1004
1005   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1006   /* Send duration message */
1007   if (!gst_adaptive_demux_is_live (demux)) {
1008     GstClockTime duration = demux_class->get_duration (demux);
1009
1010     demux->priv->duration = duration;
1011     if (duration != GST_CLOCK_TIME_NONE) {
1012       GST_DEBUG_OBJECT (demux,
1013           "Sending duration message : %" GST_TIME_FORMAT,
1014           GST_TIME_ARGS (duration));
1015       gst_element_post_message (GST_ELEMENT (demux),
1016           gst_message_new_duration_changed (GST_OBJECT (demux)));
1017     } else {
1018       GST_DEBUG_OBJECT (demux,
1019           "media duration unknown, can not send the duration message");
1020     }
1021   }
1022
1023   TRACKS_LOCK (demux);
1024   /* New streams/tracks will have been added to the input period */
1025   /* The input period has streams, make it the active output period */
1026   /* FIXME : Factorize this into a function to make a period active */
1027   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1028   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1029       gst_adaptive_demux_post_collection (demux);
1030   TRACKS_UNLOCK (demux);
1031
1032   gst_adaptive_demux_prepare_streams (demux,
1033       gst_adaptive_demux_is_live (demux));
1034   gst_adaptive_demux_start_tasks (demux);
1035   gst_adaptive_demux_start_manifest_update_task (demux);
1036
1037 unlock_out:
1038   GST_MANIFEST_UNLOCK (demux);
1039   GST_API_UNLOCK (demux);
1040
1041   return ret;
1042
1043   /* ERRORS */
1044 eos_without_data:
1045   {
1046     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1047     ret = FALSE;
1048     goto unlock_out;
1049   }
1050
1051 no_streams:
1052   {
1053     /* no streams */
1054     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1055     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1056         (_("This file contains no playable streams.")),
1057         ("No known stream formats found at the Manifest"));
1058     ret = FALSE;
1059     goto unlock_out;
1060   }
1061
1062 invalid_manifest:
1063   {
1064     GST_MANIFEST_UNLOCK (demux);
1065     GST_API_UNLOCK (demux);
1066
1067     /* In most cases, this will happen if we set a wrong url in the
1068      * source element and we have received the 404 HTML response instead of
1069      * the manifest */
1070     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1071     return FALSE;
1072   }
1073 }
1074
1075 struct http_headers_collector
1076 {
1077   GstAdaptiveDemux *demux;
1078   gchar **cookies;
1079 };
1080
1081 static gboolean
1082 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1083     const GValue * value, gpointer userdata)
1084 {
1085   struct http_headers_collector *hdr_data = userdata;
1086   GstAdaptiveDemux *demux = hdr_data->demux;
1087   const gchar *field_name = g_quark_to_string (field_id);
1088
1089   if (G_UNLIKELY (value == NULL))
1090     return TRUE;                /* This should not happen */
1091
1092   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1093     const gchar *user_agent = g_value_get_string (value);
1094
1095     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1096     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1097   }
1098
1099   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1100       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1101     guint i = 0, prev_len = 0, total_len = 0;
1102     gchar **cookies = NULL;
1103
1104     if (hdr_data->cookies != NULL)
1105       prev_len = g_strv_length (hdr_data->cookies);
1106
1107     if (GST_VALUE_HOLDS_ARRAY (value)) {
1108       total_len = gst_value_array_get_size (value) + prev_len;
1109       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1110
1111       for (i = 0; i < gst_value_array_get_size (value); i++) {
1112         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1113             g_value_get_string (gst_value_array_get_value (value, i)));
1114         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1115       }
1116     } else if (G_VALUE_HOLDS_STRING (value)) {
1117       total_len = 1 + prev_len;
1118       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1119
1120       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1121           g_value_get_string (value));
1122       cookies[0] = g_value_dup_string (value);
1123     } else {
1124       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1125           g_quark_to_string (field_id));
1126     }
1127
1128     if (cookies) {
1129       if (prev_len) {
1130         guint j;
1131         for (j = 0; j < prev_len; j++) {
1132           GST_DEBUG_OBJECT (demux,
1133               "Append existing cookie %s", hdr_data->cookies[j]);
1134           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1135         }
1136       }
1137       cookies[total_len] = NULL;
1138
1139       g_strfreev (hdr_data->cookies);
1140       hdr_data->cookies = cookies;
1141     }
1142   }
1143
1144   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1145     const gchar *referer = g_value_get_string (value);
1146     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1147
1148     downloadhelper_set_referer (demux->download_helper, referer);
1149   }
1150
1151   /* Date header can be used to estimate server offset */
1152   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1153     const gchar *http_date = g_value_get_string (value);
1154
1155     if (http_date) {
1156       GstDateTime *datetime =
1157           gst_adaptive_demux_util_parse_http_head_date (http_date);
1158
1159       if (datetime) {
1160         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1161         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1162
1163         GST_INFO_OBJECT (demux,
1164             "HTTP response Date %s", GST_STR_NULL (date_string));
1165         g_free (date_string);
1166
1167         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1168
1169         g_date_time_unref (utc_now);
1170         gst_date_time_unref (datetime);
1171       }
1172     }
1173   }
1174
1175   return TRUE;
1176 }
1177
1178 static gboolean
1179 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1180     GstEvent * event)
1181 {
1182   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1183   gboolean ret;
1184
1185   switch (event->type) {
1186     case GST_EVENT_FLUSH_STOP:{
1187       GST_API_LOCK (demux);
1188       GST_MANIFEST_LOCK (demux);
1189
1190       gst_adaptive_demux_reset (demux);
1191
1192       ret = gst_pad_event_default (pad, parent, event);
1193
1194       GST_MANIFEST_UNLOCK (demux);
1195       GST_API_UNLOCK (demux);
1196
1197       return ret;
1198     }
1199     case GST_EVENT_EOS:
1200     {
1201       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1202         if (!handle_incoming_manifest (demux)) {
1203           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1204           return gst_pad_event_default (pad, parent, event);
1205         }
1206         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1207       } else {
1208         GST_ERROR_OBJECT (demux,
1209             "Failed to acquire scheduler to handle manifest");
1210         return gst_pad_event_default (pad, parent, event);
1211       }
1212       gst_event_unref (event);
1213       return TRUE;
1214     }
1215     case GST_EVENT_STREAM_START:
1216       if (gst_event_parse_group_id (event, &demux->group_id))
1217         demux->have_group_id = TRUE;
1218       else
1219         demux->have_group_id = FALSE;
1220       /* Swallow stream-start, we'll push our own */
1221       gst_event_unref (event);
1222       return TRUE;
1223     case GST_EVENT_SEGMENT:
1224       /* Swallow newsegments, we'll push our own */
1225       gst_event_unref (event);
1226       return TRUE;
1227     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1228       const GstStructure *structure = gst_event_get_structure (event);
1229       struct http_headers_collector c = { demux, NULL };
1230
1231       if (gst_structure_has_name (structure, "http-headers")) {
1232         if (gst_structure_has_field (structure, "request-headers")) {
1233           GstStructure *req_headers = NULL;
1234           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1235               &req_headers, NULL);
1236           if (req_headers) {
1237             gst_structure_foreach (req_headers,
1238                 gst_adaptive_demux_handle_upstream_http_header, &c);
1239             gst_structure_free (req_headers);
1240           }
1241         }
1242         if (gst_structure_has_field (structure, "response-headers")) {
1243           GstStructure *res_headers = NULL;
1244           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1245               &res_headers, NULL);
1246           if (res_headers) {
1247             gst_structure_foreach (res_headers,
1248                 gst_adaptive_demux_handle_upstream_http_header, &c);
1249             gst_structure_free (res_headers);
1250           }
1251         }
1252
1253         if (c.cookies)
1254           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1255       }
1256       break;
1257     }
1258     default:
1259       break;
1260   }
1261
1262   return gst_pad_event_default (pad, parent, event);
1263 }
1264
1265 static GstFlowReturn
1266 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1267     GstBuffer * buffer)
1268 {
1269   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1270
1271   GST_MANIFEST_LOCK (demux);
1272
1273   gst_adapter_push (demux->priv->input_adapter, buffer);
1274
1275   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1276       (gint) gst_adapter_available (demux->priv->input_adapter));
1277
1278   GST_MANIFEST_UNLOCK (demux);
1279   return GST_FLOW_OK;
1280 }
1281
1282
1283 /* Called with TRACKS_LOCK taken */
1284 static void
1285 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1286 {
1287   GList *tmp;
1288
1289   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1290     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1291
1292     gst_adaptive_demux_track_flush (track);
1293     if (gst_pad_is_active (track->sinkpad)) {
1294       gst_pad_set_active (track->sinkpad, FALSE);
1295       gst_pad_set_active (track->sinkpad, TRUE);
1296     }
1297   }
1298 }
1299
1300 /* Resets all tracks to their initial state, ready to receive new data. */
1301 static void
1302 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1303 {
1304   TRACKS_LOCK (demux);
1305   g_queue_foreach (demux->priv->periods,
1306       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1307   TRACKS_UNLOCK (demux);
1308 }
1309
1310 /* Subclasses will call this function to ensure that a new input period is
1311  * available to receive new streams and tracks */
1312 gboolean
1313 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1314 {
1315   if (demux->input_period && !demux->input_period->prepared) {
1316     GST_DEBUG_OBJECT (demux, "Using existing input period");
1317     return TRUE;
1318   }
1319
1320   if (demux->input_period) {
1321     GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1322     demux->input_period->has_next_period = TRUE;
1323   }
1324   GST_DEBUG_OBJECT (demux, "Setting up new period");
1325
1326   demux->input_period = gst_adaptive_demux_period_new (demux);
1327
1328   return TRUE;
1329 }
1330
1331 /* must be called with manifest_lock taken */
1332 static void
1333 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1334 {
1335   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1336   GList *iter;
1337
1338   gst_adaptive_demux_stop_tasks (demux, TRUE);
1339
1340   if (klass->reset)
1341     klass->reset (demux);
1342
1343   /* Disable and remove all outputs */
1344   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1345   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1346     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1347   }
1348   g_list_free (demux->priv->outputs);
1349   demux->priv->outputs = NULL;
1350
1351   g_queue_clear_full (demux->priv->periods,
1352       (GDestroyNotify) gst_adaptive_demux_period_unref);
1353
1354   /* The output period always has an extra ref taken on it */
1355   if (demux->output_period)
1356     gst_adaptive_demux_period_unref (demux->output_period);
1357   demux->output_period = NULL;
1358   /* The input period doesn't have an extra ref taken on it */
1359   demux->input_period = NULL;
1360
1361   gst_adaptive_demux_start_new_period (demux);
1362
1363   g_free (demux->manifest_uri);
1364   g_free (demux->manifest_base_uri);
1365   demux->manifest_uri = NULL;
1366   demux->manifest_base_uri = NULL;
1367
1368   gst_adapter_clear (demux->priv->input_adapter);
1369   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1370
1371   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1372   demux->instant_rate_multiplier = 1.0;
1373
1374   demux->priv->duration = GST_CLOCK_TIME_NONE;
1375
1376   demux->priv->percent = -1;
1377   demux->priv->is_buffering = TRUE;
1378
1379   demux->have_group_id = FALSE;
1380   demux->group_id = G_MAXUINT;
1381   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1382
1383   demux->priv->global_output_position = 0;
1384
1385   demux->priv->n_audio_streams = 0;
1386   demux->priv->n_video_streams = 0;
1387   demux->priv->n_subtitle_streams = 0;
1388
1389   gst_flow_combiner_reset (demux->priv->flowcombiner);
1390 }
1391
1392 static gboolean
1393 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1394 {
1395   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1396   gboolean res = FALSE;
1397
1398   GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1399
1400   switch (GST_EVENT_TYPE (event)) {
1401     case GST_EVENT_SEEK:
1402     {
1403       res = gst_adaptive_demux_handle_seek_event (demux, event);
1404       break;
1405     }
1406     case GST_EVENT_SELECT_STREAMS:
1407     {
1408       res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1409       break;
1410     }
1411     default:
1412       res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1413       break;
1414   }
1415   return res;
1416 }
1417
1418 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1419 static GstAdaptiveDemux2Stream *
1420 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1421 {
1422   GList *iter;
1423
1424   /* We only look in the streams of the input period (i.e. with active streams) */
1425   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1426     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1427     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1428       return stream;
1429     }
1430   }
1431
1432   return NULL;
1433 }
1434
1435 static void
1436 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1437     GstMessage * msg)
1438 {
1439   GstAdaptiveDemux2Stream *stream;
1440   GstStreamCollection *collection = NULL;
1441   gboolean pending_tracks_activated = FALSE;
1442
1443   GST_MANIFEST_LOCK (demux);
1444
1445   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1446   if (stream == NULL) {
1447     GST_WARNING_OBJECT (demux,
1448         "Failed to locate stream for collection message");
1449     goto beach;
1450   }
1451
1452   gst_message_parse_stream_collection (msg, &collection);
1453   if (!collection)
1454     goto beach;
1455
1456   TRACKS_LOCK (demux);
1457
1458   if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1459           &pending_tracks_activated)) {
1460     TRACKS_UNLOCK (demux);
1461
1462     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1463         (_("Stream format can't be handled")),
1464         ("The streams provided by the multiplex are ambiguous"));
1465     goto beach;
1466   }
1467
1468   if (pending_tracks_activated) {
1469     /* If pending tracks were handled, then update the demuxer collection */
1470     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1471         demux->input_period == demux->output_period) {
1472       gst_adaptive_demux_post_collection (demux);
1473     }
1474
1475     /* If we discovered pending tracks and we no longer have any, we can ensure
1476      * selected tracks are started */
1477     if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1478       GList *iter = demux->input_period->streams;
1479       for (; iter; iter = iter->next) {
1480         GstAdaptiveDemux2Stream *new_stream = iter->data;
1481
1482         /* The stream that posted this collection was already started. If a
1483          * different stream is now selected, start it */
1484         if (stream != new_stream
1485             && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1486           gst_adaptive_demux2_stream_start (new_stream);
1487       }
1488     }
1489   }
1490   TRACKS_UNLOCK (demux);
1491
1492 beach:
1493   GST_MANIFEST_UNLOCK (demux);
1494
1495   if (collection)
1496     gst_object_unref (collection);
1497   gst_message_unref (msg);
1498   msg = NULL;
1499 }
1500
1501 static void
1502 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1503 {
1504   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1505
1506   switch (GST_MESSAGE_TYPE (msg)) {
1507     case GST_MESSAGE_STREAM_COLLECTION:
1508     {
1509       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1510       return;
1511     }
1512     case GST_MESSAGE_ERROR:{
1513       GstAdaptiveDemux2Stream *stream = NULL;
1514       GError *err = NULL;
1515       gchar *debug = NULL;
1516       gchar *new_error = NULL;
1517       const GstStructure *details = NULL;
1518
1519       GST_MANIFEST_LOCK (demux);
1520
1521       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1522       if (stream == NULL) {
1523         GST_WARNING_OBJECT (demux,
1524             "Failed to locate stream for errored element");
1525         GST_MANIFEST_UNLOCK (demux);
1526         break;
1527       }
1528
1529       gst_message_parse_error (msg, &err, &debug);
1530
1531       GST_WARNING_OBJECT (demux,
1532           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1533           err->message, debug);
1534
1535       if (debug)
1536         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1537       if (new_error) {
1538         g_free (err->message);
1539         err->message = new_error;
1540       }
1541
1542       gst_message_parse_error_details (msg, &details);
1543       if (details) {
1544         gst_structure_get_uint (details, "http-status-code",
1545             &stream->last_status_code);
1546       }
1547
1548       /* error, but ask to retry */
1549       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1550         gst_adaptive_demux2_stream_parse_error (stream, err);
1551         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1552       }
1553
1554       g_error_free (err);
1555       g_free (debug);
1556
1557       GST_MANIFEST_UNLOCK (demux);
1558
1559       gst_message_unref (msg);
1560       msg = NULL;
1561     }
1562       break;
1563     default:
1564       break;
1565   }
1566
1567   if (msg)
1568     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1569 }
1570
1571 /* must be called with manifest_lock taken */
1572 GstClockTime
1573 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1574 {
1575   GstAdaptiveDemuxClass *klass;
1576
1577   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1578
1579   if (klass->get_period_start_time == NULL)
1580     return 0;
1581
1582   return klass->get_period_start_time (demux);
1583 }
1584
1585 /* must be called with manifest_lock taken */
1586 static gboolean
1587 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1588     gboolean first_and_live)
1589 {
1590   GList *iter;
1591   GstClockTime period_start;
1592   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1593   GList *new_streams;
1594
1595   g_return_val_if_fail (demux->input_period->streams, FALSE);
1596   g_assert (demux->input_period->prepared == FALSE);
1597
1598   new_streams = demux->input_period->streams;
1599
1600   if (!gst_adaptive_demux2_is_running (demux)) {
1601     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1602     return TRUE;
1603   }
1604
1605   GST_DEBUG_OBJECT (demux,
1606       "Preparing %d streams for period %d , first_and_live:%d",
1607       g_list_length (new_streams), demux->input_period->period_num,
1608       first_and_live);
1609
1610   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1611     GstAdaptiveDemux2Stream *stream = iter->data;
1612
1613     GST_DEBUG_OBJECT (stream, "Preparing stream");
1614
1615     stream->need_header = TRUE;
1616     stream->discont = TRUE;
1617
1618     /* Grab the first stream time for live streams
1619      * * If the stream is selected
1620      * * Or it provides dynamic tracks (in which case we need to force an update)
1621      */
1622     if (first_and_live
1623         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1624             || stream->pending_tracks)) {
1625       /* TODO we only need the first timestamp, maybe create a simple function to
1626        * get the current PTS of a fragment ? */
1627       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1628       gst_adaptive_demux2_stream_update_fragment_info (stream);
1629
1630       GST_DEBUG_OBJECT (stream,
1631           "Got stream time %" GST_STIME_FORMAT,
1632           GST_STIME_ARGS (stream->fragment.stream_time));
1633
1634       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1635         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1636       } else {
1637         min_stream_time = stream->fragment.stream_time;
1638       }
1639     }
1640   }
1641
1642   period_start = gst_adaptive_demux_get_period_start_time (demux);
1643
1644   /* For live streams, the subclass is supposed to seek to the current fragment
1645    * and then tell us its stream time in stream->fragment.stream_time.  We now
1646    * also have to seek our demuxer segment to reflect this.
1647    *
1648    * FIXME: This needs some refactoring at some point.
1649    */
1650   if (first_and_live) {
1651     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1652         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1653         GST_SEEK_TYPE_NONE, -1, NULL);
1654   }
1655
1656   GST_DEBUG_OBJECT (demux,
1657       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1658       " demux segment %" GST_SEGMENT_FORMAT,
1659       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1660       &demux->segment);
1661
1662   /* Synchronize stream start/current positions */
1663   if (min_stream_time == GST_CLOCK_STIME_NONE)
1664     min_stream_time = period_start;
1665   else
1666     min_stream_time += period_start;
1667   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1668     GstAdaptiveDemux2Stream *stream = iter->data;
1669     stream->start_position = stream->current_position = min_stream_time;
1670   }
1671
1672   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1673     GstAdaptiveDemux2Stream *stream = iter->data;
1674     stream->compute_segment = TRUE;
1675     stream->first_and_live = first_and_live;
1676   }
1677   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1678   demux->input_period->prepared = TRUE;
1679
1680   return TRUE;
1681 }
1682
1683 static GstAdaptiveDemuxTrack *
1684 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1685 {
1686   GList *tmp;
1687
1688   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1689     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1690     if (!g_strcmp0 (track->stream_id, stream_id))
1691       return track;
1692   }
1693
1694   return NULL;
1695 }
1696
1697 /* TRACKS_LOCK hold */
1698 void
1699 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1700 {
1701   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1702   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1703   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1704   GList *tmp;
1705   gint min_percent = -1, percent;
1706   gboolean all_eos = TRUE;
1707
1708   /* Go over all active tracks of the output period and update level */
1709
1710   /* Check that all tracks are above their respective low thresholds (different
1711    * tracks may have different fragment durations yielding different buffering
1712    * percentages) Overall buffering percent is the lowest. */
1713   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1714     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1715
1716     GST_LOG_OBJECT (demux,
1717         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1718         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1719         track->stream_id, track->period_num, track->active, track->selected,
1720         track->eos, GST_TIME_ARGS (track->level_time),
1721         GST_TIME_ARGS (track->buffering_threshold));
1722
1723     if (track->active && track->selected) {
1724       if (!track->eos) {
1725         gint cur_percent;
1726
1727         all_eos = FALSE;
1728         if (min_level_time == GST_CLOCK_TIME_NONE) {
1729           min_level_time = track->level_time;
1730         } else if (track->level_time < min_level_time) {
1731           min_level_time = track->level_time;
1732         }
1733
1734         if (track->type & GST_STREAM_TYPE_VIDEO
1735             && video_level_time > track->level_time)
1736           video_level_time = track->level_time;
1737
1738         if (track->type & GST_STREAM_TYPE_AUDIO
1739             && audio_level_time > track->level_time)
1740           audio_level_time = track->level_time;
1741
1742         if (track->level_time != GST_CLOCK_TIME_NONE
1743             && track->buffering_threshold != 0) {
1744           cur_percent =
1745               gst_util_uint64_scale (track->level_time, 100,
1746               track->buffering_threshold);
1747           if (min_percent < 0 || cur_percent < min_percent)
1748             min_percent = cur_percent;
1749         }
1750       }
1751     }
1752   }
1753
1754   GST_DEBUG_OBJECT (demux,
1755       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1756       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1757
1758   /* Update demuxer video/audio level properties */
1759   GST_OBJECT_LOCK (demux);
1760   demux->current_level_time_video = video_level_time;
1761   demux->current_level_time_audio = audio_level_time;
1762   GST_OBJECT_UNLOCK (demux);
1763
1764   if (min_percent < 0 && !all_eos)
1765     return;
1766
1767   if (min_percent > 100 || all_eos)
1768     percent = 100;
1769   else
1770     percent = MAX (0, min_percent);
1771
1772   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1773
1774   if (demux->priv->is_buffering) {
1775     if (percent >= 100)
1776       demux->priv->is_buffering = FALSE;
1777     if (demux->priv->percent != percent) {
1778       demux->priv->percent = percent;
1779       demux->priv->percent_changed = TRUE;
1780     }
1781   } else if (percent < 1) {
1782     demux->priv->is_buffering = TRUE;
1783     if (demux->priv->percent != percent) {
1784       demux->priv->percent = percent;
1785       demux->priv->percent_changed = TRUE;
1786     }
1787   }
1788
1789   if (demux->priv->percent_changed)
1790     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1791         demux->priv->is_buffering);
1792 }
1793
1794 /* With TRACKS_LOCK held */
1795 void
1796 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1797 {
1798   gint percent;
1799   GstMessage *msg;
1800
1801   if (!demux->priv->percent_changed)
1802     return;
1803
1804   BUFFERING_LOCK (demux);
1805   percent = demux->priv->percent;
1806   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1807   TRACKS_UNLOCK (demux);
1808   gst_element_post_message ((GstElement *) demux, msg);
1809
1810   BUFFERING_UNLOCK (demux);
1811   TRACKS_LOCK (demux);
1812   if (percent == demux->priv->percent)
1813     demux->priv->percent_changed = FALSE;
1814 }
1815
1816 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1817 GstAdaptiveDemux2Stream *
1818 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1819     GstAdaptiveDemuxTrack * track)
1820 {
1821   GList *iter;
1822
1823   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1824     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1825     if (g_list_find (stream->tracks, track))
1826       return stream;
1827   }
1828
1829   return NULL;
1830 }
1831
1832 /* Called from seek handler
1833  *
1834  * This function is used when a (flushing) seek caused a new period to be activated.
1835  *
1836  * This will ensure that:
1837  * * the current output period is marked as finished (EOS)
1838  * * Any potential intermediate (non-input/non-output) periods are removed
1839  * * That the new input period is prepared and ready
1840  */
1841 static void
1842 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1843 {
1844   GList *iter;
1845
1846   GST_DEBUG_OBJECT (demux,
1847       "Preparing new input period %u", demux->input_period->period_num);
1848
1849   /* Prepare the new input period */
1850   gst_adaptive_demux_update_collection (demux, demux->input_period);
1851
1852   /* Transfer the previous selection to the new input period */
1853   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1854       demux->output_period);
1855   gst_adaptive_demux_prepare_streams (demux, FALSE);
1856
1857   /* Remove all periods except for the input (last) and output (first) period */
1858   while (demux->priv->periods->length > 2) {
1859     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1860     /* Mark all tracks of the removed period as not selected and EOS so they
1861      * will be skipped / ignored */
1862     for (iter = period->tracks; iter; iter = iter->next) {
1863       GstAdaptiveDemuxTrack *track = iter->data;
1864       track->selected = FALSE;
1865       track->eos = TRUE;
1866     }
1867     gst_adaptive_demux_period_unref (period);
1868   }
1869
1870   /* Mark all tracks of the output period as EOS so that the output loop
1871    * will immediately move to the new period */
1872   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1873     GstAdaptiveDemuxTrack *track = iter->data;
1874     track->eos = TRUE;
1875   }
1876
1877   /* Go over all slots, and clear any pending track */
1878   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1879     OutputSlot *slot = (OutputSlot *) iter->data;
1880
1881     if (slot->pending_track != NULL) {
1882       GST_DEBUG_OBJECT (demux,
1883           "Removing track '%s' as pending from output of current track '%s'",
1884           slot->pending_track->stream_id, slot->track->stream_id);
1885       gst_adaptive_demux_track_unref (slot->pending_track);
1886       slot->pending_track = NULL;
1887     }
1888   }
1889 }
1890
1891 /* must be called with manifest_lock taken */
1892 gboolean
1893 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1894     gint64 * range_start, gint64 * range_stop)
1895 {
1896   GstAdaptiveDemuxClass *klass;
1897
1898   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1899
1900   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1901
1902   return klass->get_live_seek_range (demux, range_start, range_stop);
1903 }
1904
1905 /* must be called with manifest_lock taken */
1906 gboolean
1907 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1908     GstAdaptiveDemux2Stream * stream)
1909 {
1910   gint64 range_start, range_stop;
1911   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1912     GST_LOG_OBJECT (stream,
1913         "stream position %" GST_TIME_FORMAT "  live seek range %"
1914         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1915         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1916         GST_STIME_ARGS (range_stop));
1917     return (stream->current_position >= range_start
1918         && stream->current_position <= range_stop);
1919   }
1920
1921   return FALSE;
1922 }
1923
1924 /* must be called with manifest_lock taken */
1925 static gboolean
1926 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1927 {
1928   GstAdaptiveDemuxClass *klass;
1929
1930   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1931   if (gst_adaptive_demux_is_live (demux)) {
1932     return klass->get_live_seek_range != NULL;
1933   }
1934
1935   return klass->seek != NULL;
1936 }
1937
1938 static void
1939 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1940     GstSeekType start_type, GstSeekType stop_type)
1941 {
1942   GList *iter;
1943
1944   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1945     GstAdaptiveDemux2Stream *stream = iter->data;
1946
1947     /* Make sure the download loop clears and restarts on the next start,
1948      * which will recompute the stream segment */
1949     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1950         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1951     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1952     stream->start_position = 0;
1953
1954     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1955       stream->start_position = demux->segment.start;
1956     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1957       stream->start_position = demux->segment.stop;
1958   }
1959 }
1960
1961 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1962                               GST_SEEK_FLAG_SNAP_AFTER |          \
1963                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1964                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1965                               GST_SEEK_FLAG_KEY_UNIT))
1966 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1967                               GST_SEEK_FLAG_SNAP_AFTER | \
1968                               GST_SEEK_FLAG_SNAP_NEAREST))
1969
1970 static gboolean
1971 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1972     GstEvent * event)
1973 {
1974   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1975   gdouble rate;
1976   GstFormat format;
1977   GstSeekFlags flags;
1978   GstSeekType start_type, stop_type;
1979   gint64 start, stop;
1980   guint32 seqnum;
1981   gboolean update;
1982   gboolean ret = FALSE;
1983   GstSegment oldsegment;
1984   GstEvent *flush_event;
1985
1986   GST_INFO_OBJECT (demux, "Received seek event");
1987
1988   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1989       &stop_type, &stop);
1990
1991   if (format != GST_FORMAT_TIME) {
1992     GST_WARNING_OBJECT (demux,
1993         "Adaptive demuxers only support TIME-based seeking");
1994     gst_event_unref (event);
1995     return FALSE;
1996   }
1997
1998   if (flags & GST_SEEK_FLAG_SEGMENT) {
1999     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2000     gst_event_unref (event);
2001     return FALSE;
2002   }
2003
2004   seqnum = gst_event_get_seqnum (event);
2005
2006   GST_API_LOCK (demux);
2007   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2008     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2009     GST_API_UNLOCK (demux);
2010     return FALSE;
2011   }
2012
2013   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2014     /* For instant rate seeks, reply directly and update
2015      * our segment so the new rate is reflected in any future
2016      * fragments */
2017     GstEvent *ev;
2018     gdouble rate_multiplier;
2019
2020     /* instant rate change only supported if direction does not change. All
2021      * other requirements are already checked before creating the seek event
2022      * but let's double-check here to be sure */
2023     if ((demux->segment.rate > 0 && rate < 0) ||
2024         (demux->segment.rate < 0 && rate > 0) ||
2025         start_type != GST_SEEK_TYPE_NONE ||
2026         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2027       GST_ERROR_OBJECT (demux,
2028           "Instant rate change seeks only supported in the "
2029           "same direction, without flushing and position change");
2030       goto unlock_return;
2031     }
2032
2033     rate_multiplier = rate / demux->segment.rate;
2034
2035     ev = gst_event_new_instant_rate_change (rate_multiplier,
2036         (GstSegmentFlags) flags);
2037     gst_event_set_seqnum (ev, seqnum);
2038
2039     ret = gst_adaptive_demux_push_src_event (demux, ev);
2040
2041     if (ret) {
2042       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2043       demux->instant_rate_multiplier = rate_multiplier;
2044       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2045     }
2046     goto unlock_return;
2047   }
2048
2049   if (!gst_adaptive_demux_can_seek (demux))
2050     goto unlock_return;
2051
2052   /* We can only accept flushing seeks from this point onward */
2053   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2054     GST_ERROR_OBJECT (demux,
2055         "Non-flushing non-instant-rate seeks are not possible");
2056     goto unlock_return;
2057   }
2058
2059   if (gst_adaptive_demux_is_live (demux)) {
2060     gint64 range_start, range_stop;
2061     gboolean changed = FALSE;
2062     gboolean start_valid = TRUE, stop_valid = TRUE;
2063
2064     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2065             &range_stop)) {
2066       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2067       goto unlock_return;
2068     }
2069
2070     GST_DEBUG_OBJECT (demux,
2071         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2072         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2073
2074     /* Handle relative positioning for live streams (relative to the range_stop) */
2075     if (start_type == GST_SEEK_TYPE_END) {
2076       start = range_stop + start;
2077       start_type = GST_SEEK_TYPE_SET;
2078       changed = TRUE;
2079     }
2080     if (stop_type == GST_SEEK_TYPE_END) {
2081       stop = range_stop + stop;
2082       stop_type = GST_SEEK_TYPE_SET;
2083       changed = TRUE;
2084     }
2085
2086     /* Adjust the requested start/stop position if it falls beyond the live
2087      * seek range.
2088      * The only case where we don't adjust is for the starting point of
2089      * an accurate seek (start if forward and stop if backwards)
2090      */
2091     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2092         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2093       GST_DEBUG_OBJECT (demux,
2094           "seek before live stream start, setting to range start: %"
2095           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2096       start = range_start;
2097       changed = TRUE;
2098     }
2099     /* truncate stop position also if set */
2100     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2101         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2102       GST_DEBUG_OBJECT (demux,
2103           "seek ending after live start, adjusting to: %"
2104           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2105       stop = range_stop;
2106       changed = TRUE;
2107     }
2108
2109     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2110         (start < range_start || start > range_stop)) {
2111       GST_WARNING_OBJECT (demux,
2112           "Seek to invalid position start:%" GST_STIME_FORMAT
2113           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2114           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2115           GST_STIME_ARGS (range_stop));
2116       start_valid = FALSE;
2117     }
2118     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2119         (stop < range_start || stop > range_stop)) {
2120       GST_WARNING_OBJECT (demux,
2121           "Seek to invalid position stop:%" GST_STIME_FORMAT
2122           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2123           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2124           GST_STIME_ARGS (range_stop));
2125       stop_valid = FALSE;
2126     }
2127
2128     /* If the seek position is still outside of the seekable range, refuse the seek */
2129     if (!start_valid || !stop_valid)
2130       goto unlock_return;
2131
2132     /* Re-create seek event with changed/updated values */
2133     if (changed) {
2134       gst_event_unref (event);
2135       event =
2136           gst_event_new_seek (rate, format, flags,
2137           start_type, start, stop_type, stop);
2138       gst_event_set_seqnum (event, seqnum);
2139     }
2140   }
2141
2142   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2143
2144   /* have a backup in case seek fails */
2145   gst_segment_copy_into (&demux->segment, &oldsegment);
2146
2147   GST_DEBUG_OBJECT (demux, "sending flush start");
2148   flush_event = gst_event_new_flush_start ();
2149   gst_event_set_seqnum (flush_event, seqnum);
2150
2151   gst_adaptive_demux_push_src_event (demux, flush_event);
2152
2153   gst_adaptive_demux_stop_tasks (demux, FALSE);
2154   gst_adaptive_demux_reset_tracks (demux);
2155
2156   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2157
2158   if (!IS_SNAP_SEEK (flags) && !(flags & GST_SEEK_FLAG_ACCURATE)) {
2159     /* If no accurate seeking was specified, we want to default to seeking to
2160      * the previous segment for efficient/fast playback. */
2161     flags |= GST_SEEK_FLAG_KEY_UNIT;
2162   }
2163
2164   if (IS_SNAP_SEEK (flags)) {
2165     GstAdaptiveDemux2Stream *default_stream = NULL;
2166     GstAdaptiveDemux2Stream *stream = NULL;
2167     GList *iter;
2168     /*
2169      * Handle snap seeks as follows:
2170      * 1) do the snap seeking a (random) active stream
2171      * 1.1) If none are active yet (early-seek), pick a random default one
2172      * 2) use the final position on this stream to seek
2173      *    on the other streams to the same position
2174      *
2175      * We can't snap at all streams at the same time as they might end in
2176      * different positions, so just pick one and align all others to that
2177      * position.
2178      */
2179
2180     /* Pick a random active stream on which to do the stream seek */
2181     for (iter = demux->output_period->streams; iter; iter = iter->next) {
2182       GstAdaptiveDemux2Stream *cand = iter->data;
2183       if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2184         stream = cand;
2185         break;
2186       }
2187       if (default_stream == NULL
2188           && gst_adaptive_demux2_stream_is_default_locked (cand))
2189         default_stream = cand;
2190     }
2191
2192     if (stream == NULL)
2193       stream = default_stream;
2194
2195     if (stream) {
2196       GstClockTimeDiff ts;
2197       GstSeekFlags stream_seek_flags = flags;
2198
2199       /* snap-seek on the chosen stream and then
2200        * use the resulting position to seek on all streams */
2201       if (rate >= 0) {
2202         if (start_type != GST_SEEK_TYPE_NONE)
2203           ts = start;
2204         else {
2205           ts = gst_segment_position_from_running_time (&demux->segment,
2206               GST_FORMAT_TIME, demux->priv->global_output_position);
2207           start_type = GST_SEEK_TYPE_SET;
2208         }
2209       } else {
2210         if (stop_type != GST_SEEK_TYPE_NONE)
2211           ts = stop;
2212         else {
2213           stop_type = GST_SEEK_TYPE_SET;
2214           ts = gst_segment_position_from_running_time (&demux->segment,
2215               GST_FORMAT_TIME, demux->priv->global_output_position);
2216         }
2217       }
2218
2219       if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2220               ts, &ts) != GST_FLOW_OK) {
2221         GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2222         goto unlock_return;
2223       }
2224       /* replace event with a new one without snapping to seek on all streams */
2225       gst_event_unref (event);
2226       if (rate >= 0) {
2227         start = ts;
2228       } else {
2229         stop = ts;
2230       }
2231       event =
2232           gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2233           start_type, start, stop_type, stop);
2234       GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2235     }
2236   }
2237
2238   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2239       start, stop_type, stop, &update);
2240
2241   if (ret) {
2242     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2243
2244     ret = demux_class->seek (demux, event);
2245   }
2246
2247   if (!ret) {
2248     /* Is there anything else we can do if it fails? */
2249     gst_segment_copy_into (&oldsegment, &demux->segment);
2250   } else {
2251     demux->priv->segment_seqnum = seqnum;
2252   }
2253   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2254
2255   /* Resetting flow combiner */
2256   gst_flow_combiner_reset (demux->priv->flowcombiner);
2257
2258   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2259   flush_event = gst_event_new_flush_stop (TRUE);
2260   gst_event_set_seqnum (flush_event, seqnum);
2261   gst_adaptive_demux_push_src_event (demux, flush_event);
2262
2263   /* If the seek generated a new period, prepare it */
2264   if (!demux->input_period->prepared) {
2265     /* This can only happen on flushing seeks */
2266     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2267     gst_adaptive_demux_seek_to_input_period (demux);
2268   }
2269
2270   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2271   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2272       &demux->segment);
2273   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2274   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2275
2276   /* Reset the global output position (running time) for when the output loop restarts */
2277   demux->priv->global_output_position = 0;
2278
2279   /* After a flushing seek, any instant-rate override is undone */
2280   demux->instant_rate_multiplier = 1.0;
2281
2282   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2283
2284   /* Restart the demux */
2285   gst_adaptive_demux_start_tasks (demux);
2286
2287 unlock_return:
2288   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2289   GST_API_UNLOCK (demux);
2290   gst_event_unref (event);
2291
2292   return ret;
2293 }
2294
2295 /* Returns TRUE if the stream has at least one selected track */
2296 static gboolean
2297 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2298     stream)
2299 {
2300   GList *tmp;
2301
2302   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2303     GstAdaptiveDemuxTrack *track = tmp->data;
2304
2305     if (track->selected)
2306       return TRUE;
2307   }
2308
2309   return FALSE;
2310 }
2311
2312 static gboolean
2313 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2314     guint32 seqnum)
2315 {
2316   gboolean selection_handled = TRUE;
2317   GList *iter;
2318   GList *tracks = NULL;
2319
2320   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2321     return FALSE;
2322
2323   TRACKS_LOCK (demux);
2324   /* We can't do stream selection if we are migrating between periods */
2325   if (demux->input_period && demux->output_period != demux->input_period) {
2326     GST_WARNING_OBJECT (demux,
2327         "Stream selection while migrating between periods is not possible");
2328     TRACKS_UNLOCK (demux);
2329     return FALSE;
2330   }
2331   /* Validate the streams and fill:
2332    * tracks : list of tracks corresponding to requested streams
2333    */
2334   for (iter = streams; iter; iter = iter->next) {
2335     gchar *stream_id = (gchar *) iter->data;
2336     GstAdaptiveDemuxTrack *track;
2337
2338     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2339     track = find_track_for_stream_id (demux->output_period, stream_id);
2340     if (!track) {
2341       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2342       selection_handled = FALSE;
2343       goto select_streams_done;
2344     }
2345     tracks = g_list_append (tracks, track);
2346     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2347   }
2348
2349   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2350    * SCHEDULING THREAD */
2351
2352   /* FIXME: We want to iterate all streams, mark them as deselected,
2353    * then iterate tracks and mark any streams that have at least 1
2354    * active output track, then loop over all streams again and start/stop
2355    * them as needed */
2356
2357   /* Go over all tracks present and (de)select based on current selection */
2358   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2359     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2360
2361     if (track->selected && !g_list_find (tracks, track)) {
2362       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2363           track->stream_id, track->active);
2364       track->selected = FALSE;
2365       track->draining = TRUE;
2366     } else if (!track->selected && g_list_find (tracks, track)) {
2367       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2368
2369       track->selected = TRUE;
2370     }
2371   }
2372
2373   /* Start or stop streams based on the updated track selection */
2374   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2375     GstAdaptiveDemux2Stream *stream = iter->data;
2376     GList *trackiter;
2377
2378     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2379     gboolean should_be_running =
2380         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2381
2382     if (!is_running && should_be_running) {
2383       GstClockTime output_running_ts = demux->priv->global_output_position;
2384       GstClockTime start_position;
2385
2386       /* Calculate where we should start the stream, and then
2387        * start it. */
2388       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2389
2390       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2391           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2392           GST_TIME_ARGS (output_running_ts), &demux->segment);
2393
2394       start_position =
2395           gst_segment_position_from_running_time (&demux->segment,
2396           GST_FORMAT_TIME, output_running_ts);
2397
2398       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2399
2400       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2401           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2402
2403       stream->current_position = stream->start_position = start_position;
2404       stream->compute_segment = TRUE;
2405
2406       /* If output has already begun, ensure we seek this segment
2407        * to the correct restart position when the download loop begins */
2408       if (output_running_ts != 0)
2409         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2410
2411       /* Activate track pads for this stream */
2412       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2413         GstAdaptiveDemuxTrack *track =
2414             (GstAdaptiveDemuxTrack *) trackiter->data;
2415         gst_pad_set_active (track->sinkpad, TRUE);
2416       }
2417
2418       gst_adaptive_demux2_stream_start (stream);
2419     } else if (is_running && !should_be_running) {
2420       /* Stream should not be running and needs stopping */
2421       gst_adaptive_demux2_stream_stop (stream);
2422
2423       /* Set all track sinkpads to inactive for this stream */
2424       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2425         GstAdaptiveDemuxTrack *track =
2426             (GstAdaptiveDemuxTrack *) trackiter->data;
2427         gst_pad_set_active (track->sinkpad, FALSE);
2428       }
2429     }
2430   }
2431
2432   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2433
2434 select_streams_done:
2435   demux_update_buffering_locked (demux);
2436   demux_post_buffering_locked (demux);
2437
2438   TRACKS_UNLOCK (demux);
2439   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2440
2441   if (tracks)
2442     g_list_free (tracks);
2443   return selection_handled;
2444 }
2445
2446 static gboolean
2447 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2448     GstEvent * event)
2449 {
2450   GList *streams;
2451   gboolean selection_handled;
2452
2453   if (GST_EVENT_SEQNUM (event) ==
2454       g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2455     GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2456         GST_EVENT_SEQNUM (event));
2457     return TRUE;
2458   }
2459
2460   gst_event_parse_select_streams (event, &streams);
2461   selection_handled =
2462       handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2463   g_list_free_full (streams, g_free);
2464
2465   gst_event_unref (event);
2466   return selection_handled;
2467 }
2468
2469 static gboolean
2470 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2471     GstEvent * event)
2472 {
2473   GstAdaptiveDemux *demux;
2474
2475   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2476
2477   switch (event->type) {
2478     case GST_EVENT_SEEK:
2479     {
2480       guint32 seqnum = gst_event_get_seqnum (event);
2481       if (seqnum == demux->priv->segment_seqnum) {
2482         GST_LOG_OBJECT (pad,
2483             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2484         gst_event_unref (event);
2485         return TRUE;
2486       }
2487       return gst_adaptive_demux_handle_seek_event (demux, event);
2488     }
2489     case GST_EVENT_LATENCY:{
2490       /* Upstream and our internal source are irrelevant
2491        * for latency, and we should not fail here to
2492        * configure the latency */
2493       gst_event_unref (event);
2494       return TRUE;
2495     }
2496     case GST_EVENT_QOS:{
2497       GstClockTimeDiff diff;
2498       GstClockTime timestamp;
2499       GstClockTime earliest_time;
2500
2501       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2502       /* Only take into account lateness if late */
2503       if (diff > 0)
2504         earliest_time = timestamp + 2 * diff;
2505       else
2506         earliest_time = timestamp;
2507
2508       GST_OBJECT_LOCK (demux);
2509       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2510           earliest_time > demux->priv->qos_earliest_time) {
2511         demux->priv->qos_earliest_time = earliest_time;
2512         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2513             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2514       }
2515       GST_OBJECT_UNLOCK (demux);
2516       break;
2517     }
2518     case GST_EVENT_SELECT_STREAMS:
2519     {
2520       return gst_adaptive_demux_handle_select_streams_event (demux, event);
2521     }
2522     default:
2523       break;
2524   }
2525
2526   return gst_pad_event_default (pad, parent, event);
2527 }
2528
2529 static gboolean
2530 gst_adaptive_demux_handle_query_seeking (GstAdaptiveDemux * demux,
2531     GstQuery * query)
2532 {
2533   GstFormat fmt = GST_FORMAT_UNDEFINED;
2534   gint64 stop = -1;
2535   gint64 start = 0;
2536   gboolean ret = FALSE;
2537
2538   if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2539     GST_INFO_OBJECT (demux,
2540         "Don't have manifest yet, can't answer seeking query");
2541     return FALSE;               /* can't answer without manifest */
2542   }
2543
2544   GST_MANIFEST_LOCK (demux);
2545
2546   gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2547   GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2548   if (fmt == GST_FORMAT_TIME) {
2549     GstClockTime duration;
2550     gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2551
2552     ret = TRUE;
2553     if (can_seek) {
2554       if (gst_adaptive_demux_is_live (demux)) {
2555         ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2556
2557         if (!ret) {
2558           GST_MANIFEST_UNLOCK (demux);
2559           GST_INFO_OBJECT (demux, "can't answer seeking query");
2560           return FALSE;
2561         }
2562       } else {
2563         duration = demux->priv->duration;
2564         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2565           stop = duration;
2566       }
2567     }
2568     gst_query_set_seeking (query, fmt, can_seek, start, stop);
2569     GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2570         GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2571         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2572   }
2573   GST_MANIFEST_UNLOCK (demux);
2574   return ret;
2575 }
2576
2577 static gboolean
2578 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2579     GstQuery * query)
2580 {
2581   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2582   gboolean ret = FALSE;
2583
2584   if (query == NULL)
2585     return FALSE;
2586
2587   switch (query->type) {
2588     case GST_QUERY_DURATION:{
2589       GstFormat fmt;
2590       GstClockTime duration = GST_CLOCK_TIME_NONE;
2591
2592       gst_query_parse_duration (query, &fmt, NULL);
2593
2594       if (gst_adaptive_demux_is_live (demux)) {
2595         /* We are able to answer this query: the duration is unknown */
2596         gst_query_set_duration (query, fmt, -1);
2597         ret = TRUE;
2598         break;
2599       }
2600
2601       if (fmt == GST_FORMAT_TIME
2602           && g_atomic_int_get (&demux->priv->have_manifest)) {
2603
2604         GST_MANIFEST_LOCK (demux);
2605         duration = demux->priv->duration;
2606         GST_MANIFEST_UNLOCK (demux);
2607
2608         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2609           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2610           ret = TRUE;
2611         }
2612       }
2613
2614       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2615           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2616       break;
2617     }
2618     case GST_QUERY_LATENCY:{
2619       gst_query_set_latency (query, FALSE, 0, -1);
2620       ret = TRUE;
2621       break;
2622     }
2623     case GST_QUERY_SEEKING:
2624       ret = gst_adaptive_demux_handle_query_seeking (demux, query);
2625       break;
2626     case GST_QUERY_URI:
2627
2628       GST_MANIFEST_LOCK (demux);
2629
2630       /* TODO HLS can answer this differently it seems */
2631       if (demux->manifest_uri) {
2632         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2633          * playlist or the the uri of the last downlowaded fragment? */
2634         gst_query_set_uri (query, demux->manifest_uri);
2635         ret = TRUE;
2636       }
2637
2638       GST_MANIFEST_UNLOCK (demux);
2639       break;
2640     case GST_QUERY_SELECTABLE:
2641       gst_query_set_selectable (query, TRUE);
2642       ret = TRUE;
2643       break;
2644     default:
2645       /* Don't forward queries upstream because of the special nature of this
2646        *  "demuxer", which relies on the upstream element only to be fed
2647        *  the Manifest
2648        */
2649       break;
2650   }
2651
2652   return ret;
2653 }
2654
2655 static gboolean
2656 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
2657 {
2658   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
2659
2660   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
2661
2662   switch (GST_QUERY_TYPE (query)) {
2663     case GST_QUERY_BUFFERING:
2664     {
2665       GstFormat format;
2666       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2667
2668       if (!demux->output_period) {
2669         if (format != GST_FORMAT_TIME) {
2670           GST_DEBUG_OBJECT (demux,
2671               "No period setup yet, can't answer non-TIME buffering queries");
2672           return FALSE;
2673         }
2674
2675         GST_DEBUG_OBJECT (demux,
2676             "No period setup yet, but still answering buffering query");
2677         return TRUE;
2678       }
2679     }
2680     case GST_QUERY_SEEKING:
2681     {
2682       /* Source pads might not be present early on which would cause the default
2683        * element query handler to fail, yet we can answer this query */
2684       return gst_adaptive_demux_handle_query_seeking (demux, query);
2685     }
2686     default:
2687       break;
2688   }
2689
2690   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
2691 }
2692
2693 gboolean
2694 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2695 {
2696   GstEvent *seek;
2697
2698   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2699
2700   seek =
2701       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2702       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2703       GST_SEEK_TYPE_NONE, 0);
2704   gst_adaptive_demux_handle_seek_event (demux, seek);
2705   return FALSE;
2706 }
2707
2708
2709 /* Called when the scheduler starts, to kick off manifest updates
2710  * and stream downloads */
2711 static gboolean
2712 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2713 {
2714   GList *iter;
2715
2716   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2717
2718   iter = demux->input_period->streams;
2719
2720   for (; iter; iter = g_list_next (iter)) {
2721     GstAdaptiveDemux2Stream *stream = iter->data;
2722
2723     /* If we need to process this stream to discover tracks *OR* it has any
2724      * tracks which are selected, start it now */
2725     if ((stream->pending_tracks == TRUE)
2726         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2727       gst_adaptive_demux2_stream_start (stream);
2728   }
2729
2730   return FALSE;
2731 }
2732
2733 /* must be called with manifest_lock taken */
2734 static void
2735 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2736 {
2737   if (!gst_adaptive_demux2_is_running (demux)) {
2738     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2739     return;
2740   }
2741
2742   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2743   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2744       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2745
2746   TRACKS_LOCK (demux);
2747   demux->priv->flushing = FALSE;
2748   GST_DEBUG_OBJECT (demux, "Starting the output task");
2749   gst_task_start (demux->priv->output_task);
2750   TRACKS_UNLOCK (demux);
2751 }
2752
2753 /* must be called with manifest_lock taken */
2754 static void
2755 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2756 {
2757   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2758   if (demux->priv->manifest_updates_cb != 0) {
2759     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2760         demux->priv->manifest_updates_cb);
2761     demux->priv->manifest_updates_cb = 0;
2762   }
2763 }
2764
2765 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2766
2767 /* must be called with manifest_lock taken */
2768 static void
2769 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2770 {
2771   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2772
2773   if (gst_adaptive_demux_is_live (demux)) {
2774     /* Task to periodically update the manifest */
2775     if (demux_class->requires_periodical_playlist_update (demux)) {
2776       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2777       if (demux->priv->manifest_updates_cb == 0) {
2778         demux->priv->manifest_updates_cb =
2779             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2780             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2781       }
2782     }
2783   }
2784 }
2785
2786 /* must be called with manifest_lock taken
2787  * This function will temporarily release manifest_lock in order to join the
2788  * download threads.
2789  * The api_lock will still protect it against other threads trying to modify
2790  * the demux element.
2791  */
2792 static void
2793 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2794 {
2795   GST_LOG_OBJECT (demux, "Stopping tasks");
2796
2797   if (stop_updates)
2798     gst_adaptive_demux_stop_manifest_update_task (demux);
2799
2800   TRACKS_LOCK (demux);
2801   if (demux->input_period)
2802     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2803
2804   demux->priv->flushing = TRUE;
2805   g_cond_signal (&demux->priv->tracks_add);
2806   gst_task_stop (demux->priv->output_task);
2807   TRACKS_UNLOCK (demux);
2808
2809   gst_task_join (demux->priv->output_task);
2810
2811   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2812 }
2813
2814 /* must be called with manifest_lock taken */
2815 static gboolean
2816 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2817 {
2818   GList *iter;
2819   gboolean ret = TRUE;
2820
2821   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2822
2823   TRACKS_LOCK (demux);
2824   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2825     OutputSlot *slot = (OutputSlot *) iter->data;
2826     gst_event_ref (event);
2827     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2828     ret = ret & gst_pad_push_event (slot->pad, event);
2829     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2830       slot->pushed_timed_data = FALSE;
2831   }
2832   TRACKS_UNLOCK (demux);
2833   gst_event_unref (event);
2834   return ret;
2835 }
2836
2837 /* must be called with manifest_lock taken */
2838 void
2839 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2840     GstCaps * caps)
2841 {
2842   GST_DEBUG_OBJECT (stream,
2843       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2844   gst_caps_replace (&stream->pending_caps, caps);
2845   gst_caps_unref (caps);
2846 }
2847
2848 /* must be called with manifest_lock taken */
2849 void
2850 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2851     GstTagList * tags)
2852 {
2853   GST_DEBUG_OBJECT (stream,
2854       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2855   gst_clear_tag_list (&stream->pending_tags);
2856   stream->pending_tags = tags;
2857 }
2858
2859 /* must be called with manifest_lock taken */
2860 void
2861 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2862     GstEvent * event)
2863 {
2864   stream->pending_events = g_list_append (stream->pending_events, event);
2865 }
2866
2867 static gboolean
2868 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2869     * demux)
2870 {
2871   return TRUE;
2872 }
2873
2874 /* Called when a stream needs waking after the manifest is updated */
2875 void
2876 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2877 {
2878   demux->priv->stream_waiting_for_manifest = TRUE;
2879 }
2880
2881 static gboolean
2882 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2883 {
2884   GstFlowReturn ret = GST_FLOW_OK;
2885   gboolean schedule_again = TRUE;
2886
2887   GST_MANIFEST_LOCK (demux);
2888   demux->priv->manifest_updates_cb = 0;
2889
2890   /* Updating playlist only needed for live playlists */
2891   if (!gst_adaptive_demux_is_live (demux)) {
2892     GST_MANIFEST_UNLOCK (demux);
2893     return G_SOURCE_REMOVE;
2894   }
2895
2896   GST_DEBUG_OBJECT (demux, "Updating playlist");
2897   ret = gst_adaptive_demux_update_manifest (demux);
2898
2899   if (ret == GST_FLOW_EOS) {
2900     GST_MANIFEST_UNLOCK (demux);
2901     return G_SOURCE_REMOVE;
2902   }
2903
2904   if (ret == GST_FLOW_OK) {
2905     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2906     demux->priv->update_failed_count = 0;
2907
2908     /* Wake up download tasks */
2909     if (demux->priv->stream_waiting_for_manifest) {
2910       GList *iter;
2911
2912       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2913         GstAdaptiveDemux2Stream *stream = iter->data;
2914         gst_adaptive_demux2_stream_on_manifest_update (stream);
2915       }
2916       demux->priv->stream_waiting_for_manifest = FALSE;
2917     }
2918   } else {
2919     demux->priv->update_failed_count++;
2920
2921     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2922       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2923           gst_flow_get_name (ret));
2924     } else {
2925       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2926           (_("Internal data stream error.")), ("Could not update playlist"));
2927       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2928       schedule_again = FALSE;
2929     }
2930   }
2931
2932   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2933     gst_adaptive_demux_handle_lost_sync (demux);
2934
2935   if (schedule_again) {
2936     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2937
2938     demux->priv->manifest_updates_cb =
2939         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2940         klass->get_manifest_update_interval (demux) * GST_USECOND,
2941         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2942   }
2943
2944   GST_MANIFEST_UNLOCK (demux);
2945
2946   return G_SOURCE_REMOVE;
2947 }
2948
2949 static gboolean
2950 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2951 {
2952   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2953
2954   /* Loop for updating of the playlist. This periodically checks if
2955    * the playlist is updated and does so, then signals the streaming
2956    * thread in case it can continue downloading now. */
2957
2958   /* block until the next scheduled update or the signal to quit this thread */
2959   GST_DEBUG_OBJECT (demux, "Started updates task");
2960   demux->priv->manifest_updates_cb =
2961       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2962       klass->get_manifest_update_interval (demux) * GST_USECOND,
2963       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2964
2965   return G_SOURCE_REMOVE;
2966 }
2967
2968 static OutputSlot *
2969 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2970     GstAdaptiveDemuxTrack * track)
2971 {
2972   GList *tmp;
2973
2974   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2975     OutputSlot *slot = (OutputSlot *) tmp->data;
2976     /* Incompatible output type */
2977     if (slot->type != track->type)
2978       continue;
2979
2980     /* Slot which is already assigned to this pending track */
2981     if (slot->pending_track == track)
2982       return slot;
2983
2984     /* slot already used for another pending track */
2985     if (slot->pending_track != NULL)
2986       continue;
2987
2988     /* Current output track is of the same type and is draining */
2989     if (slot->track && slot->track->draining)
2990       return slot;
2991   }
2992
2993   return NULL;
2994 }
2995
2996 /* TRACKS_LOCK taken */
2997 static OutputSlot *
2998 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
2999 {
3000   GList *tmp;
3001
3002   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3003     OutputSlot *slot = (OutputSlot *) tmp->data;
3004
3005     if (slot->track == track)
3006       return slot;
3007   }
3008
3009   return NULL;
3010 }
3011
3012 /* TRACKS_LOCK held */
3013 static GstMessage *
3014 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3015 {
3016   GList *tmp;
3017   GstMessage *msg;
3018
3019   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3020     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3021
3022     if (track->selected && !track->active)
3023       return NULL;
3024   }
3025
3026   /* All selected tracks are active, created message */
3027   msg =
3028       gst_message_new_streams_selected (GST_OBJECT (demux),
3029       demux->output_period->collection);
3030   GST_MESSAGE_SEQNUM (msg) = seqnum;
3031   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3032     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3033     if (track->active) {
3034       gst_message_streams_selected_add (msg, track->stream_object);
3035     }
3036   }
3037
3038   return msg;
3039 }
3040
3041 static void
3042 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3043     OutputSlot * slot)
3044 {
3045   GstAdaptiveDemuxTrack *track = slot->track;
3046   GstEvent *event;
3047
3048   /* Send EVENT_STREAM_START */
3049   event = gst_event_new_stream_start (track->stream_id);
3050   if (demux->have_group_id)
3051     gst_event_set_group_id (event, demux->group_id);
3052   gst_event_set_stream_flags (event, track->flags);
3053   gst_event_set_stream (event, track->stream_object);
3054   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3055       track->stream_id);
3056   gst_pad_push_event (slot->pad, event);
3057
3058   /* Send EVENT_STREAM_COLLECTION */
3059   event = gst_event_new_stream_collection (demux->output_period->collection);
3060   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3061       track->stream_id);
3062   gst_pad_push_event (slot->pad, event);
3063
3064   /* Mark all sticky events for re-sending */
3065   gst_event_store_mark_all_undelivered (&track->sticky_events);
3066 }
3067
3068 /*
3069  * Called with TRACKS_LOCK taken
3070  */
3071 static void
3072 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3073 {
3074   GList *tmp;
3075   guint requested_selection_seqnum;
3076   GstMessage *msg;
3077
3078   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3079      output slots vs active/draining tracks */
3080   requested_selection_seqnum =
3081       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3082
3083   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3084     return;
3085
3086   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3087
3088   /* Go over all slots, and if they have a pending track that's no longer
3089    * selected, clear it so the slot can be reused */
3090   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3091     OutputSlot *slot = (OutputSlot *) tmp->data;
3092
3093     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3094       GST_DEBUG_OBJECT (demux,
3095           "Removing deselected track '%s' as pending from output of current track '%s'",
3096           slot->pending_track->stream_id, slot->track->stream_id);
3097       gst_adaptive_demux_track_unref (slot->pending_track);
3098       slot->pending_track = NULL;
3099     }
3100   }
3101
3102   /* Go over all tracks and create/re-assign/remove slots */
3103   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3104     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3105
3106     if (track->selected) {
3107       OutputSlot *slot = find_slot_for_track (demux, track);
3108
3109       /* 0. Track is selected and has a slot. Nothing to do */
3110       if (slot) {
3111         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3112             track->stream_id);
3113         continue;
3114       }
3115
3116       slot = find_replacement_slot_for_track (demux, track);
3117       if (slot) {
3118         /* 1. There is an existing slot of the same type which is currently
3119          *    draining, assign this track as a replacement for it */
3120         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3121         if (slot->pending_track == NULL) {
3122           slot->pending_track = gst_adaptive_demux_track_ref (track);
3123           GST_DEBUG_OBJECT (demux,
3124               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3125               track->stream_id, track->period_num,
3126               slot->track->stream_id, slot->track->period_num);
3127         }
3128       } else {
3129         /* 2. There is no compatible replacement slot, create a new one */
3130         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3131         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3132             track->stream_id);
3133         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3134
3135         track->update_next_segment = TRUE;
3136
3137         slot->track = gst_adaptive_demux_track_ref (track);
3138         track->active = TRUE;
3139         gst_adaptive_demux_send_initial_events (demux, slot);
3140       }
3141
3142       /* If we were draining this track, we no longer are */
3143       track->draining = FALSE;
3144     }
3145   }
3146
3147   /* Finally check all slots have a current/pending track. If not remove it */
3148   for (tmp = demux->priv->outputs; tmp;) {
3149     OutputSlot *slot = (OutputSlot *) tmp->data;
3150     /* We should never has slots without target tracks */
3151     g_assert (slot->track);
3152     if (slot->track->draining && !slot->pending_track) {
3153       GstAdaptiveDemux2Stream *stream;
3154
3155       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3156           slot->track->stream_id);
3157       slot->track->active = FALSE;
3158
3159       /* If the stream feeding this track is stopped, flush and clear
3160        * the track now that it's going inactive. If the stream was not
3161        * found, it means we advanced past that period already (and the
3162        * stream was stopped and discarded) */
3163       stream = find_stream_for_track_locked (demux, slot->track);
3164       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3165         gst_adaptive_demux_track_flush (slot->track);
3166
3167       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3168       gst_adaptive_demux_output_slot_free (demux, slot);
3169     } else
3170       tmp = tmp->next;
3171   }
3172
3173   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3174   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3175   if (msg) {
3176     TRACKS_UNLOCK (demux);
3177     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3178     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3179     TRACKS_LOCK (demux);
3180   }
3181 }
3182
3183 /* TRACKS_LOCK held */
3184 static gboolean
3185 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3186 {
3187   GList *iter;
3188   GstAdaptiveDemuxPeriod *previous_period;
3189   GstStreamCollection *collection;
3190
3191   /* Grab the next period, should be demux->periods->next->data */
3192   previous_period = g_queue_pop_head (demux->priv->periods);
3193
3194   /* Remove ref held by demux->output_period */
3195   gst_adaptive_demux_period_unref (previous_period);
3196   demux->output_period =
3197       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3198
3199   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3200       demux->output_period->period_num);
3201
3202   /* We can now post the collection of the new period */
3203   collection = demux->output_period->collection;
3204   TRACKS_UNLOCK (demux);
3205   gst_element_post_message (GST_ELEMENT_CAST (demux),
3206       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3207   TRACKS_LOCK (demux);
3208
3209   /* Unselect all tracks of the previous period */
3210   for (iter = previous_period->tracks; iter; iter = iter->next) {
3211     GstAdaptiveDemuxTrack *track = iter->data;
3212     if (track->selected) {
3213       track->selected = FALSE;
3214       track->draining = TRUE;
3215     }
3216   }
3217
3218   /* Force a selection re-check */
3219   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3220   check_and_handle_selection_update_locked (demux);
3221
3222   /* Remove the final ref on the previous period now that we have done the switch */
3223   gst_adaptive_demux_period_unref (previous_period);
3224
3225   return TRUE;
3226 }
3227
3228 /* Called with TRACKS_LOCK taken */
3229 static void
3230 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3231     OutputSlot * slot)
3232 {
3233   GstAdaptiveDemuxTrack *track = slot->track;
3234   GstMessage *msg;
3235   gboolean pending_is_ready;
3236   GstAdaptiveDemux2Stream *stream;
3237
3238   /* If we have a pending track for this slot, the current track should be
3239    * draining and no longer selected */
3240   g_assert (track->draining && !track->selected);
3241
3242   /* If we're draining, check if the pending track has enough data *or* that
3243      we've already drained out entirely */
3244   pending_is_ready =
3245       (slot->pending_track->level_time >=
3246       slot->pending_track->buffering_threshold);
3247   pending_is_ready |= slot->pending_track->eos;
3248
3249   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3250     GST_DEBUG_OBJECT (demux,
3251         "Replacement track '%s' doesn't have enough data for switching yet",
3252         slot->pending_track->stream_id);
3253     return;
3254   }
3255
3256   GST_DEBUG_OBJECT (demux,
3257       "Pending replacement track has enough data, switching");
3258   track->active = FALSE;
3259   track->draining = FALSE;
3260
3261   /* If the stream feeding this track is stopped, flush and clear
3262    * the track now that it's going inactive. If the stream was not
3263    * found, it means we advanced past that period already (and the
3264    * stream was stopped and discarded) */
3265   stream = find_stream_for_track_locked (demux, track);
3266   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3267     gst_adaptive_demux_track_flush (track);
3268
3269   gst_adaptive_demux_track_unref (track);
3270   /* We steal the reference of pending_track */
3271   track = slot->track = slot->pending_track;
3272   slot->pending_track = NULL;
3273   slot->track->active = TRUE;
3274
3275   /* Make sure the track segment will start at the current position */
3276   track->update_next_segment = TRUE;
3277
3278   /* Send stream start and collection, and schedule sticky events */
3279   gst_adaptive_demux_send_initial_events (demux, slot);
3280
3281   /* Can we emit the streams-selected message now ? */
3282   msg =
3283       all_selected_tracks_are_active (demux,
3284       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3285   if (msg) {
3286     TRACKS_UNLOCK (demux);
3287     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3288     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3289     TRACKS_LOCK (demux);
3290   }
3291
3292 }
3293
3294 static void
3295 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3296 {
3297   GList *tmp;
3298   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3299   gboolean wait_for_data = FALSE;
3300   gboolean all_tracks_empty;
3301   GstFlowReturn ret;
3302
3303   GST_DEBUG_OBJECT (demux, "enter");
3304
3305   TRACKS_LOCK (demux);
3306
3307   /* Check if stopping */
3308   if (demux->priv->flushing) {
3309     ret = GST_FLOW_FLUSHING;
3310     goto pause;
3311   }
3312
3313   /* If the selection changed, handle it */
3314   check_and_handle_selection_update_locked (demux);
3315
3316 restart:
3317   ret = GST_FLOW_OK;
3318   global_output_position = GST_CLOCK_STIME_NONE;
3319   all_tracks_empty = TRUE;
3320
3321   if (wait_for_data) {
3322     GST_DEBUG_OBJECT (demux, "Waiting for data");
3323     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3324     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3325     if (demux->priv->flushing) {
3326       ret = GST_FLOW_FLUSHING;
3327       goto pause;
3328     }
3329     wait_for_data = FALSE;
3330   }
3331
3332   /* Grab/Recalculate current global output position
3333    * This is the minimum pending output position of all tracks used for output
3334    *
3335    * If there is a track which is empty and not EOS, wait for it to receive data
3336    * then recalculate global output position.
3337    *
3338    * This also pushes downstream all non-timed data that might be present.
3339    *
3340    * IF all tracks are EOS : stop task
3341    */
3342   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3343   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3344     OutputSlot *slot = (OutputSlot *) tmp->data;
3345     GstAdaptiveDemuxTrack *track;
3346
3347     /* If there is a pending track, Check if it's time to switch to it */
3348     if (slot->pending_track)
3349       handle_slot_pending_track_switch_locked (demux, slot);
3350
3351     track = slot->track;
3352
3353     if (!track->active) {
3354       /* Note: Edward: I can't see in what cases we would end up with inactive
3355          tracks assigned to slots. */
3356       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3357       g_assert (track->active);
3358       continue;
3359     }
3360
3361     if (track->next_position == GST_CLOCK_STIME_NONE) {
3362       gst_adaptive_demux_track_update_next_position (track);
3363     }
3364
3365     GST_TRACE_OBJECT (demux,
3366         "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3367         track->stream_id, track->period_num,
3368         GST_STIME_ARGS (track->next_position));
3369
3370     if (track->next_position != GST_CLOCK_STIME_NONE) {
3371       if (global_output_position == GST_CLOCK_STIME_NONE)
3372         global_output_position = track->next_position;
3373       else
3374         global_output_position =
3375             MIN (global_output_position, track->next_position);
3376       track->waiting_add = FALSE;
3377       all_tracks_empty = FALSE;
3378     } else if (!track->eos) {
3379       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3380           track->stream_id, track->period_num);
3381       all_tracks_empty = FALSE;
3382       wait_for_data = track->waiting_add = TRUE;
3383     } else {
3384       GST_DEBUG_OBJECT (demux,
3385           "Track %s (period %u) is EOS, not waiting for timed data",
3386           track->stream_id, track->period_num);
3387
3388       if (gst_queue_array_get_length (track->queue) > 0) {
3389         all_tracks_empty = FALSE;
3390       }
3391     }
3392   }
3393
3394   if (wait_for_data)
3395     goto restart;
3396
3397   if (all_tracks_empty && demux->output_period->has_next_period) {
3398     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3399         demux->output_period->period_num);
3400     if (!gst_adaptive_demux_advance_output_period (demux)) {
3401       /* Failed to move to next period, error out */
3402       ret = GST_FLOW_ERROR;
3403       goto pause;
3404     }
3405     /* Restart the loop */
3406     goto restart;
3407   }
3408
3409   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3410       GST_STIME_ARGS (global_output_position));
3411
3412   /* For each track:
3413    *
3414    * We know all active tracks have pending timed data
3415    * * while track next_position <= global output position
3416    *   * push pending data
3417    *   * Update track next_position
3418    *     * recalculate global output position
3419    *   * Pop next pending data from track and update pending position
3420    *
3421    */
3422   gboolean need_restart = FALSE;
3423
3424   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3425     OutputSlot *slot = (OutputSlot *) tmp->data;
3426     GstAdaptiveDemuxTrack *track = slot->track;
3427
3428     GST_LOG_OBJECT (track->element,
3429         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3430         " global_output_position:%" GST_STIME_FORMAT, track->active,
3431         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3432         GST_STIME_ARGS (global_output_position));
3433
3434     if (!track->active)
3435       continue;
3436
3437     while (global_output_position == GST_CLOCK_STIME_NONE
3438         || !slot->pushed_timed_data
3439         || ((track->next_position != GST_CLOCK_STIME_NONE)
3440             && track->next_position <= global_output_position)
3441         || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3442       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3443
3444       if (!mo) {
3445         GST_DEBUG_OBJECT (demux,
3446             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3447             track->stream_id, track->period_num, track->eos,
3448             slot->pushed_timed_data);
3449         /* This should only happen if the track is EOS, or exactly in between
3450          * the parser outputting segment/caps before buffers. */
3451         g_assert (track->eos || !slot->pushed_timed_data);
3452
3453         /* If we drained the track, but there's a pending track on the slot
3454          * loop again to activate it */
3455         if (slot->pending_track) {
3456           GST_DEBUG_OBJECT (demux,
3457               "Track '%s' (period %u) drained, but has a pending track to activate",
3458               track->stream_id, track->period_num);
3459           goto restart;
3460         }
3461         break;
3462       }
3463
3464       demux_update_buffering_locked (demux);
3465       demux_post_buffering_locked (demux);
3466       TRACKS_UNLOCK (demux);
3467
3468       GST_DEBUG_OBJECT (demux,
3469           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3470           track->period_num, mo);
3471
3472       if (GST_IS_EVENT (mo)) {
3473         GstEvent *event = (GstEvent *) mo;
3474         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3475           slot->pushed_timed_data = TRUE;
3476         } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3477           /* If there is a pending next period, don't send the EOS */
3478           if (demux->output_period->has_next_period) {
3479             GST_LOG_OBJECT (demux,
3480                 "Dropping EOS on track '%s' (period %u) before next period",
3481                 track->stream_id, track->period_num);
3482             gst_event_store_mark_delivered (&track->sticky_events, event);
3483             gst_event_unref (event);
3484             event = NULL;
3485             /* We'll need to re-check if all tracks are empty again above */
3486             need_restart = TRUE;
3487           }
3488         }
3489
3490         if (event != NULL) {
3491           gst_pad_push_event (slot->pad, gst_event_ref (event));
3492
3493           if (GST_EVENT_IS_STICKY (event))
3494             gst_event_store_mark_delivered (&track->sticky_events, event);
3495           gst_event_unref (event);
3496         }
3497       } else if (GST_IS_BUFFER (mo)) {
3498         GstBuffer *buffer = (GstBuffer *) mo;
3499
3500         if (track->output_discont) {
3501           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3502             buffer = gst_buffer_make_writable (buffer);
3503             GST_DEBUG_OBJECT (slot->pad,
3504                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3505                 buffer);
3506             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3507           }
3508           track->output_discont = FALSE;
3509         }
3510         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3511         ret =
3512             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3513             slot->pad, slot->flow_ret);
3514         GST_DEBUG_OBJECT (slot->pad,
3515             "track %s (period %u) push returned %s (combined %s)",
3516             track->stream_id, track->period_num,
3517             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3518         slot->pushed_timed_data = TRUE;
3519       } else {
3520         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3521       }
3522
3523       TRACKS_LOCK (demux);
3524       gst_adaptive_demux_track_update_next_position (track);
3525
3526       if (ret != GST_FLOW_OK)
3527         goto pause;
3528     }
3529   }
3530
3531   /* Store global output position */
3532   if (global_output_position != GST_CLOCK_STIME_NONE) {
3533     demux->priv->global_output_position = global_output_position;
3534
3535     /* And see if any streams need to be woken for more input */
3536     gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3537         global_output_position);
3538   }
3539
3540   if (need_restart)
3541     goto restart;
3542
3543   if (global_output_position == GST_CLOCK_STIME_NONE) {
3544     if (!demux->priv->flushing) {
3545       GST_DEBUG_OBJECT (demux,
3546           "Pausing output task after reaching NONE global_output_position");
3547       gst_task_pause (demux->priv->output_task);
3548     }
3549   }
3550
3551   TRACKS_UNLOCK (demux);
3552   GST_DEBUG_OBJECT (demux, "leave");
3553   return;
3554
3555 pause:
3556   {
3557     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3558     /* If the flushing flag is set, then the task is being
3559      * externally stopped, so don't go to pause(), otherwise we
3560      * should so we don't keep spinning */
3561     if (!demux->priv->flushing) {
3562       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3563           gst_flow_get_name (ret));
3564       gst_task_pause (demux->priv->output_task);
3565     }
3566
3567     TRACKS_UNLOCK (demux);
3568
3569     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3570       GstEvent *eos = gst_event_new_eos ();
3571
3572       if (ret != GST_FLOW_EOS) {
3573         GST_ELEMENT_FLOW_ERROR (demux, ret);
3574       }
3575
3576       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3577       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3578         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3579       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3580
3581       gst_adaptive_demux_push_src_event (demux, eos);
3582     }
3583
3584     return;
3585   }
3586 }
3587
3588 /* must be called from the scheduler */
3589 gboolean
3590 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3591 {
3592   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3593
3594   if (klass->is_live)
3595     return klass->is_live (demux);
3596   return FALSE;
3597 }
3598
3599 static void
3600 handle_manifest_download_complete (DownloadRequest * request,
3601     DownloadRequestState state, GstAdaptiveDemux * demux)
3602 {
3603   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3604   GstBuffer *buffer;
3605   GstFlowReturn result;
3606
3607   g_free (demux->manifest_base_uri);
3608   g_free (demux->manifest_uri);
3609
3610   if (request->redirect_permanent && request->redirect_uri) {
3611     demux->manifest_uri = g_strdup (request->redirect_uri);
3612     demux->manifest_base_uri = NULL;
3613   } else {
3614     demux->manifest_uri = g_strdup (request->uri);
3615     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3616   }
3617
3618   buffer = download_request_take_buffer (request);
3619
3620   /* We should always have a buffer since this function is the non-error
3621    * callback for the download */
3622   g_assert (buffer);
3623
3624   result = klass->update_manifest_data (demux, buffer);
3625   gst_buffer_unref (buffer);
3626
3627   /* FIXME: Should the manifest uri vars be reverted to original
3628    * values if updating fails? */
3629
3630   if (result == GST_FLOW_OK) {
3631     GstClockTime duration;
3632     /* Send an updated duration message */
3633     duration = klass->get_duration (demux);
3634     if (duration != GST_CLOCK_TIME_NONE) {
3635       GST_DEBUG_OBJECT (demux,
3636           "Sending duration message : %" GST_TIME_FORMAT,
3637           GST_TIME_ARGS (duration));
3638       gst_element_post_message (GST_ELEMENT (demux),
3639           gst_message_new_duration_changed (GST_OBJECT (demux)));
3640     } else {
3641       GST_DEBUG_OBJECT (demux,
3642           "Duration unknown, can not send the duration message");
3643     }
3644
3645     /* If a manifest changes it's liveness or periodic updateness, we need
3646      * to start/stop the manifest update task appropriately */
3647     /* Keep this condition in sync with the one in
3648      * gst_adaptive_demux_start_manifest_update_task()
3649      */
3650     if (gst_adaptive_demux_is_live (demux) &&
3651         klass->requires_periodical_playlist_update (demux)) {
3652       gst_adaptive_demux_start_manifest_update_task (demux);
3653     } else {
3654       gst_adaptive_demux_stop_manifest_update_task (demux);
3655     }
3656   }
3657 }
3658
3659 static void
3660 handle_manifest_download_failure (DownloadRequest * request,
3661     DownloadRequestState state, GstAdaptiveDemux * demux)
3662 {
3663   GST_FIXME_OBJECT (demux, "Manifest download failed.");
3664   /* Retry or error out here */
3665 }
3666
3667 static GstFlowReturn
3668 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3669 {
3670   DownloadRequest *request;
3671   GstFlowReturn ret = GST_FLOW_OK;
3672   GError *error = NULL;
3673
3674   request = download_request_new_uri (demux->manifest_uri);
3675
3676   download_request_set_callbacks (request,
3677       (DownloadRequestEventCallback) handle_manifest_download_complete,
3678       (DownloadRequestEventCallback) handle_manifest_download_failure,
3679       NULL, NULL, demux);
3680
3681   if (!downloadhelper_submit_request (demux->download_helper, NULL,
3682           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3683           &error)) {
3684     if (error) {
3685       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3686           ("Failed to download manifest: %s", error->message), (NULL));
3687       g_clear_error (&error);
3688     }
3689     ret = GST_FLOW_NOT_LINKED;
3690   }
3691
3692   return ret;
3693 }
3694
3695 /* must be called with manifest_lock taken */
3696 GstFlowReturn
3697 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3698 {
3699   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3700   GstFlowReturn ret;
3701
3702   ret = klass->update_manifest (demux);
3703
3704   return ret;
3705 }
3706
3707 /* must be called with manifest_lock taken */
3708 gboolean
3709 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3710 {
3711   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3712   gboolean ret = FALSE;
3713
3714   if (klass->has_next_period)
3715     ret = klass->has_next_period (demux);
3716   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3717   return ret;
3718 }
3719
3720 /* must be called with manifest_lock taken */
3721 void
3722 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3723 {
3724   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3725   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3726
3727   g_return_if_fail (klass->advance_period != NULL);
3728
3729   GST_DEBUG_OBJECT (demux, "Advancing to next period");
3730   /* FIXME : no return value ? What if it fails ? */
3731   klass->advance_period (demux);
3732
3733   if (previous_period == demux->input_period) {
3734     GST_ERROR_OBJECT (demux, "Advancing period failed");
3735     return;
3736   }
3737
3738   /* Stop the previous period stream tasks */
3739   gst_adaptive_demux_period_stop_tasks (previous_period);
3740
3741   gst_adaptive_demux_update_collection (demux, demux->input_period);
3742   /* Figure out a pre-emptive selection based on the output period selection */
3743   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3744       demux->output_period);
3745
3746   gst_adaptive_demux_prepare_streams (demux, FALSE);
3747   gst_adaptive_demux_start_tasks (demux);
3748 }
3749
3750 /**
3751  * gst_adaptive_demux_get_monotonic_time:
3752  * Returns: a monotonically increasing time, using the system realtime clock
3753  */
3754 GstClockTime
3755 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3756 {
3757   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3758   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3759 }
3760
3761 /**
3762  * gst_adaptive_demux_get_client_now_utc:
3763  * @demux: #GstAdaptiveDemux
3764  * Returns: the client's estimate of UTC
3765  *
3766  * Used to find the client's estimate of UTC, using the system realtime clock.
3767  */
3768 GDateTime *
3769 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3770 {
3771   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3772 }
3773
3774 /**
3775  * gst_adaptive_demux_is_running
3776  * @demux: #GstAdaptiveDemux
3777  * Returns: whether the demuxer is processing data
3778  *
3779  * Returns FALSE if shutdown has started (transitioning down from
3780  * PAUSED), otherwise TRUE.
3781  */
3782 gboolean
3783 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3784 {
3785   return g_atomic_int_get (&demux->running);
3786 }
3787
3788 /**
3789  * gst_adaptive_demux_get_qos_earliest_time:
3790  *
3791  * Returns: The QOS earliest time
3792  *
3793  * Since: 1.20
3794  */
3795 GstClockTime
3796 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3797 {
3798   GstClockTime earliest;
3799
3800   GST_OBJECT_LOCK (demux);
3801   earliest = demux->priv->qos_earliest_time;
3802   GST_OBJECT_UNLOCK (demux);
3803
3804   return earliest;
3805 }
3806
3807 gboolean
3808 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3809     GstAdaptiveDemux2Stream * stream)
3810 {
3811   g_return_val_if_fail (demux && stream, FALSE);
3812
3813   /* FIXME : Migrate to parent */
3814   g_return_val_if_fail (stream->demux == NULL, FALSE);
3815
3816   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3817
3818   TRACKS_LOCK (demux);
3819   if (demux->input_period->prepared) {
3820     GST_ERROR_OBJECT (demux,
3821         "Attempted to add streams but no new period was created");
3822     TRACKS_UNLOCK (demux);
3823     return FALSE;
3824   }
3825   stream->demux = demux;
3826   stream->period = demux->input_period;
3827   demux->input_period->streams =
3828       g_list_append (demux->input_period->streams, stream);
3829
3830   if (stream->tracks) {
3831     GList *iter;
3832     for (iter = stream->tracks; iter; iter = iter->next)
3833       if (!gst_adaptive_demux_period_add_track (demux->input_period,
3834               (GstAdaptiveDemuxTrack *) iter->data)) {
3835         GST_ERROR_OBJECT (demux, "Failed to add track elements");
3836         TRACKS_UNLOCK (demux);
3837         return FALSE;
3838       }
3839   }
3840   TRACKS_UNLOCK (demux);
3841   return TRUE;
3842 }
3843
3844 /* Return the current playback rate including any instant rate multiplier */
3845 gdouble
3846 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3847 {
3848   gdouble rate;
3849   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3850   rate = demux->segment.rate * demux->instant_rate_multiplier;
3851   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3852   return rate;
3853 }