adaptivedemux2: Fix collection leaks
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0  /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
207     GstEvent * event);
208
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
210
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
212     GstEvent * event);
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214     GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
216     GstQuery * query);
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
218     GstEvent * event);
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
220     GstEvent * event);
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222     * demux, GstEvent * event);
223
224 static gboolean
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
226
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230     gboolean first_and_live);
231
232 static GstFlowReturn
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
234
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
236     demux);
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
238     demux);
239
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242     gboolean stop_updates);
243
244 static gboolean
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
246     * demux);
247
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249  * method to get to the padtemplates */
250 GType
251 gst_adaptive_demux_ng_get_type (void)
252 {
253   static gsize type = 0;
254
255   if (g_once_init_enter (&type)) {
256     GType _type;
257     static const GTypeInfo info = {
258       sizeof (GstAdaptiveDemuxClass),
259       NULL,
260       NULL,
261       (GClassInitFunc) gst_adaptive_demux_class_init,
262       NULL,
263       NULL,
264       sizeof (GstAdaptiveDemux),
265       0,
266       (GInstanceInitFunc) gst_adaptive_demux_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_BIN,
270         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
271
272     private_offset =
273         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
274
275     g_once_init_leave (&type, _type);
276   }
277   return type;
278 }
279
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
282 {
283   return (G_STRUCT_MEMBER_P (self, private_offset));
284 }
285
286 static void
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec)
289 {
290   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
291
292   GST_OBJECT_LOCK (demux);
293
294   switch (prop_id) {
295     case PROP_CONNECTION_SPEED:
296       demux->connection_speed = g_value_get_uint (value) * 1000;
297       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298           demux->connection_speed);
299       break;
300     case PROP_BANDWIDTH_TARGET_RATIO:
301       demux->bandwidth_target_ratio = g_value_get_float (value);
302       break;
303     case PROP_MIN_BITRATE:
304       demux->min_bitrate = g_value_get_uint (value);
305       break;
306     case PROP_MAX_BITRATE:
307       demux->max_bitrate = g_value_get_uint (value);
308       break;
309     case PROP_CONNECTION_BITRATE:
310       demux->connection_speed = g_value_get_uint (value);
311       break;
312       /* FIXME: Recalculate track and buffering levels
313        * when watermarks change? */
314     case PROP_MAX_BUFFERING_TIME:
315       demux->max_buffering_time = g_value_get_uint64 (value);
316       break;
317     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
319       break;
320     case PROP_BUFFERING_LOW_WATERMARK_TIME:
321       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
322       break;
323     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324       demux->buffering_high_watermark_fragments = g_value_get_double (value);
325       break;
326     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327       demux->buffering_low_watermark_fragments = g_value_get_double (value);
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331       break;
332   }
333
334   GST_OBJECT_UNLOCK (demux);
335 }
336
337 static void
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339     GValue * value, GParamSpec * pspec)
340 {
341   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
342
343   GST_OBJECT_LOCK (demux);
344
345   switch (prop_id) {
346     case PROP_CONNECTION_SPEED:
347       g_value_set_uint (value, demux->connection_speed / 1000);
348       break;
349     case PROP_BANDWIDTH_TARGET_RATIO:
350       g_value_set_float (value, demux->bandwidth_target_ratio);
351       break;
352     case PROP_MIN_BITRATE:
353       g_value_set_uint (value, demux->min_bitrate);
354       break;
355     case PROP_MAX_BITRATE:
356       g_value_set_uint (value, demux->max_bitrate);
357       break;
358     case PROP_CONNECTION_BITRATE:
359       g_value_set_uint (value, demux->connection_speed);
360       break;
361     case PROP_CURRENT_BANDWIDTH:
362       g_value_set_uint (value, demux->current_download_rate);
363       break;
364     case PROP_MAX_BUFFERING_TIME:
365       g_value_set_uint64 (value, demux->max_buffering_time);
366       break;
367     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
369       break;
370     case PROP_BUFFERING_LOW_WATERMARK_TIME:
371       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
372       break;
373     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374       g_value_set_double (value, demux->buffering_high_watermark_fragments);
375       break;
376     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377       g_value_set_double (value, demux->buffering_low_watermark_fragments);
378       break;
379     case PROP_CURRENT_LEVEL_TIME_VIDEO:
380       g_value_set_uint64 (value, demux->current_level_time_video);
381       break;
382     case PROP_CURRENT_LEVEL_TIME_AUDIO:
383       g_value_set_uint64 (value, demux->current_level_time_audio);
384       break;
385     default:
386       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
387       break;
388   }
389
390   GST_OBJECT_UNLOCK (demux);
391 }
392
393 static void
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
395 {
396   GObjectClass *gobject_class;
397   GstElementClass *gstelement_class;
398   GstBinClass *gstbin_class;
399
400   gobject_class = G_OBJECT_CLASS (klass);
401   gstelement_class = GST_ELEMENT_CLASS (klass);
402   gstbin_class = GST_BIN_CLASS (klass);
403
404   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405       "Base Adaptive Demux (ng)");
406
407   parent_class = g_type_class_peek_parent (klass);
408
409   if (private_offset != 0)
410     g_type_class_adjust_private_offset (klass, &private_offset);
411
412   gobject_class->set_property = gst_adaptive_demux_set_property;
413   gobject_class->get_property = gst_adaptive_demux_get_property;
414   gobject_class->finalize = gst_adaptive_demux_finalize;
415
416   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417       g_param_spec_uint ("connection-speed", "Connection Speed",
418           "Network connection speed to use in kbps (0 = calculate from downloaded"
419           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421
422   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423       g_param_spec_float ("bandwidth-target-ratio",
424           "Ratio of target bandwidth / available bandwidth",
425           "Limit of the available bitrate to use when switching to alternates",
426           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428
429   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431           "Network connection speed to use (0 = automatic) (bits/s)",
432           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434
435   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437           "Minimum bitrate to use when switching to alternates (bits/s)",
438           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443           "Maximum bitrate to use when switching to alternates (bits/s)",
444           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448       g_param_spec_uint ("current-bandwidth",
449           "Current download bandwidth (bits/s)",
450           "Report of current download bandwidth (based on arriving data) (bits/s)",
451           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454       g_param_spec_uint64 ("max-buffering-time",
455           "Buffering maximum size (ns)",
456           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459           G_PARAM_STATIC_STRINGS));
460
461   g_object_class_install_property (gobject_class,
462       PROP_BUFFERING_HIGH_WATERMARK_TIME,
463       g_param_spec_uint64 ("high-watermark-time",
464           "High buffering watermark size (ns)",
465           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_LOW_WATERMARK_TIME,
472       g_param_spec_uint64 ("low-watermark-time",
473           "Low buffering watermark size (ns)",
474           "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481       g_param_spec_double ("high-watermark-fragments",
482           "High buffering watermark size (fragments)",
483           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("low-watermark-fragments",
491           "Low buffering watermark size (fragments)",
492           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498       g_param_spec_uint64 ("current-level-time-video",
499           "Currently buffered level of video (ns)",
500           "Currently buffered level of video track(s) (ns)",
501           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503           G_PARAM_STATIC_STRINGS));
504
505   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506       g_param_spec_uint64 ("current-level-time-audio",
507           "Currently buffered level of audio (ns)",
508           "Currently buffered level of audio track(s) (ns)",
509           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511           G_PARAM_STATIC_STRINGS));
512
513   gst_element_class_add_static_pad_template (gstelement_class,
514       &gst_adaptive_demux_audiosrc_template);
515   gst_element_class_add_static_pad_template (gstelement_class,
516       &gst_adaptive_demux_videosrc_template);
517   gst_element_class_add_static_pad_template (gstelement_class,
518       &gst_adaptive_demux_subtitlesrc_template);
519
520   gstelement_class->change_state = gst_adaptive_demux_change_state;
521   gstelement_class->query = gst_adaptive_demux_query;
522   gstelement_class->send_event = gst_adaptive_demux_send_event;
523
524   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
525
526   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527   klass->requires_periodical_playlist_update =
528       gst_adaptive_demux_requires_periodical_playlist_update_default;
529   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
530 }
531
532 static void
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534     GstAdaptiveDemuxClass * klass)
535 {
536   GstPadTemplate *pad_template;
537
538   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
539
540   demux->priv = gst_adaptive_demux_get_instance_private (demux);
541   demux->priv->input_adapter = gst_adapter_new ();
542   demux->realtime_clock = gst_adaptive_demux_clock_new ();
543
544   demux->download_helper = downloadhelper_new (demux->realtime_clock);
545   demux->priv->segment_seqnum = gst_util_seqnum_next ();
546   demux->have_group_id = FALSE;
547   demux->group_id = G_MAXUINT;
548
549   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550   demux->instant_rate_multiplier = 1.0;
551
552   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
553       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
554
555   g_rec_mutex_init (&demux->priv->manifest_lock);
556
557   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
558
559   g_mutex_init (&demux->priv->api_lock);
560   g_mutex_init (&demux->priv->segment_lock);
561
562   g_mutex_init (&demux->priv->tracks_lock);
563   g_cond_init (&demux->priv->tracks_add);
564
565   g_mutex_init (&demux->priv->buffering_lock);
566
567   demux->priv->periods = g_queue_new ();
568
569   pad_template =
570       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
571   g_return_if_fail (pad_template != NULL);
572
573   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
574   gst_pad_set_event_function (demux->sinkpad,
575       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
576   gst_pad_set_chain_function (demux->sinkpad,
577       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
578
579   /* Properties */
580   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
581   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
582   demux->min_bitrate = DEFAULT_MIN_BITRATE;
583   demux->max_bitrate = DEFAULT_MAX_BITRATE;
584
585   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
586   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
587   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
588   demux->buffering_high_watermark_fragments =
589       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
590   demux->buffering_low_watermark_fragments =
591       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
592
593   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
594   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
595
596   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
597
598   demux->priv->duration = GST_CLOCK_TIME_NONE;
599
600   /* Output combiner */
601   demux->priv->flowcombiner = gst_flow_combiner_new ();
602
603   /* Output task */
604   g_rec_mutex_init (&demux->priv->output_lock);
605   demux->priv->output_task =
606       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
607       NULL);
608   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
609 }
610
611 static void
612 gst_adaptive_demux_finalize (GObject * object)
613 {
614   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
615   GstAdaptiveDemuxPrivate *priv = demux->priv;
616
617   GST_DEBUG_OBJECT (object, "finalize");
618
619   g_object_unref (priv->input_adapter);
620
621   downloadhelper_free (demux->download_helper);
622
623   g_rec_mutex_clear (&demux->priv->manifest_lock);
624   g_mutex_clear (&demux->priv->api_lock);
625   g_mutex_clear (&demux->priv->segment_lock);
626
627   g_mutex_clear (&demux->priv->buffering_lock);
628
629   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
630
631   /* The input period is present after a reset, clear it now */
632   if (demux->input_period)
633     gst_adaptive_demux_period_unref (demux->input_period);
634
635   if (demux->realtime_clock) {
636     gst_adaptive_demux_clock_unref (demux->realtime_clock);
637     demux->realtime_clock = NULL;
638   }
639   g_object_unref (priv->output_task);
640   g_rec_mutex_clear (&priv->output_lock);
641
642   gst_flow_combiner_free (priv->flowcombiner);
643
644   g_queue_free (priv->periods);
645
646   G_OBJECT_CLASS (parent_class)->finalize (object);
647 }
648
649 static gboolean
650 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
651 {
652   gboolean ret = FALSE;
653   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
654
655   ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
656
657   return ret;
658 }
659
660 static GstStateChangeReturn
661 gst_adaptive_demux_change_state (GstElement * element,
662     GstStateChange transition)
663 {
664   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
665   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
666
667   switch (transition) {
668     case GST_STATE_CHANGE_NULL_TO_READY:
669       if (!gst_adaptive_demux_check_streams_aware (demux)) {
670         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
671             (_("Element requires a streams-aware context.")), (NULL));
672         goto fail_out;
673       }
674       break;
675     case GST_STATE_CHANGE_PAUSED_TO_READY:
676       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
677         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
678
679       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
680       downloadhelper_stop (demux->download_helper);
681
682       TRACKS_LOCK (demux);
683       demux->priv->flushing = TRUE;
684       g_cond_signal (&demux->priv->tracks_add);
685       gst_task_stop (demux->priv->output_task);
686       TRACKS_UNLOCK (demux);
687
688       gst_task_join (demux->priv->output_task);
689
690       GST_API_LOCK (demux);
691       gst_adaptive_demux_reset (demux);
692       GST_API_UNLOCK (demux);
693       break;
694     case GST_STATE_CHANGE_READY_TO_PAUSED:
695       GST_API_LOCK (demux);
696       gst_adaptive_demux_reset (demux);
697
698       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
699       if (g_atomic_int_get (&demux->priv->have_manifest))
700         gst_adaptive_demux_start_manifest_update_task (demux);
701       GST_API_UNLOCK (demux);
702       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
703         GST_DEBUG_OBJECT (demux, "demuxer has started running");
704       /* gst_task_start (demux->priv->output_task); */
705       break;
706     default:
707       break;
708   }
709
710   /* this must be run with the scheduler and output tasks stopped. */
711   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
712
713   switch (transition) {
714     case GST_STATE_CHANGE_READY_TO_PAUSED:
715       /* Start download task */
716       downloadhelper_start (demux->download_helper);
717       break;
718     default:
719       break;
720   }
721
722 fail_out:
723   return result;
724 }
725
726 static void
727 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
728     OutputSlot * slot)
729 {
730   GstEvent *eos = gst_event_new_eos ();
731   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
732
733   /* FIXME: The slot might not have output any data, caps or segment yet */
734   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
735   gst_pad_push_event (slot->pad, eos);
736   gst_pad_set_active (slot->pad, FALSE);
737   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
738   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
739   if (slot->track)
740     gst_adaptive_demux_track_unref (slot->track);
741   if (slot->pending_track)
742     gst_adaptive_demux_track_unref (slot->pending_track);
743
744   g_slice_free (OutputSlot, slot);
745 }
746
747 static OutputSlot *
748 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
749     GstStreamType streamtype)
750 {
751   OutputSlot *slot;
752   GstPadTemplate *tmpl;
753   gchar *name;
754
755   switch (streamtype) {
756     case GST_STREAM_TYPE_AUDIO:
757       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
758       tmpl =
759           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
760       break;
761     case GST_STREAM_TYPE_VIDEO:
762       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
763       tmpl =
764           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
765       break;
766     case GST_STREAM_TYPE_TEXT:
767       name =
768           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
769       tmpl =
770           gst_static_pad_template_get
771           (&gst_adaptive_demux_subtitlesrc_template);
772       break;
773     default:
774       g_assert_not_reached ();
775       return NULL;
776   }
777
778   slot = g_slice_new0 (OutputSlot);
779   slot->type = streamtype;
780   slot->pushed_timed_data = FALSE;
781
782   /* Create and activate new pads */
783   slot->pad = gst_pad_new_from_template (tmpl, name);
784   g_free (name);
785   gst_object_unref (tmpl);
786
787   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
788   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
789   gst_pad_set_active (slot->pad, TRUE);
790
791   gst_pad_set_query_function (slot->pad,
792       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
793   gst_pad_set_event_function (slot->pad,
794       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
795
796   gst_pad_set_element_private (slot->pad, slot);
797
798   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
799       GST_DEBUG_PAD_NAME (slot->pad));
800   return slot;
801 }
802
803 /* Called:
804  * * After `process_manifest` or when a period starts
805  * * Or when all tracks have been created
806  *
807  * Goes over tracks and creates the collection
808  *
809  * Returns TRUE if the collection was fully created.
810  *
811  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
812  * */
813 static gboolean
814 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
815     GstAdaptiveDemuxPeriod * period)
816 {
817   GstStreamCollection *collection;
818   GList *iter;
819
820   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
821
822   if (!period->tracks_changed) {
823     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
824     return TRUE;
825   }
826
827   if (!period->tracks) {
828     GST_WARNING_OBJECT (demux, "No tracks registered/present");
829     return FALSE;
830   }
831
832   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
833     GST_DEBUG_OBJECT (demux,
834         "Streams still have pending tracks, not creating/updating collection");
835     return FALSE;
836   }
837
838   /* Update collection */
839   collection = gst_stream_collection_new ("adaptivedemux");
840
841   for (iter = period->tracks; iter; iter = iter->next) {
842     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
843
844     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
845     gst_stream_collection_add_stream (collection,
846         gst_object_ref (track->stream_object));
847   }
848
849   if (period->collection)
850     gst_object_unref (period->collection);
851   period->collection = collection;
852
853   return TRUE;
854 }
855
856 /*
857  * Called for the output period:
858  * * after `update_collection()` if the input period is the same as the output period
859  * * When the output period changes
860  *
861  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
862  */
863 static gboolean
864 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
865 {
866   GstStreamCollection *collection;
867   GstAdaptiveDemuxPeriod *period = demux->output_period;
868   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
869
870   g_return_val_if_fail (period, FALSE);
871   if (!period->collection) {
872     GST_DEBUG_OBJECT (demux, "No collection available yet");
873     return TRUE;
874   }
875
876   collection = period->collection;
877
878   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
879       period->period_num);
880
881   /* Post collection */
882   TRACKS_UNLOCK (demux);
883   GST_MANIFEST_UNLOCK (demux);
884
885   gst_element_post_message (GST_ELEMENT_CAST (demux),
886       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
887
888   GST_MANIFEST_LOCK (demux);
889   TRACKS_LOCK (demux);
890
891   /* If no stream selection was handled, make a default selection */
892   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
893     gst_adaptive_demux_period_select_default_tracks (demux,
894         demux->output_period);
895   }
896
897   /* Make sure the output task is running */
898   if (gst_adaptive_demux2_is_running (demux)) {
899     demux->priv->flushing = FALSE;
900     GST_DEBUG_OBJECT (demux, "Starting the output task");
901     gst_task_start (demux->priv->output_task);
902   }
903
904   return TRUE;
905 }
906
907 static gboolean
908 handle_incoming_manifest (GstAdaptiveDemux * demux)
909 {
910   GstAdaptiveDemuxClass *demux_class;
911   GstQuery *query;
912   gboolean query_res;
913   gboolean ret = TRUE;
914   gsize available;
915   GstBuffer *manifest_buffer;
916
917   GST_API_LOCK (demux);
918   GST_MANIFEST_LOCK (demux);
919
920   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
921
922   available = gst_adapter_available (demux->priv->input_adapter);
923
924   if (available == 0)
925     goto eos_without_data;
926
927   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
928
929   /* Need to get the URI to use it as a base to generate the fragment's
930    * uris */
931   query = gst_query_new_uri ();
932   query_res = gst_pad_peer_query (demux->sinkpad, query);
933   if (query_res) {
934     gchar *uri, *redirect_uri;
935     gboolean permanent;
936
937     gst_query_parse_uri (query, &uri);
938     gst_query_parse_uri_redirection (query, &redirect_uri);
939     gst_query_parse_uri_redirection_permanent (query, &permanent);
940
941     if (permanent && redirect_uri) {
942       demux->manifest_uri = redirect_uri;
943       demux->manifest_base_uri = NULL;
944       g_free (uri);
945     } else {
946       demux->manifest_uri = uri;
947       demux->manifest_base_uri = redirect_uri;
948     }
949
950     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
951         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
952   } else {
953     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
954   }
955   gst_query_unref (query);
956
957   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
958   if (!demux->have_group_id) {
959     demux->have_group_id = TRUE;
960     demux->group_id = gst_util_group_id_next ();
961   }
962
963   /* Let the subclass parse the manifest */
964   manifest_buffer =
965       gst_adapter_take_buffer (demux->priv->input_adapter, available);
966   ret = demux_class->process_manifest (demux, manifest_buffer);
967   gst_buffer_unref (manifest_buffer);
968
969   gst_element_post_message (GST_ELEMENT_CAST (demux),
970       gst_message_new_element (GST_OBJECT_CAST (demux),
971           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
972               "manifest-uri", G_TYPE_STRING,
973               demux->manifest_uri, "uri", G_TYPE_STRING,
974               demux->manifest_uri,
975               "manifest-download-start", GST_TYPE_CLOCK_TIME,
976               GST_CLOCK_TIME_NONE,
977               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
978               gst_util_get_timestamp (), NULL)));
979
980   if (!ret)
981     goto invalid_manifest;
982
983   /* Streams should have been added to the input period if the manifest parsing
984    * succeeded */
985   if (!demux->input_period->streams)
986     goto no_streams;
987
988   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
989
990   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
991   /* Send duration message */
992   if (!gst_adaptive_demux_is_live (demux)) {
993     GstClockTime duration = demux_class->get_duration (demux);
994
995     demux->priv->duration = duration;
996     if (duration != GST_CLOCK_TIME_NONE) {
997       GST_DEBUG_OBJECT (demux,
998           "Sending duration message : %" GST_TIME_FORMAT,
999           GST_TIME_ARGS (duration));
1000       gst_element_post_message (GST_ELEMENT (demux),
1001           gst_message_new_duration_changed (GST_OBJECT (demux)));
1002     } else {
1003       GST_DEBUG_OBJECT (demux,
1004           "media duration unknown, can not send the duration message");
1005     }
1006   }
1007
1008   TRACKS_LOCK (demux);
1009   /* New streams/tracks will have been added to the input period */
1010   /* The input period has streams, make it the active output period */
1011   /* FIXME : Factorize this into a function to make a period active */
1012   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1013   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1014       gst_adaptive_demux_post_collection (demux);
1015   TRACKS_UNLOCK (demux);
1016
1017   gst_adaptive_demux_prepare_streams (demux,
1018       gst_adaptive_demux_is_live (demux));
1019   gst_adaptive_demux_start_tasks (demux);
1020   gst_adaptive_demux_start_manifest_update_task (demux);
1021
1022 unlock_out:
1023   GST_MANIFEST_UNLOCK (demux);
1024   GST_API_UNLOCK (demux);
1025
1026   return ret;
1027
1028   /* ERRORS */
1029 eos_without_data:
1030   {
1031     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1032     ret = FALSE;
1033     goto unlock_out;
1034   }
1035
1036 no_streams:
1037   {
1038     /* no streams */
1039     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1040     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1041         (_("This file contains no playable streams.")),
1042         ("No known stream formats found at the Manifest"));
1043     ret = FALSE;
1044     goto unlock_out;
1045   }
1046
1047 invalid_manifest:
1048   {
1049     GST_MANIFEST_UNLOCK (demux);
1050     GST_API_UNLOCK (demux);
1051
1052     /* In most cases, this will happen if we set a wrong url in the
1053      * source element and we have received the 404 HTML response instead of
1054      * the manifest */
1055     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1056     return FALSE;
1057   }
1058 }
1059
1060 struct http_headers_collector
1061 {
1062   GstAdaptiveDemux *demux;
1063   gchar **cookies;
1064 };
1065
1066 static gboolean
1067 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1068     const GValue * value, gpointer userdata)
1069 {
1070   struct http_headers_collector *hdr_data = userdata;
1071   GstAdaptiveDemux *demux = hdr_data->demux;
1072   const gchar *field_name = g_quark_to_string (field_id);
1073
1074   if (G_UNLIKELY (value == NULL))
1075     return TRUE;                /* This should not happen */
1076
1077   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1078     const gchar *user_agent = g_value_get_string (value);
1079
1080     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1081     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1082   }
1083
1084   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1085       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1086     guint i = 0, prev_len = 0, total_len = 0;
1087     gchar **cookies = NULL;
1088
1089     if (hdr_data->cookies != NULL)
1090       prev_len = g_strv_length (hdr_data->cookies);
1091
1092     if (GST_VALUE_HOLDS_ARRAY (value)) {
1093       total_len = gst_value_array_get_size (value) + prev_len;
1094       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1095
1096       for (i = 0; i < gst_value_array_get_size (value); i++) {
1097         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1098             g_value_get_string (gst_value_array_get_value (value, i)));
1099         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1100       }
1101     } else if (G_VALUE_HOLDS_STRING (value)) {
1102       total_len = 1 + prev_len;
1103       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1104
1105       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1106           g_value_get_string (value));
1107       cookies[0] = g_value_dup_string (value);
1108     } else {
1109       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1110           g_quark_to_string (field_id));
1111     }
1112
1113     if (cookies) {
1114       if (prev_len) {
1115         guint j;
1116         for (j = 0; j < prev_len; j++) {
1117           GST_DEBUG_OBJECT (demux,
1118               "Append existing cookie %s", hdr_data->cookies[j]);
1119           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1120         }
1121       }
1122       cookies[total_len] = NULL;
1123
1124       g_strfreev (hdr_data->cookies);
1125       hdr_data->cookies = cookies;
1126     }
1127   }
1128
1129   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1130     const gchar *referer = g_value_get_string (value);
1131     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1132
1133     downloadhelper_set_referer (demux->download_helper, referer);
1134   }
1135
1136   /* Date header can be used to estimate server offset */
1137   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1138     const gchar *http_date = g_value_get_string (value);
1139
1140     if (http_date) {
1141       GstDateTime *datetime =
1142           gst_adaptive_demux_util_parse_http_head_date (http_date);
1143
1144       if (datetime) {
1145         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1146         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1147
1148         GST_INFO_OBJECT (demux,
1149             "HTTP response Date %s", GST_STR_NULL (date_string));
1150         g_free (date_string);
1151
1152         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1153
1154         g_date_time_unref (utc_now);
1155         gst_date_time_unref (datetime);
1156       }
1157     }
1158   }
1159
1160   return TRUE;
1161 }
1162
1163 static gboolean
1164 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1165     GstEvent * event)
1166 {
1167   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1168   gboolean ret;
1169
1170   switch (event->type) {
1171     case GST_EVENT_FLUSH_STOP:{
1172       GST_API_LOCK (demux);
1173       GST_MANIFEST_LOCK (demux);
1174
1175       gst_adaptive_demux_reset (demux);
1176
1177       ret = gst_pad_event_default (pad, parent, event);
1178
1179       GST_MANIFEST_UNLOCK (demux);
1180       GST_API_UNLOCK (demux);
1181
1182       return ret;
1183     }
1184     case GST_EVENT_EOS:
1185     {
1186       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1187         if (!handle_incoming_manifest (demux)) {
1188           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1189           return gst_pad_event_default (pad, parent, event);
1190         }
1191         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1192       } else {
1193         GST_ERROR_OBJECT (demux,
1194             "Failed to acquire scheduler to handle manifest");
1195         return gst_pad_event_default (pad, parent, event);
1196       }
1197       gst_event_unref (event);
1198       return TRUE;
1199     }
1200     case GST_EVENT_STREAM_START:
1201       if (gst_event_parse_group_id (event, &demux->group_id))
1202         demux->have_group_id = TRUE;
1203       else
1204         demux->have_group_id = FALSE;
1205       /* Swallow stream-start, we'll push our own */
1206       gst_event_unref (event);
1207       return TRUE;
1208     case GST_EVENT_SEGMENT:
1209       /* Swallow newsegments, we'll push our own */
1210       gst_event_unref (event);
1211       return TRUE;
1212     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1213       const GstStructure *structure = gst_event_get_structure (event);
1214       struct http_headers_collector c = { demux, NULL };
1215
1216       if (gst_structure_has_name (structure, "http-headers")) {
1217         if (gst_structure_has_field (structure, "request-headers")) {
1218           GstStructure *req_headers = NULL;
1219           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1220               &req_headers, NULL);
1221           if (req_headers) {
1222             gst_structure_foreach (req_headers,
1223                 gst_adaptive_demux_handle_upstream_http_header, &c);
1224             gst_structure_free (req_headers);
1225           }
1226         }
1227         if (gst_structure_has_field (structure, "response-headers")) {
1228           GstStructure *res_headers = NULL;
1229           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1230               &res_headers, NULL);
1231           if (res_headers) {
1232             gst_structure_foreach (res_headers,
1233                 gst_adaptive_demux_handle_upstream_http_header, &c);
1234             gst_structure_free (res_headers);
1235           }
1236         }
1237
1238         if (c.cookies)
1239           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1240       }
1241       break;
1242     }
1243     default:
1244       break;
1245   }
1246
1247   return gst_pad_event_default (pad, parent, event);
1248 }
1249
1250 static GstFlowReturn
1251 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1252     GstBuffer * buffer)
1253 {
1254   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1255
1256   GST_MANIFEST_LOCK (demux);
1257
1258   gst_adapter_push (demux->priv->input_adapter, buffer);
1259
1260   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1261       (gint) gst_adapter_available (demux->priv->input_adapter));
1262
1263   GST_MANIFEST_UNLOCK (demux);
1264   return GST_FLOW_OK;
1265 }
1266
1267
1268 /* Called with TRACKS_LOCK taken */
1269 static void
1270 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1271 {
1272   GList *tmp;
1273
1274   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1275     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1276
1277     gst_adaptive_demux_track_flush (track);
1278     if (gst_pad_is_active (track->sinkpad)) {
1279       gst_pad_set_active (track->sinkpad, FALSE);
1280       gst_pad_set_active (track->sinkpad, TRUE);
1281     }
1282   }
1283 }
1284
1285 /* Resets all tracks to their initial state, ready to receive new data. */
1286 static void
1287 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1288 {
1289   TRACKS_LOCK (demux);
1290   g_queue_foreach (demux->priv->periods,
1291       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1292   TRACKS_UNLOCK (demux);
1293 }
1294
1295 /* Subclasses will call this function to ensure that a new input period is
1296  * available to receive new streams and tracks */
1297 gboolean
1298 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1299 {
1300   if (demux->input_period && !demux->input_period->prepared) {
1301     GST_DEBUG_OBJECT (demux, "Using existing input period");
1302     return TRUE;
1303   }
1304
1305   if (demux->input_period) {
1306     GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1307     demux->input_period->has_next_period = TRUE;
1308   }
1309   GST_DEBUG_OBJECT (demux, "Setting up new period");
1310
1311   demux->input_period = gst_adaptive_demux_period_new (demux);
1312
1313   return TRUE;
1314 }
1315
1316 /* must be called with manifest_lock taken */
1317 static void
1318 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1319 {
1320   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1321   GList *iter;
1322
1323   gst_adaptive_demux_stop_tasks (demux, TRUE);
1324
1325   if (klass->reset)
1326     klass->reset (demux);
1327
1328   /* Disable and remove all outputs */
1329   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1330   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1331     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1332   }
1333   g_list_free (demux->priv->outputs);
1334   demux->priv->outputs = NULL;
1335
1336   g_queue_clear_full (demux->priv->periods,
1337       (GDestroyNotify) gst_adaptive_demux_period_unref);
1338
1339   /* The output period always has an extra ref taken on it */
1340   if (demux->output_period)
1341     gst_adaptive_demux_period_unref (demux->output_period);
1342   demux->output_period = NULL;
1343   /* The input period doesn't have an extra ref taken on it */
1344   demux->input_period = NULL;
1345
1346   gst_adaptive_demux_start_new_period (demux);
1347
1348   g_free (demux->manifest_uri);
1349   g_free (demux->manifest_base_uri);
1350   demux->manifest_uri = NULL;
1351   demux->manifest_base_uri = NULL;
1352
1353   gst_adapter_clear (demux->priv->input_adapter);
1354   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1355
1356   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1357   demux->instant_rate_multiplier = 1.0;
1358
1359   demux->priv->duration = GST_CLOCK_TIME_NONE;
1360
1361   demux->priv->percent = -1;
1362   demux->priv->is_buffering = TRUE;
1363
1364   demux->have_group_id = FALSE;
1365   demux->group_id = G_MAXUINT;
1366   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1367
1368   demux->priv->global_output_position = 0;
1369
1370   demux->priv->n_audio_streams = 0;
1371   demux->priv->n_video_streams = 0;
1372   demux->priv->n_subtitle_streams = 0;
1373
1374   gst_flow_combiner_reset (demux->priv->flowcombiner);
1375 }
1376
1377 static gboolean
1378 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1379 {
1380   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1381
1382   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1383
1384   switch (GST_QUERY_TYPE (query)) {
1385     case GST_QUERY_BUFFERING:
1386     {
1387       GstFormat format;
1388       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1389
1390       if (!demux->output_period) {
1391         if (format != GST_FORMAT_TIME) {
1392           GST_DEBUG_OBJECT (demux,
1393               "No period setup yet, can't answer non-TIME buffering queries");
1394           return FALSE;
1395         }
1396
1397         GST_DEBUG_OBJECT (demux,
1398             "No period setup yet, but still answering buffering query");
1399         return TRUE;
1400       }
1401     }
1402     default:
1403       break;
1404   }
1405
1406   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1407 }
1408
1409 static gboolean
1410 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1411 {
1412   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1413   gboolean res = FALSE;
1414
1415   GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1416
1417   switch (GST_EVENT_TYPE (event)) {
1418     case GST_EVENT_SEEK:
1419     {
1420       res = gst_adaptive_demux_handle_seek_event (demux, event);
1421       break;
1422     }
1423     case GST_EVENT_SELECT_STREAMS:
1424     {
1425       res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1426       break;
1427     }
1428     default:
1429       res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1430       break;
1431   }
1432   return res;
1433 }
1434
1435 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1436 static GstAdaptiveDemux2Stream *
1437 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1438 {
1439   GList *iter;
1440
1441   /* We only look in the streams of the input period (i.e. with active streams) */
1442   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1443     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1444     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1445       return stream;
1446     }
1447   }
1448
1449   return NULL;
1450 }
1451
1452 static void
1453 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1454     GstMessage * msg)
1455 {
1456   GstAdaptiveDemux2Stream *stream;
1457   GstStreamCollection *collection = NULL;
1458   gboolean pending_tracks_activated = FALSE;
1459
1460   GST_MANIFEST_LOCK (demux);
1461
1462   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1463   if (stream == NULL) {
1464     GST_WARNING_OBJECT (demux,
1465         "Failed to locate stream for collection message");
1466     goto beach;
1467   }
1468
1469   gst_message_parse_stream_collection (msg, &collection);
1470   if (!collection)
1471     goto beach;
1472
1473   TRACKS_LOCK (demux);
1474
1475   if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1476           &pending_tracks_activated)) {
1477     TRACKS_UNLOCK (demux);
1478
1479     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1480         (_("Stream format can't be handled")),
1481         ("The streams provided by the multiplex are ambiguous"));
1482     goto beach;
1483   }
1484
1485   if (pending_tracks_activated) {
1486     /* If pending tracks were handled, then update the demuxer collection */
1487     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1488         demux->input_period == demux->output_period) {
1489       gst_adaptive_demux_post_collection (demux);
1490     }
1491
1492     /* If we discovered pending tracks and we no longer have any, we can ensure
1493      * selected tracks are started */
1494     if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1495       GList *iter = demux->input_period->streams;
1496       for (; iter; iter = iter->next) {
1497         GstAdaptiveDemux2Stream *new_stream = iter->data;
1498
1499         /* The stream that posted this collection was already started. If a
1500          * different stream is now selected, start it */
1501         if (stream != new_stream
1502             && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1503           gst_adaptive_demux2_stream_start (new_stream);
1504       }
1505     }
1506   }
1507   TRACKS_UNLOCK (demux);
1508
1509 beach:
1510   GST_MANIFEST_UNLOCK (demux);
1511
1512   if (collection)
1513     gst_object_unref (collection);
1514   gst_message_unref (msg);
1515   msg = NULL;
1516 }
1517
1518 static void
1519 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1520 {
1521   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1522
1523   switch (GST_MESSAGE_TYPE (msg)) {
1524     case GST_MESSAGE_STREAM_COLLECTION:
1525     {
1526       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1527       return;
1528     }
1529     case GST_MESSAGE_ERROR:{
1530       GstAdaptiveDemux2Stream *stream = NULL;
1531       GError *err = NULL;
1532       gchar *debug = NULL;
1533       gchar *new_error = NULL;
1534       const GstStructure *details = NULL;
1535
1536       GST_MANIFEST_LOCK (demux);
1537
1538       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1539       if (stream == NULL) {
1540         GST_WARNING_OBJECT (demux,
1541             "Failed to locate stream for errored element");
1542         GST_MANIFEST_UNLOCK (demux);
1543         break;
1544       }
1545
1546       gst_message_parse_error (msg, &err, &debug);
1547
1548       GST_WARNING_OBJECT (demux,
1549           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1550           err->message, debug);
1551
1552       if (debug)
1553         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1554       if (new_error) {
1555         g_free (err->message);
1556         err->message = new_error;
1557       }
1558
1559       gst_message_parse_error_details (msg, &details);
1560       if (details) {
1561         gst_structure_get_uint (details, "http-status-code",
1562             &stream->last_status_code);
1563       }
1564
1565       /* error, but ask to retry */
1566       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1567         gst_adaptive_demux2_stream_parse_error (stream, err);
1568         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1569       }
1570
1571       g_error_free (err);
1572       g_free (debug);
1573
1574       GST_MANIFEST_UNLOCK (demux);
1575
1576       gst_message_unref (msg);
1577       msg = NULL;
1578     }
1579       break;
1580     default:
1581       break;
1582   }
1583
1584   if (msg)
1585     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1586 }
1587
1588 /* must be called with manifest_lock taken */
1589 GstClockTime
1590 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1591 {
1592   GstAdaptiveDemuxClass *klass;
1593
1594   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1595
1596   if (klass->get_period_start_time == NULL)
1597     return 0;
1598
1599   return klass->get_period_start_time (demux);
1600 }
1601
1602 /* must be called with manifest_lock taken */
1603 static gboolean
1604 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1605     gboolean first_and_live)
1606 {
1607   GList *iter;
1608   GstClockTime period_start;
1609   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1610   GList *new_streams;
1611
1612   g_return_val_if_fail (demux->input_period->streams, FALSE);
1613   g_assert (demux->input_period->prepared == FALSE);
1614
1615   new_streams = demux->input_period->streams;
1616
1617   if (!gst_adaptive_demux2_is_running (demux)) {
1618     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1619     return TRUE;
1620   }
1621
1622   GST_DEBUG_OBJECT (demux,
1623       "Preparing %d streams for period %d , first_and_live:%d",
1624       g_list_length (new_streams), demux->input_period->period_num,
1625       first_and_live);
1626
1627   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1628     GstAdaptiveDemux2Stream *stream = iter->data;
1629
1630     GST_DEBUG_OBJECT (stream, "Preparing stream");
1631
1632     stream->need_header = TRUE;
1633     stream->discont = TRUE;
1634
1635     /* Grab the first stream time for live streams
1636      * * If the stream is selected
1637      * * Or it provides dynamic tracks (in which case we need to force an update)
1638      */
1639     if (first_and_live
1640         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1641             || stream->pending_tracks)) {
1642       /* TODO we only need the first timestamp, maybe create a simple function to
1643        * get the current PTS of a fragment ? */
1644       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1645       gst_adaptive_demux2_stream_update_fragment_info (stream);
1646
1647       GST_DEBUG_OBJECT (stream,
1648           "Got stream time %" GST_STIME_FORMAT,
1649           GST_STIME_ARGS (stream->fragment.stream_time));
1650
1651       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1652         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1653       } else {
1654         min_stream_time = stream->fragment.stream_time;
1655       }
1656     }
1657   }
1658
1659   period_start = gst_adaptive_demux_get_period_start_time (demux);
1660
1661   /* For live streams, the subclass is supposed to seek to the current fragment
1662    * and then tell us its stream time in stream->fragment.stream_time.  We now
1663    * also have to seek our demuxer segment to reflect this.
1664    *
1665    * FIXME: This needs some refactoring at some point.
1666    */
1667   if (first_and_live) {
1668     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1669         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1670         GST_SEEK_TYPE_NONE, -1, NULL);
1671   }
1672
1673   GST_DEBUG_OBJECT (demux,
1674       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1675       " demux segment %" GST_SEGMENT_FORMAT,
1676       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1677       &demux->segment);
1678
1679   /* Synchronize stream start/current positions */
1680   if (min_stream_time == GST_CLOCK_STIME_NONE)
1681     min_stream_time = period_start;
1682   else
1683     min_stream_time += period_start;
1684   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1685     GstAdaptiveDemux2Stream *stream = iter->data;
1686     stream->start_position = stream->current_position = min_stream_time;
1687   }
1688
1689   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1690     GstAdaptiveDemux2Stream *stream = iter->data;
1691     stream->compute_segment = TRUE;
1692     stream->first_and_live = first_and_live;
1693   }
1694   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1695   demux->input_period->prepared = TRUE;
1696
1697   return TRUE;
1698 }
1699
1700 static GstAdaptiveDemuxTrack *
1701 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1702 {
1703   GList *tmp;
1704
1705   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1706     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1707     if (!g_strcmp0 (track->stream_id, stream_id))
1708       return track;
1709   }
1710
1711   return NULL;
1712 }
1713
1714 /* TRACKS_LOCK hold */
1715 void
1716 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1717 {
1718   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1719   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1720   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1721   GList *tmp;
1722   gint min_percent = -1, percent;
1723   gboolean all_eos = TRUE;
1724
1725   /* Go over all active tracks of the output period and update level */
1726
1727   /* Check that all tracks are above their respective low thresholds (different
1728    * tracks may have different fragment durations yielding different buffering
1729    * percentages) Overall buffering percent is the lowest. */
1730   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1731     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1732
1733     GST_LOG_OBJECT (demux,
1734         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1735         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1736         track->stream_id, track->period_num, track->active, track->selected,
1737         track->eos, GST_TIME_ARGS (track->level_time),
1738         GST_TIME_ARGS (track->buffering_threshold));
1739
1740     if (track->active && track->selected) {
1741       if (!track->eos) {
1742         gint cur_percent;
1743
1744         all_eos = FALSE;
1745         if (min_level_time == GST_CLOCK_TIME_NONE) {
1746           min_level_time = track->level_time;
1747         } else if (track->level_time < min_level_time) {
1748           min_level_time = track->level_time;
1749         }
1750
1751         if (track->type & GST_STREAM_TYPE_VIDEO
1752             && video_level_time > track->level_time)
1753           video_level_time = track->level_time;
1754
1755         if (track->type & GST_STREAM_TYPE_AUDIO
1756             && audio_level_time > track->level_time)
1757           audio_level_time = track->level_time;
1758
1759         if (track->level_time != GST_CLOCK_TIME_NONE
1760             && track->buffering_threshold != 0) {
1761           cur_percent =
1762               gst_util_uint64_scale (track->level_time, 100,
1763               track->buffering_threshold);
1764           if (min_percent < 0 || cur_percent < min_percent)
1765             min_percent = cur_percent;
1766         }
1767       }
1768     }
1769   }
1770
1771   GST_DEBUG_OBJECT (demux,
1772       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1773       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1774
1775   /* Update demuxer video/audio level properties */
1776   GST_OBJECT_LOCK (demux);
1777   demux->current_level_time_video = video_level_time;
1778   demux->current_level_time_audio = audio_level_time;
1779   GST_OBJECT_UNLOCK (demux);
1780
1781   if (min_percent < 0 && !all_eos)
1782     return;
1783
1784   if (min_percent > 100 || all_eos)
1785     percent = 100;
1786   else
1787     percent = MAX (0, min_percent);
1788
1789   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1790
1791   if (demux->priv->is_buffering) {
1792     if (percent >= 100)
1793       demux->priv->is_buffering = FALSE;
1794     if (demux->priv->percent != percent) {
1795       demux->priv->percent = percent;
1796       demux->priv->percent_changed = TRUE;
1797     }
1798   } else if (percent < 1) {
1799     demux->priv->is_buffering = TRUE;
1800     if (demux->priv->percent != percent) {
1801       demux->priv->percent = percent;
1802       demux->priv->percent_changed = TRUE;
1803     }
1804   }
1805
1806   if (demux->priv->percent_changed)
1807     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1808         demux->priv->is_buffering);
1809 }
1810
1811 /* With TRACKS_LOCK held */
1812 void
1813 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1814 {
1815   gint percent;
1816   GstMessage *msg;
1817
1818   if (!demux->priv->percent_changed)
1819     return;
1820
1821   BUFFERING_LOCK (demux);
1822   percent = demux->priv->percent;
1823   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1824   TRACKS_UNLOCK (demux);
1825   gst_element_post_message ((GstElement *) demux, msg);
1826
1827   BUFFERING_UNLOCK (demux);
1828   TRACKS_LOCK (demux);
1829   if (percent == demux->priv->percent)
1830     demux->priv->percent_changed = FALSE;
1831 }
1832
1833 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1834 GstAdaptiveDemux2Stream *
1835 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1836     GstAdaptiveDemuxTrack * track)
1837 {
1838   GList *iter;
1839
1840   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1841     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1842     if (g_list_find (stream->tracks, track))
1843       return stream;
1844   }
1845
1846   return NULL;
1847 }
1848
1849 /* Called from seek handler
1850  *
1851  * This function is used when a (flushing) seek caused a new period to be activated.
1852  *
1853  * This will ensure that:
1854  * * the current output period is marked as finished (EOS)
1855  * * Any potential intermediate (non-input/non-output) periods are removed
1856  * * That the new input period is prepared and ready
1857  */
1858 static void
1859 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1860 {
1861   GList *iter;
1862
1863   GST_DEBUG_OBJECT (demux,
1864       "Preparing new input period %u", demux->input_period->period_num);
1865
1866   /* Prepare the new input period */
1867   gst_adaptive_demux_update_collection (demux, demux->input_period);
1868
1869   /* Transfer the previous selection to the new input period */
1870   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1871       demux->output_period);
1872   gst_adaptive_demux_prepare_streams (demux, FALSE);
1873
1874   /* Remove all periods except for the input (last) and output (first) period */
1875   while (demux->priv->periods->length > 2) {
1876     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1877     /* Mark all tracks of the removed period as not selected and EOS so they
1878      * will be skipped / ignored */
1879     for (iter = period->tracks; iter; iter = iter->next) {
1880       GstAdaptiveDemuxTrack *track = iter->data;
1881       track->selected = FALSE;
1882       track->eos = TRUE;
1883     }
1884     gst_adaptive_demux_period_unref (period);
1885   }
1886
1887   /* Mark all tracks of the output period as EOS so that the output loop
1888    * will immediately move to the new period */
1889   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1890     GstAdaptiveDemuxTrack *track = iter->data;
1891     track->eos = TRUE;
1892   }
1893
1894   /* Go over all slots, and clear any pending track */
1895   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1896     OutputSlot *slot = (OutputSlot *) iter->data;
1897
1898     if (slot->pending_track != NULL) {
1899       GST_DEBUG_OBJECT (demux,
1900           "Removing track '%s' as pending from output of current track '%s'",
1901           slot->pending_track->stream_id, slot->track->stream_id);
1902       gst_adaptive_demux_track_unref (slot->pending_track);
1903       slot->pending_track = NULL;
1904     }
1905   }
1906 }
1907
1908 /* must be called with manifest_lock taken */
1909 gboolean
1910 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1911     gint64 * range_start, gint64 * range_stop)
1912 {
1913   GstAdaptiveDemuxClass *klass;
1914
1915   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1916
1917   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1918
1919   return klass->get_live_seek_range (demux, range_start, range_stop);
1920 }
1921
1922 /* must be called with manifest_lock taken */
1923 gboolean
1924 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1925     GstAdaptiveDemux2Stream * stream)
1926 {
1927   gint64 range_start, range_stop;
1928   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1929     GST_LOG_OBJECT (stream,
1930         "stream position %" GST_TIME_FORMAT "  live seek range %"
1931         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1932         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1933         GST_STIME_ARGS (range_stop));
1934     return (stream->current_position >= range_start
1935         && stream->current_position <= range_stop);
1936   }
1937
1938   return FALSE;
1939 }
1940
1941 /* must be called with manifest_lock taken */
1942 static gboolean
1943 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1944 {
1945   GstAdaptiveDemuxClass *klass;
1946
1947   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1948   if (gst_adaptive_demux_is_live (demux)) {
1949     return klass->get_live_seek_range != NULL;
1950   }
1951
1952   return klass->seek != NULL;
1953 }
1954
1955 static void
1956 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1957     GstSeekType start_type, GstSeekType stop_type)
1958 {
1959   GList *iter;
1960
1961   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1962     GstAdaptiveDemux2Stream *stream = iter->data;
1963
1964     /* Make sure the download loop clears and restarts on the next start,
1965      * which will recompute the stream segment */
1966     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1967         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1968     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1969     stream->start_position = 0;
1970
1971     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1972       stream->start_position = demux->segment.start;
1973     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1974       stream->start_position = demux->segment.stop;
1975   }
1976 }
1977
1978 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1979                               GST_SEEK_FLAG_SNAP_AFTER |          \
1980                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1981                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1982                               GST_SEEK_FLAG_KEY_UNIT))
1983 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1984                               GST_SEEK_FLAG_SNAP_AFTER | \
1985                               GST_SEEK_FLAG_SNAP_NEAREST))
1986
1987 static gboolean
1988 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1989     GstEvent * event)
1990 {
1991   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1992   gdouble rate;
1993   GstFormat format;
1994   GstSeekFlags flags;
1995   GstSeekType start_type, stop_type;
1996   gint64 start, stop;
1997   guint32 seqnum;
1998   gboolean update;
1999   gboolean ret;
2000   GstSegment oldsegment;
2001   GstEvent *flush_event;
2002
2003   GST_INFO_OBJECT (demux, "Received seek event");
2004
2005   GST_API_LOCK (demux);
2006
2007   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2008       &stop_type, &stop);
2009
2010   if (format != GST_FORMAT_TIME) {
2011     GST_API_UNLOCK (demux);
2012     GST_WARNING_OBJECT (demux,
2013         "Adaptive demuxers only support TIME-based seeking");
2014     gst_event_unref (event);
2015     return FALSE;
2016   }
2017
2018   if (flags & GST_SEEK_FLAG_SEGMENT) {
2019     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2020     GST_API_UNLOCK (demux);
2021     gst_event_unref (event);
2022     return FALSE;
2023   }
2024
2025   seqnum = gst_event_get_seqnum (event);
2026
2027   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2028     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2029     return FALSE;
2030   }
2031
2032   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2033     /* For instant rate seeks, reply directly and update
2034      * our segment so the new rate is reflected in any future
2035      * fragments */
2036     GstEvent *ev;
2037     gdouble rate_multiplier;
2038
2039     /* instant rate change only supported if direction does not change. All
2040      * other requirements are already checked before creating the seek event
2041      * but let's double-check here to be sure */
2042     if ((demux->segment.rate > 0 && rate < 0) ||
2043         (demux->segment.rate < 0 && rate > 0) ||
2044         start_type != GST_SEEK_TYPE_NONE ||
2045         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2046       GST_ERROR_OBJECT (demux,
2047           "Instant rate change seeks only supported in the "
2048           "same direction, without flushing and position change");
2049       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2050       GST_API_UNLOCK (demux);
2051       return FALSE;
2052     }
2053
2054     rate_multiplier = rate / demux->segment.rate;
2055
2056     ev = gst_event_new_instant_rate_change (rate_multiplier,
2057         (GstSegmentFlags) flags);
2058     gst_event_set_seqnum (ev, seqnum);
2059
2060     ret = gst_adaptive_demux_push_src_event (demux, ev);
2061
2062     if (ret) {
2063       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2064       demux->instant_rate_multiplier = rate_multiplier;
2065       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2066     }
2067
2068     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2069     GST_API_UNLOCK (demux);
2070     gst_event_unref (event);
2071
2072     return ret;
2073   }
2074
2075   if (!gst_adaptive_demux_can_seek (demux)) {
2076     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2077
2078     GST_API_UNLOCK (demux);
2079     gst_event_unref (event);
2080     return FALSE;
2081   }
2082
2083   /* We can only accept flushing seeks from this point onward */
2084   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2085     GST_ERROR_OBJECT (demux,
2086         "Non-flushing non-instant-rate seeks are not possible");
2087
2088     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2089
2090     GST_API_UNLOCK (demux);
2091     gst_event_unref (event);
2092     return FALSE;
2093   }
2094
2095   if (gst_adaptive_demux_is_live (demux)) {
2096     gint64 range_start, range_stop;
2097     gboolean changed = FALSE;
2098     gboolean start_valid = TRUE, stop_valid = TRUE;
2099
2100     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2101             &range_stop)) {
2102       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2103       GST_API_UNLOCK (demux);
2104       gst_event_unref (event);
2105       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2106       return FALSE;
2107     }
2108
2109     GST_DEBUG_OBJECT (demux,
2110         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2111         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2112
2113     /* Handle relative positioning for live streams (relative to the range_stop) */
2114     if (start_type == GST_SEEK_TYPE_END) {
2115       start = range_stop + start;
2116       start_type = GST_SEEK_TYPE_SET;
2117       changed = TRUE;
2118     }
2119     if (stop_type == GST_SEEK_TYPE_END) {
2120       stop = range_stop + stop;
2121       stop_type = GST_SEEK_TYPE_SET;
2122       changed = TRUE;
2123     }
2124
2125     /* Adjust the requested start/stop position if it falls beyond the live
2126      * seek range.
2127      * The only case where we don't adjust is for the starting point of
2128      * an accurate seek (start if forward and stop if backwards)
2129      */
2130     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2131         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2132       GST_DEBUG_OBJECT (demux,
2133           "seek before live stream start, setting to range start: %"
2134           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2135       start = range_start;
2136       changed = TRUE;
2137     }
2138     /* truncate stop position also if set */
2139     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2140         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2141       GST_DEBUG_OBJECT (demux,
2142           "seek ending after live start, adjusting to: %"
2143           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2144       stop = range_stop;
2145       changed = TRUE;
2146     }
2147
2148     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2149         (start < range_start || start > range_stop)) {
2150       GST_WARNING_OBJECT (demux,
2151           "Seek to invalid position start:%" GST_STIME_FORMAT
2152           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2153           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2154           GST_STIME_ARGS (range_stop));
2155       start_valid = FALSE;
2156     }
2157     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2158         (stop < range_start || stop > range_stop)) {
2159       GST_WARNING_OBJECT (demux,
2160           "Seek to invalid position stop:%" GST_STIME_FORMAT
2161           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2162           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2163           GST_STIME_ARGS (range_stop));
2164       stop_valid = FALSE;
2165     }
2166
2167     /* If the seek position is still outside of the seekable range, refuse the seek */
2168     if (!start_valid || !stop_valid) {
2169       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2170       GST_API_UNLOCK (demux);
2171       gst_event_unref (event);
2172       return FALSE;
2173     }
2174
2175     /* Re-create seek event with changed/updated values */
2176     if (changed) {
2177       gst_event_unref (event);
2178       event =
2179           gst_event_new_seek (rate, format, flags,
2180           start_type, start, stop_type, stop);
2181       gst_event_set_seqnum (event, seqnum);
2182     }
2183   }
2184
2185   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2186
2187   /* have a backup in case seek fails */
2188   gst_segment_copy_into (&demux->segment, &oldsegment);
2189
2190   GST_DEBUG_OBJECT (demux, "sending flush start");
2191   flush_event = gst_event_new_flush_start ();
2192   gst_event_set_seqnum (flush_event, seqnum);
2193
2194   gst_adaptive_demux_push_src_event (demux, flush_event);
2195
2196   gst_adaptive_demux_stop_tasks (demux, FALSE);
2197   gst_adaptive_demux_reset_tracks (demux);
2198
2199   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2200
2201   /*
2202    * Handle snap seeks as follows:
2203    * 1) do the snap seeking a (random) active stream
2204    * 2) use the final position on this stream to seek
2205    *    on the other streams to the same position
2206    *
2207    * We can't snap at all streams at the same time as they might end in
2208    * different positions, so just pick one and align all others to that
2209    * position.
2210    */
2211
2212   GstAdaptiveDemux2Stream *stream = NULL;
2213   GList *iter;
2214   /* Pick a random active stream on which to do the stream seek */
2215   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2216     GstAdaptiveDemux2Stream *cand = iter->data;
2217     if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2218       stream = cand;
2219       break;
2220     }
2221   }
2222
2223   if (stream && IS_SNAP_SEEK (flags)) {
2224     GstClockTimeDiff ts;
2225     GstSeekFlags stream_seek_flags = flags;
2226
2227     /* snap-seek on the chosen stream and then
2228      * use the resulting position to seek on all streams */
2229     if (rate >= 0) {
2230       if (start_type != GST_SEEK_TYPE_NONE)
2231         ts = start;
2232       else {
2233         ts = gst_segment_position_from_running_time (&demux->segment,
2234             GST_FORMAT_TIME, demux->priv->global_output_position);
2235         start_type = GST_SEEK_TYPE_SET;
2236       }
2237     } else {
2238       if (stop_type != GST_SEEK_TYPE_NONE)
2239         ts = stop;
2240       else {
2241         stop_type = GST_SEEK_TYPE_SET;
2242         ts = gst_segment_position_from_running_time (&demux->segment,
2243             GST_FORMAT_TIME, demux->priv->global_output_position);
2244       }
2245     }
2246
2247     if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2248             ts, &ts) != GST_FLOW_OK) {
2249       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2250
2251       GST_API_UNLOCK (demux);
2252       gst_event_unref (event);
2253       return FALSE;
2254     }
2255
2256     /* replace event with a new one without snapping to seek on all streams */
2257     gst_event_unref (event);
2258     if (rate >= 0) {
2259       start = ts;
2260     } else {
2261       stop = ts;
2262     }
2263     event =
2264         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2265         start_type, start, stop_type, stop);
2266     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2267   }
2268
2269   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2270       start, stop_type, stop, &update);
2271
2272   if (ret) {
2273     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2274
2275     ret = demux_class->seek (demux, event);
2276   }
2277
2278   if (!ret) {
2279     /* Is there anything else we can do if it fails? */
2280     gst_segment_copy_into (&oldsegment, &demux->segment);
2281   } else {
2282     demux->priv->segment_seqnum = seqnum;
2283   }
2284   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2285
2286   /* Resetting flow combiner */
2287   gst_flow_combiner_reset (demux->priv->flowcombiner);
2288
2289   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2290   flush_event = gst_event_new_flush_stop (TRUE);
2291   gst_event_set_seqnum (flush_event, seqnum);
2292   gst_adaptive_demux_push_src_event (demux, flush_event);
2293
2294   /* If the seek generated a new period, prepare it */
2295   if (!demux->input_period->prepared) {
2296     /* This can only happen on flushing seeks */
2297     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2298     gst_adaptive_demux_seek_to_input_period (demux);
2299   }
2300
2301   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2302   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2303       &demux->segment);
2304   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2305   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2306
2307   /* Reset the global output position (running time) for when the output loop restarts */
2308   demux->priv->global_output_position = 0;
2309
2310   /* After a flushing seek, any instant-rate override is undone */
2311   demux->instant_rate_multiplier = 1.0;
2312
2313   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2314
2315   /* Restart the demux */
2316   gst_adaptive_demux_start_tasks (demux);
2317
2318   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2319   GST_API_UNLOCK (demux);
2320   gst_event_unref (event);
2321
2322   return ret;
2323 }
2324
2325 /* Returns TRUE if the stream has at least one selected track */
2326 static gboolean
2327 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2328     stream)
2329 {
2330   GList *tmp;
2331
2332   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2333     GstAdaptiveDemuxTrack *track = tmp->data;
2334
2335     if (track->selected)
2336       return TRUE;
2337   }
2338
2339   return FALSE;
2340 }
2341
2342 static gboolean
2343 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2344     guint32 seqnum)
2345 {
2346   gboolean selection_handled = TRUE;
2347   GList *iter;
2348   GList *tracks = NULL;
2349
2350   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2351     return FALSE;
2352
2353   TRACKS_LOCK (demux);
2354   /* Validate the streams and fill:
2355    * tracks : list of tracks corresponding to requested streams
2356    */
2357   for (iter = streams; iter; iter = iter->next) {
2358     gchar *stream_id = (gchar *) iter->data;
2359     GstAdaptiveDemuxTrack *track;
2360
2361     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2362     track = find_track_for_stream_id (demux->output_period, stream_id);
2363     if (!track) {
2364       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2365       selection_handled = FALSE;
2366       goto select_streams_done;
2367     }
2368     tracks = g_list_append (tracks, track);
2369     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2370   }
2371
2372   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2373    * SCHEDULING THREAD */
2374
2375   /* FIXME: We want to iterate all streams, mark them as deselected,
2376    * then iterate tracks and mark any streams that have at least 1
2377    * active output track, then loop over all streams again and start/stop
2378    * them as needed */
2379
2380   /* Go over all tracks present and (de)select based on current selection */
2381   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2382     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2383
2384     if (track->selected && !g_list_find (tracks, track)) {
2385       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2386           track->stream_id, track->active);
2387       track->selected = FALSE;
2388       track->draining = TRUE;
2389     } else if (!track->selected && g_list_find (tracks, track)) {
2390       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2391
2392       track->selected = TRUE;
2393     }
2394   }
2395
2396   /* Start or stop streams based on the updated track selection */
2397   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2398     GstAdaptiveDemux2Stream *stream = iter->data;
2399     GList *trackiter;
2400
2401     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2402     gboolean should_be_running =
2403         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2404
2405     if (!is_running && should_be_running) {
2406       GstClockTime output_running_ts = demux->priv->global_output_position;
2407       GstClockTime start_position;
2408
2409       /* Calculate where we should start the stream, and then
2410        * start it. */
2411       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2412
2413       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2414           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2415           GST_TIME_ARGS (output_running_ts), &demux->segment);
2416
2417       start_position =
2418           gst_segment_position_from_running_time (&demux->segment,
2419           GST_FORMAT_TIME, output_running_ts);
2420
2421       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2422
2423       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2424           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2425
2426       stream->current_position = stream->start_position = start_position;
2427       stream->compute_segment = TRUE;
2428
2429       /* If output has already begun, ensure we seek this segment
2430        * to the correct restart position when the download loop begins */
2431       if (output_running_ts != 0)
2432         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2433
2434       /* Activate track pads for this stream */
2435       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2436         GstAdaptiveDemuxTrack *track =
2437             (GstAdaptiveDemuxTrack *) trackiter->data;
2438         gst_pad_set_active (track->sinkpad, TRUE);
2439       }
2440
2441       gst_adaptive_demux2_stream_start (stream);
2442     } else if (is_running && !should_be_running) {
2443       /* Stream should not be running and needs stopping */
2444       gst_adaptive_demux2_stream_stop (stream);
2445
2446       /* Set all track sinkpads to inactive for this stream */
2447       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2448         GstAdaptiveDemuxTrack *track =
2449             (GstAdaptiveDemuxTrack *) trackiter->data;
2450         gst_pad_set_active (track->sinkpad, FALSE);
2451       }
2452     }
2453   }
2454
2455   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2456
2457 select_streams_done:
2458   demux_update_buffering_locked (demux);
2459   demux_post_buffering_locked (demux);
2460
2461   TRACKS_UNLOCK (demux);
2462   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2463
2464   if (tracks)
2465     g_list_free (tracks);
2466   return selection_handled;
2467 }
2468
2469 static gboolean
2470 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2471     GstEvent * event)
2472 {
2473   GList *streams;
2474   gboolean selection_handled;
2475
2476   if (GST_EVENT_SEQNUM (event) ==
2477       g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2478     GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2479         GST_EVENT_SEQNUM (event));
2480     return TRUE;
2481   }
2482
2483   gst_event_parse_select_streams (event, &streams);
2484   selection_handled =
2485       handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2486   g_list_free_full (streams, g_free);
2487
2488   gst_event_unref (event);
2489   return selection_handled;
2490 }
2491
2492 static gboolean
2493 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2494     GstEvent * event)
2495 {
2496   GstAdaptiveDemux *demux;
2497
2498   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2499
2500   switch (event->type) {
2501     case GST_EVENT_SEEK:
2502     {
2503       guint32 seqnum = gst_event_get_seqnum (event);
2504       if (seqnum == demux->priv->segment_seqnum) {
2505         GST_LOG_OBJECT (pad,
2506             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2507         gst_event_unref (event);
2508         return TRUE;
2509       }
2510       return gst_adaptive_demux_handle_seek_event (demux, event);
2511     }
2512     case GST_EVENT_LATENCY:{
2513       /* Upstream and our internal source are irrelevant
2514        * for latency, and we should not fail here to
2515        * configure the latency */
2516       gst_event_unref (event);
2517       return TRUE;
2518     }
2519     case GST_EVENT_QOS:{
2520       GstClockTimeDiff diff;
2521       GstClockTime timestamp;
2522       GstClockTime earliest_time;
2523
2524       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2525       /* Only take into account lateness if late */
2526       if (diff > 0)
2527         earliest_time = timestamp + 2 * diff;
2528       else
2529         earliest_time = timestamp;
2530
2531       GST_OBJECT_LOCK (demux);
2532       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2533           earliest_time > demux->priv->qos_earliest_time) {
2534         demux->priv->qos_earliest_time = earliest_time;
2535         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2536             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2537       }
2538       GST_OBJECT_UNLOCK (demux);
2539       break;
2540     }
2541     case GST_EVENT_SELECT_STREAMS:
2542     {
2543       return gst_adaptive_demux_handle_select_streams_event (demux, event);
2544     }
2545     default:
2546       break;
2547   }
2548
2549   return gst_pad_event_default (pad, parent, event);
2550 }
2551
2552 static gboolean
2553 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2554     GstQuery * query)
2555 {
2556   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2557   gboolean ret = FALSE;
2558
2559   if (query == NULL)
2560     return FALSE;
2561
2562
2563   switch (query->type) {
2564     case GST_QUERY_DURATION:{
2565       GstFormat fmt;
2566       GstClockTime duration = GST_CLOCK_TIME_NONE;
2567
2568       gst_query_parse_duration (query, &fmt, NULL);
2569
2570       if (gst_adaptive_demux_is_live (demux)) {
2571         /* We are able to answer this query: the duration is unknown */
2572         gst_query_set_duration (query, fmt, -1);
2573         ret = TRUE;
2574         break;
2575       }
2576
2577       if (fmt == GST_FORMAT_TIME
2578           && g_atomic_int_get (&demux->priv->have_manifest)) {
2579
2580         GST_MANIFEST_LOCK (demux);
2581         duration = demux->priv->duration;
2582         GST_MANIFEST_UNLOCK (demux);
2583
2584         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2585           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2586           ret = TRUE;
2587         }
2588       }
2589
2590       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2591           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2592       break;
2593     }
2594     case GST_QUERY_LATENCY:{
2595       gst_query_set_latency (query, FALSE, 0, -1);
2596       ret = TRUE;
2597       break;
2598     }
2599     case GST_QUERY_SEEKING:{
2600       GstFormat fmt;
2601       gint64 stop = -1;
2602       gint64 start = 0;
2603
2604       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2605         GST_INFO_OBJECT (demux,
2606             "Don't have manifest yet, can't answer seeking query");
2607         return FALSE;           /* can't answer without manifest */
2608       }
2609
2610       GST_MANIFEST_LOCK (demux);
2611
2612       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2613       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2614       if (fmt == GST_FORMAT_TIME) {
2615         GstClockTime duration;
2616         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2617
2618         ret = TRUE;
2619         if (can_seek) {
2620           if (gst_adaptive_demux_is_live (demux)) {
2621             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2622             if (!ret) {
2623               GST_MANIFEST_UNLOCK (demux);
2624               GST_INFO_OBJECT (demux, "can't answer seeking query");
2625               return FALSE;
2626             }
2627           } else {
2628             duration = demux->priv->duration;
2629             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2630               stop = duration;
2631           }
2632         }
2633         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2634         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2635             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2636             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2637       }
2638       GST_MANIFEST_UNLOCK (demux);
2639       break;
2640     }
2641     case GST_QUERY_URI:
2642
2643       GST_MANIFEST_LOCK (demux);
2644
2645       /* TODO HLS can answer this differently it seems */
2646       if (demux->manifest_uri) {
2647         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2648          * playlist or the the uri of the last downlowaded fragment? */
2649         gst_query_set_uri (query, demux->manifest_uri);
2650         ret = TRUE;
2651       }
2652
2653       GST_MANIFEST_UNLOCK (demux);
2654       break;
2655     case GST_QUERY_SELECTABLE:
2656       gst_query_set_selectable (query, TRUE);
2657       ret = TRUE;
2658       break;
2659     default:
2660       /* Don't forward queries upstream because of the special nature of this
2661        *  "demuxer", which relies on the upstream element only to be fed
2662        *  the Manifest
2663        */
2664       break;
2665   }
2666
2667   return ret;
2668 }
2669
2670 gboolean
2671 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2672 {
2673   GstEvent *seek;
2674
2675   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2676
2677   seek =
2678       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2679       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2680       GST_SEEK_TYPE_NONE, 0);
2681   gst_adaptive_demux_handle_seek_event (demux, seek);
2682   return FALSE;
2683 }
2684
2685
2686 /* Called when the scheduler starts, to kick off manifest updates
2687  * and stream downloads */
2688 static gboolean
2689 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2690 {
2691   GList *iter;
2692
2693   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2694
2695   iter = demux->input_period->streams;
2696
2697   for (; iter; iter = g_list_next (iter)) {
2698     GstAdaptiveDemux2Stream *stream = iter->data;
2699
2700     /* If we need to process this stream to discover tracks *OR* it has any
2701      * tracks which are selected, start it now */
2702     if ((stream->pending_tracks == TRUE)
2703         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2704       gst_adaptive_demux2_stream_start (stream);
2705   }
2706
2707   return FALSE;
2708 }
2709
2710 /* must be called with manifest_lock taken */
2711 static void
2712 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2713 {
2714   if (!gst_adaptive_demux2_is_running (demux)) {
2715     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2716     return;
2717   }
2718
2719   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2720   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2721       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2722
2723   TRACKS_LOCK (demux);
2724   demux->priv->flushing = FALSE;
2725   GST_DEBUG_OBJECT (demux, "Starting the output task");
2726   gst_task_start (demux->priv->output_task);
2727   TRACKS_UNLOCK (demux);
2728 }
2729
2730 /* must be called with manifest_lock taken */
2731 static void
2732 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2733 {
2734   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2735   if (demux->priv->manifest_updates_cb != 0) {
2736     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2737         demux->priv->manifest_updates_cb);
2738     demux->priv->manifest_updates_cb = 0;
2739   }
2740 }
2741
2742 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2743
2744 /* must be called with manifest_lock taken */
2745 static void
2746 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2747 {
2748   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2749
2750   if (gst_adaptive_demux_is_live (demux)) {
2751     /* Task to periodically update the manifest */
2752     if (demux_class->requires_periodical_playlist_update (demux)) {
2753       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2754       if (demux->priv->manifest_updates_cb == 0) {
2755         demux->priv->manifest_updates_cb =
2756             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2757             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2758       }
2759     }
2760   }
2761 }
2762
2763 /* must be called with manifest_lock taken
2764  * This function will temporarily release manifest_lock in order to join the
2765  * download threads.
2766  * The api_lock will still protect it against other threads trying to modify
2767  * the demux element.
2768  */
2769 static void
2770 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2771 {
2772   GST_LOG_OBJECT (demux, "Stopping tasks");
2773
2774   if (stop_updates)
2775     gst_adaptive_demux_stop_manifest_update_task (demux);
2776
2777   TRACKS_LOCK (demux);
2778   if (demux->input_period)
2779     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2780
2781   demux->priv->flushing = TRUE;
2782   g_cond_signal (&demux->priv->tracks_add);
2783   gst_task_stop (demux->priv->output_task);
2784   TRACKS_UNLOCK (demux);
2785
2786   gst_task_join (demux->priv->output_task);
2787
2788   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2789 }
2790
2791 /* must be called with manifest_lock taken */
2792 static gboolean
2793 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2794 {
2795   GList *iter;
2796   gboolean ret = TRUE;
2797
2798   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2799
2800   TRACKS_LOCK (demux);
2801   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2802     OutputSlot *slot = (OutputSlot *) iter->data;
2803     gst_event_ref (event);
2804     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2805     ret = ret & gst_pad_push_event (slot->pad, event);
2806     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2807       slot->pushed_timed_data = FALSE;
2808   }
2809   TRACKS_UNLOCK (demux);
2810   gst_event_unref (event);
2811   return ret;
2812 }
2813
2814 /* must be called with manifest_lock taken */
2815 void
2816 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2817     GstCaps * caps)
2818 {
2819   GST_DEBUG_OBJECT (stream,
2820       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2821   gst_caps_replace (&stream->pending_caps, caps);
2822   gst_caps_unref (caps);
2823 }
2824
2825 /* must be called with manifest_lock taken */
2826 void
2827 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2828     GstTagList * tags)
2829 {
2830   GST_DEBUG_OBJECT (stream,
2831       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2832   if (stream->pending_tags) {
2833     gst_tag_list_unref (stream->pending_tags);
2834   }
2835   stream->pending_tags = tags;
2836 }
2837
2838 /* must be called with manifest_lock taken */
2839 void
2840 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2841     GstEvent * event)
2842 {
2843   stream->pending_events = g_list_append (stream->pending_events, event);
2844 }
2845
2846 static gboolean
2847 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2848     * demux)
2849 {
2850   return TRUE;
2851 }
2852
2853 /* Called when a stream needs waking after the manifest is updated */
2854 void
2855 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2856 {
2857   demux->priv->stream_waiting_for_manifest = TRUE;
2858 }
2859
2860 static gboolean
2861 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2862 {
2863   GstFlowReturn ret = GST_FLOW_OK;
2864   gboolean schedule_again = TRUE;
2865
2866   GST_MANIFEST_LOCK (demux);
2867   demux->priv->manifest_updates_cb = 0;
2868
2869   /* Updating playlist only needed for live playlists */
2870   if (!gst_adaptive_demux_is_live (demux)) {
2871     GST_MANIFEST_UNLOCK (demux);
2872     return G_SOURCE_REMOVE;
2873   }
2874
2875   GST_DEBUG_OBJECT (demux, "Updating playlist");
2876   ret = gst_adaptive_demux_update_manifest (demux);
2877
2878   if (ret == GST_FLOW_EOS) {
2879     GST_MANIFEST_UNLOCK (demux);
2880     return G_SOURCE_REMOVE;
2881   }
2882
2883   if (ret == GST_FLOW_OK) {
2884     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2885     demux->priv->update_failed_count = 0;
2886
2887     /* Wake up download tasks */
2888     if (demux->priv->stream_waiting_for_manifest) {
2889       GList *iter;
2890
2891       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2892         GstAdaptiveDemux2Stream *stream = iter->data;
2893         gst_adaptive_demux2_stream_on_manifest_update (stream);
2894       }
2895       demux->priv->stream_waiting_for_manifest = FALSE;
2896     }
2897   } else {
2898     demux->priv->update_failed_count++;
2899
2900     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2901       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2902           gst_flow_get_name (ret));
2903     } else {
2904       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2905           (_("Internal data stream error.")), ("Could not update playlist"));
2906       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2907       schedule_again = FALSE;
2908     }
2909   }
2910
2911   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2912     gst_adaptive_demux_handle_lost_sync (demux);
2913
2914   if (schedule_again) {
2915     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2916
2917     demux->priv->manifest_updates_cb =
2918         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2919         klass->get_manifest_update_interval (demux) * GST_USECOND,
2920         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2921   }
2922
2923   GST_MANIFEST_UNLOCK (demux);
2924
2925   return G_SOURCE_REMOVE;
2926 }
2927
2928 static gboolean
2929 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2930 {
2931   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2932
2933   /* Loop for updating of the playlist. This periodically checks if
2934    * the playlist is updated and does so, then signals the streaming
2935    * thread in case it can continue downloading now. */
2936
2937   /* block until the next scheduled update or the signal to quit this thread */
2938   GST_DEBUG_OBJECT (demux, "Started updates task");
2939   demux->priv->manifest_updates_cb =
2940       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2941       klass->get_manifest_update_interval (demux) * GST_USECOND,
2942       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2943
2944   return G_SOURCE_REMOVE;
2945 }
2946
2947 static OutputSlot *
2948 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2949     GstAdaptiveDemuxTrack * track)
2950 {
2951   GList *tmp;
2952
2953   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2954     OutputSlot *slot = (OutputSlot *) tmp->data;
2955     /* Incompatible output type */
2956     if (slot->type != track->type)
2957       continue;
2958
2959     /* Slot which is already assigned to this pending track */
2960     if (slot->pending_track == track)
2961       return slot;
2962
2963     /* slot already used for another pending track */
2964     if (slot->pending_track != NULL)
2965       continue;
2966
2967     /* Current output track is of the same type and is draining */
2968     if (slot->track && slot->track->draining)
2969       return slot;
2970   }
2971
2972   return NULL;
2973 }
2974
2975 /* TRACKS_LOCK taken */
2976 static OutputSlot *
2977 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
2978 {
2979   GList *tmp;
2980
2981   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2982     OutputSlot *slot = (OutputSlot *) tmp->data;
2983
2984     if (slot->track == track)
2985       return slot;
2986   }
2987
2988   return NULL;
2989 }
2990
2991 /* TRACKS_LOCK held */
2992 static GstMessage *
2993 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
2994 {
2995   GList *tmp;
2996   GstMessage *msg;
2997
2998   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
2999     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3000
3001     if (track->selected && !track->active)
3002       return NULL;
3003   }
3004
3005   /* All selected tracks are active, created message */
3006   msg =
3007       gst_message_new_streams_selected (GST_OBJECT (demux),
3008       demux->output_period->collection);
3009   GST_MESSAGE_SEQNUM (msg) = seqnum;
3010   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3011     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3012     if (track->active) {
3013       gst_message_streams_selected_add (msg, track->stream_object);
3014     }
3015   }
3016
3017   return msg;
3018 }
3019
3020 static void
3021 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3022     OutputSlot * slot)
3023 {
3024   GstAdaptiveDemuxTrack *track = slot->track;
3025   GstEvent *event;
3026
3027   /* Send EVENT_STREAM_START */
3028   event = gst_event_new_stream_start (track->stream_id);
3029   if (demux->have_group_id)
3030     gst_event_set_group_id (event, demux->group_id);
3031   gst_event_set_stream_flags (event, track->flags);
3032   gst_event_set_stream (event, track->stream_object);
3033   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3034       track->stream_id);
3035   gst_pad_push_event (slot->pad, event);
3036
3037   /* Send EVENT_STREAM_COLLECTION */
3038   event = gst_event_new_stream_collection (demux->output_period->collection);
3039   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3040       track->stream_id);
3041   gst_pad_push_event (slot->pad, event);
3042
3043   /* Mark all sticky events for re-sending */
3044   gst_event_store_mark_all_undelivered (&track->sticky_events);
3045 }
3046
3047 /*
3048  * Called with TRACKS_LOCK taken
3049  */
3050 static void
3051 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3052 {
3053   GList *tmp;
3054   guint requested_selection_seqnum;
3055   GstMessage *msg;
3056
3057   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3058      output slots vs active/draining tracks */
3059   requested_selection_seqnum =
3060       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3061
3062   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3063     return;
3064
3065   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3066
3067   /* Go over all slots, and if they have a pending track that's no longer
3068    * selected, clear it so the slot can be reused */
3069   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3070     OutputSlot *slot = (OutputSlot *) tmp->data;
3071
3072     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3073       GST_DEBUG_OBJECT (demux,
3074           "Removing deselected track '%s' as pending from output of current track '%s'",
3075           slot->pending_track->stream_id, slot->track->stream_id);
3076       gst_adaptive_demux_track_unref (slot->pending_track);
3077       slot->pending_track = NULL;
3078     }
3079   }
3080
3081   /* Go over all tracks and create/re-assign/remove slots */
3082   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3083     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3084
3085     if (track->selected) {
3086       OutputSlot *slot = find_slot_for_track (demux, track);
3087
3088       /* 0. Track is selected and has a slot. Nothing to do */
3089       if (slot) {
3090         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3091             track->stream_id);
3092         continue;
3093       }
3094
3095       slot = find_replacement_slot_for_track (demux, track);
3096       if (slot) {
3097         /* 1. There is an existing slot of the same type which is currently
3098          *    draining, assign this track as a replacement for it */
3099         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3100         if (slot->pending_track == NULL) {
3101           slot->pending_track = gst_adaptive_demux_track_ref (track);
3102           GST_DEBUG_OBJECT (demux,
3103               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3104               track->stream_id, track->period_num,
3105               slot->track->stream_id, slot->track->period_num);
3106         }
3107       } else {
3108         /* 2. There is no compatible replacement slot, create a new one */
3109         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3110         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3111             track->stream_id);
3112         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3113
3114         track->update_next_segment = TRUE;
3115
3116         slot->track = gst_adaptive_demux_track_ref (track);
3117         track->active = TRUE;
3118         gst_adaptive_demux_send_initial_events (demux, slot);
3119       }
3120
3121       /* If we were draining this track, we no longer are */
3122       track->draining = FALSE;
3123     }
3124   }
3125
3126   /* Finally check all slots have a current/pending track. If not remove it */
3127   for (tmp = demux->priv->outputs; tmp;) {
3128     OutputSlot *slot = (OutputSlot *) tmp->data;
3129     /* We should never has slots without target tracks */
3130     g_assert (slot->track);
3131     if (slot->track->draining && !slot->pending_track) {
3132       GstAdaptiveDemux2Stream *stream;
3133
3134       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3135           slot->track->stream_id);
3136       slot->track->active = FALSE;
3137
3138       /* If the stream feeding this track is stopped, flush and clear
3139        * the track now that it's going inactive. If the stream was not
3140        * found, it means we advanced past that period already (and the
3141        * stream was stopped and discarded) */
3142       stream = find_stream_for_track_locked (demux, slot->track);
3143       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3144         gst_adaptive_demux_track_flush (slot->track);
3145
3146       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3147       gst_adaptive_demux_output_slot_free (demux, slot);
3148     } else
3149       tmp = tmp->next;
3150   }
3151
3152   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3153   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3154   if (msg) {
3155     TRACKS_UNLOCK (demux);
3156     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3157     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3158     TRACKS_LOCK (demux);
3159   }
3160 }
3161
3162 /* TRACKS_LOCK held */
3163 static gboolean
3164 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3165 {
3166   GList *iter;
3167   GstAdaptiveDemuxPeriod *previous_period;
3168   GstStreamCollection *collection;
3169
3170   /* Grab the next period, should be demux->periods->next->data */
3171   previous_period = g_queue_pop_head (demux->priv->periods);
3172
3173   /* Remove ref held by demux->output_period */
3174   gst_adaptive_demux_period_unref (previous_period);
3175   demux->output_period =
3176       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3177
3178   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3179       demux->output_period->period_num);
3180
3181   /* We can now post the collection of the new period */
3182   collection = demux->output_period->collection;
3183   TRACKS_UNLOCK (demux);
3184   gst_element_post_message (GST_ELEMENT_CAST (demux),
3185       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3186   TRACKS_LOCK (demux);
3187
3188   /* Unselect all tracks of the previous period */
3189   for (iter = previous_period->tracks; iter; iter = iter->next) {
3190     GstAdaptiveDemuxTrack *track = iter->data;
3191     if (track->selected) {
3192       track->selected = FALSE;
3193       track->draining = TRUE;
3194     }
3195   }
3196
3197   /* Force a selection re-check */
3198   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3199   check_and_handle_selection_update_locked (demux);
3200
3201   /* Remove the final ref on the previous period now that we have done the switch */
3202   gst_adaptive_demux_period_unref (previous_period);
3203
3204   return TRUE;
3205 }
3206
3207 /* Called with TRACKS_LOCK taken */
3208 static void
3209 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3210     OutputSlot * slot)
3211 {
3212   GstAdaptiveDemuxTrack *track = slot->track;
3213   GstMessage *msg;
3214   gboolean pending_is_ready;
3215   GstAdaptiveDemux2Stream *stream;
3216
3217   /* If we have a pending track for this slot, the current track should be
3218    * draining and no longer selected */
3219   g_assert (track->draining && !track->selected);
3220
3221   /* If we're draining, check if the pending track has enough data *or* that
3222      we've already drained out entirely */
3223   pending_is_ready =
3224       (slot->pending_track->level_time >=
3225       slot->pending_track->buffering_threshold);
3226   pending_is_ready |= slot->pending_track->eos;
3227
3228   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3229     GST_DEBUG_OBJECT (demux,
3230         "Replacement track '%s' doesn't have enough data for switching yet",
3231         slot->pending_track->stream_id);
3232     return;
3233   }
3234
3235   GST_DEBUG_OBJECT (demux,
3236       "Pending replacement track has enough data, switching");
3237   track->active = FALSE;
3238   track->draining = FALSE;
3239
3240   /* If the stream feeding this track is stopped, flush and clear
3241    * the track now that it's going inactive. If the stream was not
3242    * found, it means we advanced past that period already (and the
3243    * stream was stopped and discarded) */
3244   stream = find_stream_for_track_locked (demux, track);
3245   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3246     gst_adaptive_demux_track_flush (track);
3247
3248   gst_adaptive_demux_track_unref (track);
3249   /* We steal the reference of pending_track */
3250   track = slot->track = slot->pending_track;
3251   slot->pending_track = NULL;
3252   slot->track->active = TRUE;
3253
3254   /* Make sure the track segment will start at the current position */
3255   track->update_next_segment = TRUE;
3256
3257   /* Send stream start and collection, and schedule sticky events */
3258   gst_adaptive_demux_send_initial_events (demux, slot);
3259
3260   /* Can we emit the streams-selected message now ? */
3261   msg =
3262       all_selected_tracks_are_active (demux,
3263       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3264   if (msg) {
3265     TRACKS_UNLOCK (demux);
3266     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3267     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3268     TRACKS_LOCK (demux);
3269   }
3270
3271 }
3272
3273 static void
3274 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3275 {
3276   GList *tmp;
3277   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3278   gboolean wait_for_data = FALSE;
3279   gboolean all_tracks_empty;
3280   GstFlowReturn ret;
3281
3282   GST_DEBUG_OBJECT (demux, "enter");
3283
3284   TRACKS_LOCK (demux);
3285
3286   /* Check if stopping */
3287   if (demux->priv->flushing) {
3288     ret = GST_FLOW_FLUSHING;
3289     goto pause;
3290   }
3291
3292   /* If the selection changed, handle it */
3293   check_and_handle_selection_update_locked (demux);
3294
3295 restart:
3296   ret = GST_FLOW_OK;
3297   global_output_position = GST_CLOCK_STIME_NONE;
3298   all_tracks_empty = TRUE;
3299
3300   if (wait_for_data) {
3301     GST_DEBUG_OBJECT (demux, "Waiting for data");
3302     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3303     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3304     if (demux->priv->flushing) {
3305       ret = GST_FLOW_FLUSHING;
3306       goto pause;
3307     }
3308     wait_for_data = FALSE;
3309   }
3310
3311   /* Grab/Recalculate current global output position
3312    * This is the minimum pending output position of all tracks used for output
3313    *
3314    * If there is a track which is empty and not EOS, wait for it to receive data
3315    * then recalculate global output position.
3316    *
3317    * This also pushes downstream all non-timed data that might be present.
3318    *
3319    * IF all tracks are EOS : stop task
3320    */
3321   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3322   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3323     OutputSlot *slot = (OutputSlot *) tmp->data;
3324     GstAdaptiveDemuxTrack *track;
3325
3326     /* If there is a pending track, Check if it's time to switch to it */
3327     if (slot->pending_track)
3328       handle_slot_pending_track_switch_locked (demux, slot);
3329
3330     track = slot->track;
3331
3332     if (!track->active) {
3333       /* Note: Edward: I can't see in what cases we would end up with inactive
3334          tracks assigned to slots. */
3335       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3336       g_assert (track->active);
3337       continue;
3338     }
3339
3340     if (track->next_position == GST_CLOCK_STIME_NONE) {
3341       gst_adaptive_demux_track_update_next_position (track);
3342     }
3343
3344     GST_TRACE_OBJECT (demux,
3345         "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3346         track->stream_id, track->period_num,
3347         GST_STIME_ARGS (track->next_position));
3348
3349     if (track->next_position != GST_CLOCK_STIME_NONE) {
3350       if (global_output_position == GST_CLOCK_STIME_NONE)
3351         global_output_position = track->next_position;
3352       else
3353         global_output_position =
3354             MIN (global_output_position, track->next_position);
3355       track->waiting_add = FALSE;
3356       all_tracks_empty = FALSE;
3357     } else if (!track->eos) {
3358       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3359           track->stream_id, track->period_num);
3360       all_tracks_empty = FALSE;
3361       wait_for_data = track->waiting_add = TRUE;
3362     } else {
3363       GST_DEBUG_OBJECT (demux,
3364           "Track %s (period %u) is EOS, not waiting for timed data",
3365           track->stream_id, track->period_num);
3366
3367       if (gst_queue_array_get_length (track->queue) > 0) {
3368         all_tracks_empty = FALSE;
3369       }
3370     }
3371   }
3372
3373   if (wait_for_data)
3374     goto restart;
3375
3376   if (all_tracks_empty && demux->output_period->has_next_period) {
3377     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3378         demux->output_period->period_num);
3379     if (!gst_adaptive_demux_advance_output_period (demux)) {
3380       /* Failed to move to next period, error out */
3381       ret = GST_FLOW_ERROR;
3382       goto pause;
3383     }
3384     /* Restart the loop */
3385     goto restart;
3386   }
3387
3388   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3389       GST_STIME_ARGS (global_output_position));
3390
3391   /* For each track:
3392    *
3393    * We know all active tracks have pending timed data
3394    * * while track next_position <= global output position
3395    *   * push pending data
3396    *   * Update track next_position
3397    *     * recalculate global output position
3398    *   * Pop next pending data from track and update pending position
3399    *
3400    */
3401   gboolean need_restart = FALSE;
3402
3403   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3404     OutputSlot *slot = (OutputSlot *) tmp->data;
3405     GstAdaptiveDemuxTrack *track = slot->track;
3406
3407     GST_LOG_OBJECT (track->element,
3408         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3409         " global_output_position:%" GST_STIME_FORMAT, track->active,
3410         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3411         GST_STIME_ARGS (global_output_position));
3412
3413     if (!track->active)
3414       continue;
3415
3416     while (global_output_position == GST_CLOCK_STIME_NONE
3417         || !slot->pushed_timed_data
3418         || ((track->next_position != GST_CLOCK_STIME_NONE)
3419             && track->next_position <= global_output_position)
3420         || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3421       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3422
3423       if (!mo) {
3424         GST_DEBUG_OBJECT (demux,
3425             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3426             track->stream_id, track->period_num, track->eos,
3427             slot->pushed_timed_data);
3428         /* This should only happen if the track is EOS, or exactly in between
3429          * the parser outputting segment/caps before buffers. */
3430         g_assert (track->eos || !slot->pushed_timed_data);
3431
3432         /* If we drained the track, but there's a pending track on the slot
3433          * loop again to activate it */
3434         if (slot->pending_track) {
3435           GST_DEBUG_OBJECT (demux,
3436               "Track '%s' (period %u) drained, but has a pending track to activate",
3437               track->stream_id, track->period_num);
3438           goto restart;
3439         }
3440         break;
3441       }
3442
3443       demux_update_buffering_locked (demux);
3444       demux_post_buffering_locked (demux);
3445       TRACKS_UNLOCK (demux);
3446
3447       GST_DEBUG_OBJECT (demux,
3448           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3449           track->period_num, mo);
3450
3451       if (GST_IS_EVENT (mo)) {
3452         GstEvent *event = (GstEvent *) mo;
3453         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3454           slot->pushed_timed_data = TRUE;
3455         } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3456           /* If there is a pending next period, don't send the EOS */
3457           if (demux->output_period->has_next_period) {
3458             GST_LOG_OBJECT (demux,
3459                 "Dropping EOS on track '%s' (period %u) before next period",
3460                 track->stream_id, track->period_num);
3461             gst_event_store_mark_delivered (&track->sticky_events, event);
3462             gst_event_unref (event);
3463             event = NULL;
3464             /* We'll need to re-check if all tracks are empty again above */
3465             need_restart = TRUE;
3466           }
3467         }
3468
3469         if (event != NULL) {
3470           gst_pad_push_event (slot->pad, gst_event_ref (event));
3471
3472           if (GST_EVENT_IS_STICKY (event))
3473             gst_event_store_mark_delivered (&track->sticky_events, event);
3474           gst_event_unref (event);
3475         }
3476       } else if (GST_IS_BUFFER (mo)) {
3477         GstBuffer *buffer = (GstBuffer *) mo;
3478
3479         if (track->output_discont) {
3480           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3481             buffer = gst_buffer_make_writable (buffer);
3482             GST_DEBUG_OBJECT (slot->pad,
3483                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3484                 buffer);
3485             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3486           }
3487           track->output_discont = FALSE;
3488         }
3489         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3490         ret =
3491             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3492             slot->pad, slot->flow_ret);
3493         GST_DEBUG_OBJECT (slot->pad,
3494             "track %s (period %u) push returned %s (combined %s)",
3495             track->stream_id, track->period_num,
3496             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3497         slot->pushed_timed_data = TRUE;
3498       } else {
3499         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3500       }
3501
3502       TRACKS_LOCK (demux);
3503       gst_adaptive_demux_track_update_next_position (track);
3504
3505       if (ret != GST_FLOW_OK)
3506         goto pause;
3507     }
3508   }
3509
3510   /* Store global output position */
3511   if (global_output_position != GST_CLOCK_STIME_NONE) {
3512     demux->priv->global_output_position = global_output_position;
3513
3514     /* And see if any streams need to be woken for more input */
3515     gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3516         global_output_position);
3517   }
3518
3519   if (need_restart)
3520     goto restart;
3521
3522   if (global_output_position == GST_CLOCK_STIME_NONE) {
3523     if (!demux->priv->flushing) {
3524       GST_DEBUG_OBJECT (demux,
3525           "Pausing output task after reaching NONE global_output_position");
3526       gst_task_pause (demux->priv->output_task);
3527     }
3528   }
3529
3530   TRACKS_UNLOCK (demux);
3531   GST_DEBUG_OBJECT (demux, "leave");
3532   return;
3533
3534 pause:
3535   {
3536     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3537     /* If the flushing flag is set, then the task is being
3538      * externally stopped, so don't go to pause(), otherwise we
3539      * should so we don't keep spinning */
3540     if (!demux->priv->flushing) {
3541       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3542           gst_flow_get_name (ret));
3543       gst_task_pause (demux->priv->output_task);
3544     }
3545
3546     TRACKS_UNLOCK (demux);
3547
3548     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3549       GstEvent *eos = gst_event_new_eos ();
3550
3551       if (ret != GST_FLOW_EOS) {
3552         GST_ELEMENT_FLOW_ERROR (demux, ret);
3553       }
3554
3555       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3556       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3557         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3558       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3559
3560       gst_adaptive_demux_push_src_event (demux, eos);
3561     }
3562
3563     return;
3564   }
3565 }
3566
3567 /* must be called from the scheduler */
3568 gboolean
3569 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3570 {
3571   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3572
3573   if (klass->is_live)
3574     return klass->is_live (demux);
3575   return FALSE;
3576 }
3577
3578 static void
3579 handle_manifest_download_complete (DownloadRequest * request,
3580     DownloadRequestState state, GstAdaptiveDemux * demux)
3581 {
3582   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3583   GstBuffer *buffer;
3584   GstFlowReturn result;
3585
3586   g_free (demux->manifest_base_uri);
3587   g_free (demux->manifest_uri);
3588
3589   if (request->redirect_permanent && request->redirect_uri) {
3590     demux->manifest_uri = g_strdup (request->redirect_uri);
3591     demux->manifest_base_uri = NULL;
3592   } else {
3593     demux->manifest_uri = g_strdup (request->uri);
3594     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3595   }
3596
3597   buffer = download_request_take_buffer (request);
3598
3599   /* We should always have a buffer since this function is the non-error
3600    * callback for the download */
3601   g_assert (buffer);
3602
3603   result = klass->update_manifest_data (demux, buffer);
3604   gst_buffer_unref (buffer);
3605
3606   /* FIXME: Should the manifest uri vars be reverted to original
3607    * values if updating fails? */
3608
3609   if (result == GST_FLOW_OK) {
3610     GstClockTime duration;
3611     /* Send an updated duration message */
3612     duration = klass->get_duration (demux);
3613     if (duration != GST_CLOCK_TIME_NONE) {
3614       GST_DEBUG_OBJECT (demux,
3615           "Sending duration message : %" GST_TIME_FORMAT,
3616           GST_TIME_ARGS (duration));
3617       gst_element_post_message (GST_ELEMENT (demux),
3618           gst_message_new_duration_changed (GST_OBJECT (demux)));
3619     } else {
3620       GST_DEBUG_OBJECT (demux,
3621           "Duration unknown, can not send the duration message");
3622     }
3623
3624     /* If a manifest changes it's liveness or periodic updateness, we need
3625      * to start/stop the manifest update task appropriately */
3626     /* Keep this condition in sync with the one in
3627      * gst_adaptive_demux_start_manifest_update_task()
3628      */
3629     if (gst_adaptive_demux_is_live (demux) &&
3630         klass->requires_periodical_playlist_update (demux)) {
3631       gst_adaptive_demux_start_manifest_update_task (demux);
3632     } else {
3633       gst_adaptive_demux_stop_manifest_update_task (demux);
3634     }
3635   }
3636 }
3637
3638 static void
3639 handle_manifest_download_failure (DownloadRequest * request,
3640     DownloadRequestState state, GstAdaptiveDemux * demux)
3641 {
3642   GST_FIXME_OBJECT (demux, "Manifest download failed.");
3643   /* Retry or error out here */
3644 }
3645
3646 static GstFlowReturn
3647 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3648 {
3649   DownloadRequest *request;
3650   GstFlowReturn ret = GST_FLOW_OK;
3651   GError *error = NULL;
3652
3653   request = download_request_new_uri (demux->manifest_uri);
3654
3655   download_request_set_callbacks (request,
3656       (DownloadRequestEventCallback) handle_manifest_download_complete,
3657       (DownloadRequestEventCallback) handle_manifest_download_failure,
3658       NULL, NULL, demux);
3659
3660   if (!downloadhelper_submit_request (demux->download_helper, NULL,
3661           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3662           &error)) {
3663     if (error) {
3664       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3665           ("Failed to download manifest: %s", error->message), (NULL));
3666       g_clear_error (&error);
3667     }
3668     ret = GST_FLOW_NOT_LINKED;
3669   }
3670
3671   return ret;
3672 }
3673
3674 /* must be called with manifest_lock taken */
3675 GstFlowReturn
3676 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3677 {
3678   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3679   GstFlowReturn ret;
3680
3681   ret = klass->update_manifest (demux);
3682
3683   return ret;
3684 }
3685
3686 /* must be called with manifest_lock taken */
3687 gboolean
3688 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3689 {
3690   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3691   gboolean ret = FALSE;
3692
3693   if (klass->has_next_period)
3694     ret = klass->has_next_period (demux);
3695   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3696   return ret;
3697 }
3698
3699 /* must be called with manifest_lock taken */
3700 void
3701 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3702 {
3703   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3704   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3705
3706   g_return_if_fail (klass->advance_period != NULL);
3707
3708   GST_DEBUG_OBJECT (demux, "Advancing to next period");
3709   /* FIXME : no return value ? What if it fails ? */
3710   klass->advance_period (demux);
3711
3712   if (previous_period == demux->input_period) {
3713     GST_ERROR_OBJECT (demux, "Advancing period failed");
3714     return;
3715   }
3716
3717   /* Stop the previous period stream tasks */
3718   gst_adaptive_demux_period_stop_tasks (previous_period);
3719
3720   gst_adaptive_demux_update_collection (demux, demux->input_period);
3721   /* Figure out a pre-emptive selection based on the output period selection */
3722   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3723       demux->output_period);
3724
3725   gst_adaptive_demux_prepare_streams (demux, FALSE);
3726   gst_adaptive_demux_start_tasks (demux);
3727 }
3728
3729 /**
3730  * gst_adaptive_demux_get_monotonic_time:
3731  * Returns: a monotonically increasing time, using the system realtime clock
3732  */
3733 GstClockTime
3734 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3735 {
3736   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3737   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3738 }
3739
3740 /**
3741  * gst_adaptive_demux_get_client_now_utc:
3742  * @demux: #GstAdaptiveDemux
3743  * Returns: the client's estimate of UTC
3744  *
3745  * Used to find the client's estimate of UTC, using the system realtime clock.
3746  */
3747 GDateTime *
3748 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3749 {
3750   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3751 }
3752
3753 /**
3754  * gst_adaptive_demux_is_running
3755  * @demux: #GstAdaptiveDemux
3756  * Returns: whether the demuxer is processing data
3757  *
3758  * Returns FALSE if shutdown has started (transitioning down from
3759  * PAUSED), otherwise TRUE.
3760  */
3761 gboolean
3762 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3763 {
3764   return g_atomic_int_get (&demux->running);
3765 }
3766
3767 /**
3768  * gst_adaptive_demux_get_qos_earliest_time:
3769  *
3770  * Returns: The QOS earliest time
3771  *
3772  * Since: 1.20
3773  */
3774 GstClockTime
3775 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3776 {
3777   GstClockTime earliest;
3778
3779   GST_OBJECT_LOCK (demux);
3780   earliest = demux->priv->qos_earliest_time;
3781   GST_OBJECT_UNLOCK (demux);
3782
3783   return earliest;
3784 }
3785
3786 gboolean
3787 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3788     GstAdaptiveDemux2Stream * stream)
3789 {
3790   g_return_val_if_fail (demux && stream, FALSE);
3791
3792   /* FIXME : Migrate to parent */
3793   g_return_val_if_fail (stream->demux == NULL, FALSE);
3794
3795   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3796
3797   TRACKS_LOCK (demux);
3798   if (demux->input_period->prepared) {
3799     GST_ERROR_OBJECT (demux,
3800         "Attempted to add streams but no new period was created");
3801     TRACKS_UNLOCK (demux);
3802     return FALSE;
3803   }
3804   stream->demux = demux;
3805   stream->period = demux->input_period;
3806   demux->input_period->streams =
3807       g_list_append (demux->input_period->streams, stream);
3808
3809   if (stream->tracks) {
3810     GList *iter;
3811     for (iter = stream->tracks; iter; iter = iter->next)
3812       if (!gst_adaptive_demux_period_add_track (demux->input_period,
3813               (GstAdaptiveDemuxTrack *) iter->data)) {
3814         GST_ERROR_OBJECT (demux, "Failed to add track elements");
3815         TRACKS_UNLOCK (demux);
3816         return FALSE;
3817       }
3818   }
3819   TRACKS_UNLOCK (demux);
3820   return TRUE;
3821 }
3822
3823 /* Return the current playback rate including any instant rate multiplier */
3824 gdouble
3825 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3826 {
3827   gdouble rate;
3828   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3829   rate = demux->segment.rate * demux->instant_rate_multiplier;
3830   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3831   return rate;
3832 }