b465ac37437da909d9ec75f3ddeba4014d74b63b
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121
122 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
123
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0  /* Automatic */
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
128
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
131
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
135
136 enum
137 {
138   PROP_0,
139   PROP_CONNECTION_SPEED,
140   PROP_BANDWIDTH_TARGET_RATIO,
141   PROP_CONNECTION_BITRATE,
142   PROP_MIN_BITRATE,
143   PROP_MAX_BITRATE,
144   PROP_CURRENT_BANDWIDTH,
145   PROP_MAX_BUFFERING_TIME,
146   PROP_BUFFERING_HIGH_WATERMARK_TIME,
147   PROP_BUFFERING_LOW_WATERMARK_TIME,
148   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150   PROP_CURRENT_LEVEL_TIME_VIDEO,
151   PROP_CURRENT_LEVEL_TIME_AUDIO,
152   PROP_LAST
153 };
154
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
157     GST_PAD_SRC,
158     GST_PAD_SOMETIMES,
159     GST_STATIC_CAPS_ANY);
160
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
163     GST_PAD_SRC,
164     GST_PAD_SOMETIMES,
165     GST_STATIC_CAPS_ANY);
166
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
169     GST_PAD_SRC,
170     GST_PAD_SOMETIMES,
171     GST_STATIC_CAPS_ANY);
172
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
175 {
176   /* Output pad */
177   GstPad *pad;
178
179   /* Last flow return */
180   GstFlowReturn flow_ret;
181
182   /* Stream Type */
183   GstStreamType type;
184
185   /* Target track (reference) */
186   GstAdaptiveDemuxTrack *track;
187
188   /* Pending track (which will replace track) */
189   GstAdaptiveDemuxTrack *pending_track;
190
191   /* TRUE if a buffer or a gap event was pushed through this slot. */
192   gboolean pushed_timed_data;
193 } OutputSlot;
194
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
197
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200     GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203     element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
205     GstQuery * query);
206 static gboolean gst_adaptive_demux_send_event (GstElement * element,
207     GstEvent * event);
208
209 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
210
211 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
212     GstEvent * event);
213 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
214     GstObject * parent, GstBuffer * buffer);
215 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
216     GstQuery * query);
217 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
218     GstEvent * event);
219 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
220     GstEvent * event);
221 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
222     * demux, GstEvent * event);
223
224 static gboolean
225 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
226
227 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
228 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
229 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
230     gboolean first_and_live);
231
232 static GstFlowReturn
233 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
234
235 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
236     demux);
237 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
238     demux);
239
240 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
241 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
242     gboolean stop_updates);
243
244 static gboolean
245 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
246     * demux);
247
248 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
249  * method to get to the padtemplates */
250 GType
251 gst_adaptive_demux_ng_get_type (void)
252 {
253   static gsize type = 0;
254
255   if (g_once_init_enter (&type)) {
256     GType _type;
257     static const GTypeInfo info = {
258       sizeof (GstAdaptiveDemuxClass),
259       NULL,
260       NULL,
261       (GClassInitFunc) gst_adaptive_demux_class_init,
262       NULL,
263       NULL,
264       sizeof (GstAdaptiveDemux),
265       0,
266       (GInstanceInitFunc) gst_adaptive_demux_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_BIN,
270         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
271
272     private_offset =
273         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
274
275     g_once_init_leave (&type, _type);
276   }
277   return type;
278 }
279
280 static inline GstAdaptiveDemuxPrivate *
281 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
282 {
283   return (G_STRUCT_MEMBER_P (self, private_offset));
284 }
285
286 static void
287 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec)
289 {
290   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
291
292   GST_OBJECT_LOCK (demux);
293
294   switch (prop_id) {
295     case PROP_CONNECTION_SPEED:
296       demux->connection_speed = g_value_get_uint (value) * 1000;
297       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
298           demux->connection_speed);
299       break;
300     case PROP_BANDWIDTH_TARGET_RATIO:
301       demux->bandwidth_target_ratio = g_value_get_float (value);
302       break;
303     case PROP_MIN_BITRATE:
304       demux->min_bitrate = g_value_get_uint (value);
305       break;
306     case PROP_MAX_BITRATE:
307       demux->max_bitrate = g_value_get_uint (value);
308       break;
309     case PROP_CONNECTION_BITRATE:
310       demux->connection_speed = g_value_get_uint (value);
311       break;
312       /* FIXME: Recalculate track and buffering levels
313        * when watermarks change? */
314     case PROP_MAX_BUFFERING_TIME:
315       demux->max_buffering_time = g_value_get_uint64 (value);
316       break;
317     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
318       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
319       break;
320     case PROP_BUFFERING_LOW_WATERMARK_TIME:
321       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
322       break;
323     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
324       demux->buffering_high_watermark_fragments = g_value_get_double (value);
325       break;
326     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
327       demux->buffering_low_watermark_fragments = g_value_get_double (value);
328       break;
329     default:
330       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
331       break;
332   }
333
334   GST_OBJECT_UNLOCK (demux);
335 }
336
337 static void
338 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
339     GValue * value, GParamSpec * pspec)
340 {
341   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
342
343   GST_OBJECT_LOCK (demux);
344
345   switch (prop_id) {
346     case PROP_CONNECTION_SPEED:
347       g_value_set_uint (value, demux->connection_speed / 1000);
348       break;
349     case PROP_BANDWIDTH_TARGET_RATIO:
350       g_value_set_float (value, demux->bandwidth_target_ratio);
351       break;
352     case PROP_MIN_BITRATE:
353       g_value_set_uint (value, demux->min_bitrate);
354       break;
355     case PROP_MAX_BITRATE:
356       g_value_set_uint (value, demux->max_bitrate);
357       break;
358     case PROP_CONNECTION_BITRATE:
359       g_value_set_uint (value, demux->connection_speed);
360       break;
361     case PROP_CURRENT_BANDWIDTH:
362       g_value_set_uint (value, demux->current_download_rate);
363       break;
364     case PROP_MAX_BUFFERING_TIME:
365       g_value_set_uint64 (value, demux->max_buffering_time);
366       break;
367     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
368       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
369       break;
370     case PROP_BUFFERING_LOW_WATERMARK_TIME:
371       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
372       break;
373     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
374       g_value_set_double (value, demux->buffering_high_watermark_fragments);
375       break;
376     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
377       g_value_set_double (value, demux->buffering_low_watermark_fragments);
378       break;
379     case PROP_CURRENT_LEVEL_TIME_VIDEO:
380       g_value_set_uint64 (value, demux->current_level_time_video);
381       break;
382     case PROP_CURRENT_LEVEL_TIME_AUDIO:
383       g_value_set_uint64 (value, demux->current_level_time_audio);
384       break;
385     default:
386       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
387       break;
388   }
389
390   GST_OBJECT_UNLOCK (demux);
391 }
392
393 static void
394 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
395 {
396   GObjectClass *gobject_class;
397   GstElementClass *gstelement_class;
398   GstBinClass *gstbin_class;
399
400   gobject_class = G_OBJECT_CLASS (klass);
401   gstelement_class = GST_ELEMENT_CLASS (klass);
402   gstbin_class = GST_BIN_CLASS (klass);
403
404   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
405       "Base Adaptive Demux (ng)");
406
407   parent_class = g_type_class_peek_parent (klass);
408
409   if (private_offset != 0)
410     g_type_class_adjust_private_offset (klass, &private_offset);
411
412   gobject_class->set_property = gst_adaptive_demux_set_property;
413   gobject_class->get_property = gst_adaptive_demux_get_property;
414   gobject_class->finalize = gst_adaptive_demux_finalize;
415
416   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
417       g_param_spec_uint ("connection-speed", "Connection Speed",
418           "Network connection speed to use in kbps (0 = calculate from downloaded"
419           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
421
422   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
423       g_param_spec_float ("bandwidth-target-ratio",
424           "Ratio of target bandwidth / available bandwidth",
425           "Limit of the available bitrate to use when switching to alternates",
426           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428
429   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
430       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
431           "Network connection speed to use (0 = automatic) (bits/s)",
432           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434
435   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
436       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
437           "Minimum bitrate to use when switching to alternates (bits/s)",
438           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
439           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
442       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
443           "Maximum bitrate to use when switching to alternates (bits/s)",
444           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
445           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446
447   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
448       g_param_spec_uint ("current-bandwidth",
449           "Current download bandwidth (bits/s)",
450           "Report of current download bandwidth (based on arriving data) (bits/s)",
451           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
452
453   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
454       g_param_spec_uint64 ("max-buffering-time",
455           "Buffering maximum size (ns)",
456           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
457           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
458           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
459           G_PARAM_STATIC_STRINGS));
460
461   g_object_class_install_property (gobject_class,
462       PROP_BUFFERING_HIGH_WATERMARK_TIME,
463       g_param_spec_uint64 ("high-watermark-time",
464           "High buffering watermark size (ns)",
465           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
466           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
467           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468           G_PARAM_STATIC_STRINGS));
469
470   g_object_class_install_property (gobject_class,
471       PROP_BUFFERING_LOW_WATERMARK_TIME,
472       g_param_spec_uint64 ("low-watermark-time",
473           "Low buffering watermark size (ns)",
474           "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
475           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
476           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477           G_PARAM_STATIC_STRINGS));
478
479   g_object_class_install_property (gobject_class,
480       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
481       g_param_spec_double ("high-watermark-fragments",
482           "High buffering watermark size (fragments)",
483           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
484           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
485           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
486           G_PARAM_STATIC_STRINGS));
487
488   g_object_class_install_property (gobject_class,
489       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
490       g_param_spec_double ("low-watermark-fragments",
491           "Low buffering watermark size (fragments)",
492           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
493           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
494           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495           G_PARAM_STATIC_STRINGS));
496
497   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
498       g_param_spec_uint64 ("current-level-time-video",
499           "Currently buffered level of video (ns)",
500           "Currently buffered level of video track(s) (ns)",
501           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
502           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
503           G_PARAM_STATIC_STRINGS));
504
505   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
506       g_param_spec_uint64 ("current-level-time-audio",
507           "Currently buffered level of audio (ns)",
508           "Currently buffered level of audio track(s) (ns)",
509           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
510           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
511           G_PARAM_STATIC_STRINGS));
512
513   gst_element_class_add_static_pad_template (gstelement_class,
514       &gst_adaptive_demux_audiosrc_template);
515   gst_element_class_add_static_pad_template (gstelement_class,
516       &gst_adaptive_demux_videosrc_template);
517   gst_element_class_add_static_pad_template (gstelement_class,
518       &gst_adaptive_demux_subtitlesrc_template);
519
520   gstelement_class->change_state = gst_adaptive_demux_change_state;
521   gstelement_class->query = gst_adaptive_demux_query;
522   gstelement_class->send_event = gst_adaptive_demux_send_event;
523
524   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
525
526   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
527   klass->requires_periodical_playlist_update =
528       gst_adaptive_demux_requires_periodical_playlist_update_default;
529   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
530 }
531
532 static void
533 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
534     GstAdaptiveDemuxClass * klass)
535 {
536   GstPadTemplate *pad_template;
537
538   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
539
540   demux->priv = gst_adaptive_demux_get_instance_private (demux);
541   demux->priv->input_adapter = gst_adapter_new ();
542   demux->realtime_clock = gst_adaptive_demux_clock_new ();
543
544   demux->download_helper = downloadhelper_new (demux->realtime_clock);
545   demux->priv->segment_seqnum = gst_util_seqnum_next ();
546   demux->have_group_id = FALSE;
547   demux->group_id = G_MAXUINT;
548
549   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
550   demux->instant_rate_multiplier = 1.0;
551
552   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
553       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
554
555   g_rec_mutex_init (&demux->priv->manifest_lock);
556
557   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
558
559   g_mutex_init (&demux->priv->api_lock);
560   g_mutex_init (&demux->priv->segment_lock);
561
562   g_mutex_init (&demux->priv->tracks_lock);
563   g_cond_init (&demux->priv->tracks_add);
564
565   g_mutex_init (&demux->priv->buffering_lock);
566
567   demux->priv->periods = g_queue_new ();
568
569   pad_template =
570       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
571   g_return_if_fail (pad_template != NULL);
572
573   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
574   gst_pad_set_event_function (demux->sinkpad,
575       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
576   gst_pad_set_chain_function (demux->sinkpad,
577       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
578
579   /* Properties */
580   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
581   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
582   demux->min_bitrate = DEFAULT_MIN_BITRATE;
583   demux->max_bitrate = DEFAULT_MAX_BITRATE;
584
585   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
586   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
587   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
588   demux->buffering_high_watermark_fragments =
589       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
590   demux->buffering_low_watermark_fragments =
591       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
592
593   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
594   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
595
596   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
597
598   demux->priv->duration = GST_CLOCK_TIME_NONE;
599
600   /* Output combiner */
601   demux->priv->flowcombiner = gst_flow_combiner_new ();
602
603   /* Output task */
604   g_rec_mutex_init (&demux->priv->output_lock);
605   demux->priv->output_task =
606       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
607       NULL);
608   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
609 }
610
611 static void
612 gst_adaptive_demux_finalize (GObject * object)
613 {
614   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
615   GstAdaptiveDemuxPrivate *priv = demux->priv;
616
617   GST_DEBUG_OBJECT (object, "finalize");
618
619   g_object_unref (priv->input_adapter);
620
621   downloadhelper_free (demux->download_helper);
622
623   g_rec_mutex_clear (&demux->priv->manifest_lock);
624   g_mutex_clear (&demux->priv->api_lock);
625   g_mutex_clear (&demux->priv->segment_lock);
626
627   g_mutex_clear (&demux->priv->buffering_lock);
628
629   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
630
631   /* The input period is present after a reset, clear it now */
632   if (demux->input_period)
633     gst_adaptive_demux_period_unref (demux->input_period);
634
635   if (demux->realtime_clock) {
636     gst_adaptive_demux_clock_unref (demux->realtime_clock);
637     demux->realtime_clock = NULL;
638   }
639   g_object_unref (priv->output_task);
640   g_rec_mutex_clear (&priv->output_lock);
641
642   gst_flow_combiner_free (priv->flowcombiner);
643
644   g_queue_free (priv->periods);
645
646   G_OBJECT_CLASS (parent_class)->finalize (object);
647 }
648
649 static gboolean
650 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
651 {
652   gboolean ret = FALSE;
653   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
654
655   if (parent) {
656     ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
657     gst_object_unref (parent);
658   }
659
660   return ret;
661 }
662
663 static GstStateChangeReturn
664 gst_adaptive_demux_change_state (GstElement * element,
665     GstStateChange transition)
666 {
667   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
668   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
669
670   switch (transition) {
671     case GST_STATE_CHANGE_NULL_TO_READY:
672       if (!gst_adaptive_demux_check_streams_aware (demux)) {
673         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
674             (_("Element requires a streams-aware context.")), (NULL));
675         goto fail_out;
676       }
677       break;
678     case GST_STATE_CHANGE_PAUSED_TO_READY:
679       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
680         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
681
682       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
683       downloadhelper_stop (demux->download_helper);
684
685       TRACKS_LOCK (demux);
686       demux->priv->flushing = TRUE;
687       g_cond_signal (&demux->priv->tracks_add);
688       gst_task_stop (demux->priv->output_task);
689       TRACKS_UNLOCK (demux);
690
691       gst_task_join (demux->priv->output_task);
692
693       GST_API_LOCK (demux);
694       gst_adaptive_demux_reset (demux);
695       GST_API_UNLOCK (demux);
696       break;
697     case GST_STATE_CHANGE_READY_TO_PAUSED:
698       GST_API_LOCK (demux);
699       gst_adaptive_demux_reset (demux);
700
701       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
702       if (g_atomic_int_get (&demux->priv->have_manifest))
703         gst_adaptive_demux_start_manifest_update_task (demux);
704       GST_API_UNLOCK (demux);
705       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
706         GST_DEBUG_OBJECT (demux, "demuxer has started running");
707       /* gst_task_start (demux->priv->output_task); */
708       break;
709     default:
710       break;
711   }
712
713   /* this must be run with the scheduler and output tasks stopped. */
714   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
715
716   switch (transition) {
717     case GST_STATE_CHANGE_READY_TO_PAUSED:
718       /* Start download task */
719       downloadhelper_start (demux->download_helper);
720       break;
721     default:
722       break;
723   }
724
725 fail_out:
726   return result;
727 }
728
729 static void
730 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
731     OutputSlot * slot)
732 {
733   GstEvent *eos = gst_event_new_eos ();
734   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
735
736   /* FIXME: The slot might not have output any data, caps or segment yet */
737   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
738   gst_pad_push_event (slot->pad, eos);
739   gst_pad_set_active (slot->pad, FALSE);
740   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
741   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
742   if (slot->track)
743     gst_adaptive_demux_track_unref (slot->track);
744   if (slot->pending_track)
745     gst_adaptive_demux_track_unref (slot->pending_track);
746
747   g_slice_free (OutputSlot, slot);
748 }
749
750 static OutputSlot *
751 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
752     GstStreamType streamtype)
753 {
754   OutputSlot *slot;
755   GstPadTemplate *tmpl;
756   gchar *name;
757
758   switch (streamtype) {
759     case GST_STREAM_TYPE_AUDIO:
760       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
761       tmpl =
762           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
763       break;
764     case GST_STREAM_TYPE_VIDEO:
765       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
766       tmpl =
767           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
768       break;
769     case GST_STREAM_TYPE_TEXT:
770       name =
771           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
772       tmpl =
773           gst_static_pad_template_get
774           (&gst_adaptive_demux_subtitlesrc_template);
775       break;
776     default:
777       g_assert_not_reached ();
778       return NULL;
779   }
780
781   slot = g_slice_new0 (OutputSlot);
782   slot->type = streamtype;
783   slot->pushed_timed_data = FALSE;
784
785   /* Create and activate new pads */
786   slot->pad = gst_pad_new_from_template (tmpl, name);
787   g_free (name);
788   gst_object_unref (tmpl);
789
790   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
791   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
792   gst_pad_set_active (slot->pad, TRUE);
793
794   gst_pad_set_query_function (slot->pad,
795       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
796   gst_pad_set_event_function (slot->pad,
797       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
798
799   gst_pad_set_element_private (slot->pad, slot);
800
801   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
802       GST_DEBUG_PAD_NAME (slot->pad));
803   return slot;
804 }
805
806 /* Called:
807  * * After `process_manifest` or when a period starts
808  * * Or when all tracks have been created
809  *
810  * Goes over tracks and creates the collection
811  *
812  * Returns TRUE if the collection was fully created.
813  *
814  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
815  * */
816 static gboolean
817 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
818     GstAdaptiveDemuxPeriod * period)
819 {
820   GstStreamCollection *collection;
821   GList *iter;
822
823   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
824
825   if (!period->tracks_changed) {
826     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
827     return TRUE;
828   }
829
830   if (!period->tracks) {
831     GST_WARNING_OBJECT (demux, "No tracks registered/present");
832     return FALSE;
833   }
834
835   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
836     GST_DEBUG_OBJECT (demux,
837         "Streams still have pending tracks, not creating/updating collection");
838     return FALSE;
839   }
840
841   /* Update collection */
842   collection = gst_stream_collection_new ("adaptivedemux");
843
844   for (iter = period->tracks; iter; iter = iter->next) {
845     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
846
847     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
848     gst_stream_collection_add_stream (collection,
849         gst_object_ref (track->stream_object));
850   }
851
852   if (period->collection)
853     gst_object_unref (period->collection);
854   period->collection = collection;
855
856   return TRUE;
857 }
858
859 /*
860  * Called for the output period:
861  * * after `update_collection()` if the input period is the same as the output period
862  * * When the output period changes
863  *
864  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
865  */
866 static gboolean
867 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
868 {
869   GstStreamCollection *collection;
870   GstAdaptiveDemuxPeriod *period = demux->output_period;
871   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
872
873   g_return_val_if_fail (period, FALSE);
874   if (!period->collection) {
875     GST_DEBUG_OBJECT (demux, "No collection available yet");
876     return TRUE;
877   }
878
879   collection = period->collection;
880
881   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
882       period->period_num);
883
884   /* Post collection */
885   TRACKS_UNLOCK (demux);
886   GST_MANIFEST_UNLOCK (demux);
887
888   gst_element_post_message (GST_ELEMENT_CAST (demux),
889       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
890
891   GST_MANIFEST_LOCK (demux);
892   TRACKS_LOCK (demux);
893
894   /* If no stream selection was handled, make a default selection */
895   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
896     gst_adaptive_demux_period_select_default_tracks (demux,
897         demux->output_period);
898   }
899
900   /* Make sure the output task is running */
901   if (gst_adaptive_demux2_is_running (demux)) {
902     demux->priv->flushing = FALSE;
903     GST_DEBUG_OBJECT (demux, "Starting the output task");
904     gst_task_start (demux->priv->output_task);
905   }
906
907   return TRUE;
908 }
909
910 static gboolean
911 handle_incoming_manifest (GstAdaptiveDemux * demux)
912 {
913   GstAdaptiveDemuxClass *demux_class;
914   GstQuery *query;
915   gboolean query_res;
916   gboolean ret = TRUE;
917   gsize available;
918   GstBuffer *manifest_buffer;
919
920   GST_API_LOCK (demux);
921   GST_MANIFEST_LOCK (demux);
922
923   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
924
925   available = gst_adapter_available (demux->priv->input_adapter);
926
927   if (available == 0)
928     goto eos_without_data;
929
930   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
931
932   /* Need to get the URI to use it as a base to generate the fragment's
933    * uris */
934   query = gst_query_new_uri ();
935   query_res = gst_pad_peer_query (demux->sinkpad, query);
936   if (query_res) {
937     gchar *uri, *redirect_uri;
938     gboolean permanent;
939
940     gst_query_parse_uri (query, &uri);
941     gst_query_parse_uri_redirection (query, &redirect_uri);
942     gst_query_parse_uri_redirection_permanent (query, &permanent);
943
944     if (permanent && redirect_uri) {
945       demux->manifest_uri = redirect_uri;
946       demux->manifest_base_uri = NULL;
947       g_free (uri);
948     } else {
949       demux->manifest_uri = uri;
950       demux->manifest_base_uri = redirect_uri;
951     }
952
953     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
954         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
955   } else {
956     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
957   }
958   gst_query_unref (query);
959
960   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
961   if (!demux->have_group_id) {
962     demux->have_group_id = TRUE;
963     demux->group_id = gst_util_group_id_next ();
964   }
965
966   /* Let the subclass parse the manifest */
967   manifest_buffer =
968       gst_adapter_take_buffer (demux->priv->input_adapter, available);
969   ret = demux_class->process_manifest (demux, manifest_buffer);
970   gst_buffer_unref (manifest_buffer);
971
972   gst_element_post_message (GST_ELEMENT_CAST (demux),
973       gst_message_new_element (GST_OBJECT_CAST (demux),
974           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
975               "manifest-uri", G_TYPE_STRING,
976               demux->manifest_uri, "uri", G_TYPE_STRING,
977               demux->manifest_uri,
978               "manifest-download-start", GST_TYPE_CLOCK_TIME,
979               GST_CLOCK_TIME_NONE,
980               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
981               gst_util_get_timestamp (), NULL)));
982
983   if (!ret)
984     goto invalid_manifest;
985
986   /* Streams should have been added to the input period if the manifest parsing
987    * succeeded */
988   if (!demux->input_period->streams)
989     goto no_streams;
990
991   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
992
993   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
994   /* Send duration message */
995   if (!gst_adaptive_demux_is_live (demux)) {
996     GstClockTime duration = demux_class->get_duration (demux);
997
998     demux->priv->duration = duration;
999     if (duration != GST_CLOCK_TIME_NONE) {
1000       GST_DEBUG_OBJECT (demux,
1001           "Sending duration message : %" GST_TIME_FORMAT,
1002           GST_TIME_ARGS (duration));
1003       gst_element_post_message (GST_ELEMENT (demux),
1004           gst_message_new_duration_changed (GST_OBJECT (demux)));
1005     } else {
1006       GST_DEBUG_OBJECT (demux,
1007           "media duration unknown, can not send the duration message");
1008     }
1009   }
1010
1011   TRACKS_LOCK (demux);
1012   /* New streams/tracks will have been added to the input period */
1013   /* The input period has streams, make it the active output period */
1014   /* FIXME : Factorize this into a function to make a period active */
1015   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1016   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1017       gst_adaptive_demux_post_collection (demux);
1018   TRACKS_UNLOCK (demux);
1019
1020   gst_adaptive_demux_prepare_streams (demux,
1021       gst_adaptive_demux_is_live (demux));
1022   gst_adaptive_demux_start_tasks (demux);
1023   gst_adaptive_demux_start_manifest_update_task (demux);
1024
1025 unlock_out:
1026   GST_MANIFEST_UNLOCK (demux);
1027   GST_API_UNLOCK (demux);
1028
1029   return ret;
1030
1031   /* ERRORS */
1032 eos_without_data:
1033   {
1034     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1035     ret = FALSE;
1036     goto unlock_out;
1037   }
1038
1039 no_streams:
1040   {
1041     /* no streams */
1042     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1043     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1044         (_("This file contains no playable streams.")),
1045         ("No known stream formats found at the Manifest"));
1046     ret = FALSE;
1047     goto unlock_out;
1048   }
1049
1050 invalid_manifest:
1051   {
1052     GST_MANIFEST_UNLOCK (demux);
1053     GST_API_UNLOCK (demux);
1054
1055     /* In most cases, this will happen if we set a wrong url in the
1056      * source element and we have received the 404 HTML response instead of
1057      * the manifest */
1058     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1059     return FALSE;
1060   }
1061 }
1062
1063 struct http_headers_collector
1064 {
1065   GstAdaptiveDemux *demux;
1066   gchar **cookies;
1067 };
1068
1069 static gboolean
1070 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1071     const GValue * value, gpointer userdata)
1072 {
1073   struct http_headers_collector *hdr_data = userdata;
1074   GstAdaptiveDemux *demux = hdr_data->demux;
1075   const gchar *field_name = g_quark_to_string (field_id);
1076
1077   if (G_UNLIKELY (value == NULL))
1078     return TRUE;                /* This should not happen */
1079
1080   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1081     const gchar *user_agent = g_value_get_string (value);
1082
1083     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1084     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1085   }
1086
1087   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1088       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1089     guint i = 0, prev_len = 0, total_len = 0;
1090     gchar **cookies = NULL;
1091
1092     if (hdr_data->cookies != NULL)
1093       prev_len = g_strv_length (hdr_data->cookies);
1094
1095     if (GST_VALUE_HOLDS_ARRAY (value)) {
1096       total_len = gst_value_array_get_size (value) + prev_len;
1097       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1098
1099       for (i = 0; i < gst_value_array_get_size (value); i++) {
1100         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1101             g_value_get_string (gst_value_array_get_value (value, i)));
1102         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1103       }
1104     } else if (G_VALUE_HOLDS_STRING (value)) {
1105       total_len = 1 + prev_len;
1106       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1107
1108       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1109           g_value_get_string (value));
1110       cookies[0] = g_value_dup_string (value);
1111     } else {
1112       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1113           g_quark_to_string (field_id));
1114     }
1115
1116     if (cookies) {
1117       if (prev_len) {
1118         guint j;
1119         for (j = 0; j < prev_len; j++) {
1120           GST_DEBUG_OBJECT (demux,
1121               "Append existing cookie %s", hdr_data->cookies[j]);
1122           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1123         }
1124       }
1125       cookies[total_len] = NULL;
1126
1127       g_strfreev (hdr_data->cookies);
1128       hdr_data->cookies = cookies;
1129     }
1130   }
1131
1132   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1133     const gchar *referer = g_value_get_string (value);
1134     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1135
1136     downloadhelper_set_referer (demux->download_helper, referer);
1137   }
1138
1139   /* Date header can be used to estimate server offset */
1140   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1141     const gchar *http_date = g_value_get_string (value);
1142
1143     if (http_date) {
1144       GstDateTime *datetime =
1145           gst_adaptive_demux_util_parse_http_head_date (http_date);
1146
1147       if (datetime) {
1148         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1149         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1150
1151         GST_INFO_OBJECT (demux,
1152             "HTTP response Date %s", GST_STR_NULL (date_string));
1153         g_free (date_string);
1154
1155         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1156
1157         g_date_time_unref (utc_now);
1158         gst_date_time_unref (datetime);
1159       }
1160     }
1161   }
1162
1163   return TRUE;
1164 }
1165
1166 static gboolean
1167 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1168     GstEvent * event)
1169 {
1170   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1171   gboolean ret;
1172
1173   switch (event->type) {
1174     case GST_EVENT_FLUSH_STOP:{
1175       GST_API_LOCK (demux);
1176       GST_MANIFEST_LOCK (demux);
1177
1178       gst_adaptive_demux_reset (demux);
1179
1180       ret = gst_pad_event_default (pad, parent, event);
1181
1182       GST_MANIFEST_UNLOCK (demux);
1183       GST_API_UNLOCK (demux);
1184
1185       return ret;
1186     }
1187     case GST_EVENT_EOS:
1188     {
1189       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1190         if (!handle_incoming_manifest (demux)) {
1191           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1192           return gst_pad_event_default (pad, parent, event);
1193         }
1194         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1195       } else {
1196         GST_ERROR_OBJECT (demux,
1197             "Failed to acquire scheduler to handle manifest");
1198         return gst_pad_event_default (pad, parent, event);
1199       }
1200       gst_event_unref (event);
1201       return TRUE;
1202     }
1203     case GST_EVENT_STREAM_START:
1204       if (gst_event_parse_group_id (event, &demux->group_id))
1205         demux->have_group_id = TRUE;
1206       else
1207         demux->have_group_id = FALSE;
1208       /* Swallow stream-start, we'll push our own */
1209       gst_event_unref (event);
1210       return TRUE;
1211     case GST_EVENT_SEGMENT:
1212       /* Swallow newsegments, we'll push our own */
1213       gst_event_unref (event);
1214       return TRUE;
1215     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1216       const GstStructure *structure = gst_event_get_structure (event);
1217       struct http_headers_collector c = { demux, NULL };
1218
1219       if (gst_structure_has_name (structure, "http-headers")) {
1220         if (gst_structure_has_field (structure, "request-headers")) {
1221           GstStructure *req_headers = NULL;
1222           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1223               &req_headers, NULL);
1224           if (req_headers) {
1225             gst_structure_foreach (req_headers,
1226                 gst_adaptive_demux_handle_upstream_http_header, &c);
1227             gst_structure_free (req_headers);
1228           }
1229         }
1230         if (gst_structure_has_field (structure, "response-headers")) {
1231           GstStructure *res_headers = NULL;
1232           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1233               &res_headers, NULL);
1234           if (res_headers) {
1235             gst_structure_foreach (res_headers,
1236                 gst_adaptive_demux_handle_upstream_http_header, &c);
1237             gst_structure_free (res_headers);
1238           }
1239         }
1240
1241         if (c.cookies)
1242           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1243       }
1244       break;
1245     }
1246     default:
1247       break;
1248   }
1249
1250   return gst_pad_event_default (pad, parent, event);
1251 }
1252
1253 static GstFlowReturn
1254 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1255     GstBuffer * buffer)
1256 {
1257   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1258
1259   GST_MANIFEST_LOCK (demux);
1260
1261   gst_adapter_push (demux->priv->input_adapter, buffer);
1262
1263   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1264       (gint) gst_adapter_available (demux->priv->input_adapter));
1265
1266   GST_MANIFEST_UNLOCK (demux);
1267   return GST_FLOW_OK;
1268 }
1269
1270
1271 /* Called with TRACKS_LOCK taken */
1272 static void
1273 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1274 {
1275   GList *tmp;
1276
1277   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1278     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1279
1280     gst_adaptive_demux_track_flush (track);
1281     if (gst_pad_is_active (track->sinkpad)) {
1282       gst_pad_set_active (track->sinkpad, FALSE);
1283       gst_pad_set_active (track->sinkpad, TRUE);
1284     }
1285   }
1286 }
1287
1288 /* Resets all tracks to their initial state, ready to receive new data. */
1289 static void
1290 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1291 {
1292   TRACKS_LOCK (demux);
1293   g_queue_foreach (demux->priv->periods,
1294       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1295   TRACKS_UNLOCK (demux);
1296 }
1297
1298 /* Subclasses will call this function to ensure that a new input period is
1299  * available to receive new streams and tracks */
1300 gboolean
1301 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1302 {
1303   if (demux->input_period && !demux->input_period->prepared) {
1304     GST_DEBUG_OBJECT (demux, "Using existing input period");
1305     return TRUE;
1306   }
1307
1308   if (demux->input_period) {
1309     GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1310     demux->input_period->has_next_period = TRUE;
1311   }
1312   GST_DEBUG_OBJECT (demux, "Setting up new period");
1313
1314   demux->input_period = gst_adaptive_demux_period_new (demux);
1315
1316   return TRUE;
1317 }
1318
1319 /* must be called with manifest_lock taken */
1320 static void
1321 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1322 {
1323   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1324   GList *iter;
1325
1326   gst_adaptive_demux_stop_tasks (demux, TRUE);
1327
1328   if (klass->reset)
1329     klass->reset (demux);
1330
1331   /* Disable and remove all outputs */
1332   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1333   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1334     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1335   }
1336   g_list_free (demux->priv->outputs);
1337   demux->priv->outputs = NULL;
1338
1339   g_queue_clear_full (demux->priv->periods,
1340       (GDestroyNotify) gst_adaptive_demux_period_unref);
1341
1342   /* The output period always has an extra ref taken on it */
1343   if (demux->output_period)
1344     gst_adaptive_demux_period_unref (demux->output_period);
1345   demux->output_period = NULL;
1346   /* The input period doesn't have an extra ref taken on it */
1347   demux->input_period = NULL;
1348
1349   gst_adaptive_demux_start_new_period (demux);
1350
1351   g_free (demux->manifest_uri);
1352   g_free (demux->manifest_base_uri);
1353   demux->manifest_uri = NULL;
1354   demux->manifest_base_uri = NULL;
1355
1356   gst_adapter_clear (demux->priv->input_adapter);
1357   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1358
1359   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1360   demux->instant_rate_multiplier = 1.0;
1361
1362   demux->priv->duration = GST_CLOCK_TIME_NONE;
1363
1364   demux->priv->percent = -1;
1365   demux->priv->is_buffering = TRUE;
1366
1367   demux->have_group_id = FALSE;
1368   demux->group_id = G_MAXUINT;
1369   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1370
1371   demux->priv->global_output_position = 0;
1372
1373   demux->priv->n_audio_streams = 0;
1374   demux->priv->n_video_streams = 0;
1375   demux->priv->n_subtitle_streams = 0;
1376
1377   gst_flow_combiner_reset (demux->priv->flowcombiner);
1378 }
1379
1380 static gboolean
1381 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1382 {
1383   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1384
1385   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1386
1387   switch (GST_QUERY_TYPE (query)) {
1388     case GST_QUERY_BUFFERING:
1389     {
1390       GstFormat format;
1391       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1392
1393       if (!demux->output_period) {
1394         if (format != GST_FORMAT_TIME) {
1395           GST_DEBUG_OBJECT (demux,
1396               "No period setup yet, can't answer non-TIME buffering queries");
1397           return FALSE;
1398         }
1399
1400         GST_DEBUG_OBJECT (demux,
1401             "No period setup yet, but still answering buffering query");
1402         return TRUE;
1403       }
1404     }
1405     default:
1406       break;
1407   }
1408
1409   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1410 }
1411
1412 static gboolean
1413 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1414 {
1415   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1416   gboolean res = FALSE;
1417
1418   GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1419
1420   switch (GST_EVENT_TYPE (event)) {
1421     case GST_EVENT_SEEK:
1422     {
1423       res = gst_adaptive_demux_handle_seek_event (demux, event);
1424       break;
1425     }
1426     case GST_EVENT_SELECT_STREAMS:
1427     {
1428       res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1429       break;
1430     }
1431     default:
1432       res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1433       break;
1434   }
1435   return res;
1436 }
1437
1438 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1439 static GstAdaptiveDemux2Stream *
1440 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1441 {
1442   GList *iter;
1443
1444   /* We only look in the streams of the input period (i.e. with active streams) */
1445   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1446     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1447     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1448       return stream;
1449     }
1450   }
1451
1452   return NULL;
1453 }
1454
1455 static void
1456 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1457     GstMessage * msg)
1458 {
1459   GstAdaptiveDemux2Stream *stream;
1460   GstStreamCollection *collection = NULL;
1461   gboolean pending_tracks_activated = FALSE;
1462
1463   GST_MANIFEST_LOCK (demux);
1464
1465   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1466   if (stream == NULL) {
1467     GST_WARNING_OBJECT (demux,
1468         "Failed to locate stream for collection message");
1469     goto beach;
1470   }
1471
1472   gst_message_parse_stream_collection (msg, &collection);
1473   if (!collection)
1474     goto beach;
1475
1476   TRACKS_LOCK (demux);
1477
1478   if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1479           &pending_tracks_activated)) {
1480     TRACKS_UNLOCK (demux);
1481
1482     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1483         (_("Stream format can't be handled")),
1484         ("The streams provided by the multiplex are ambiguous"));
1485     goto beach;
1486   }
1487
1488   if (pending_tracks_activated) {
1489     /* If pending tracks were handled, then update the demuxer collection */
1490     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1491         demux->input_period == demux->output_period) {
1492       gst_adaptive_demux_post_collection (demux);
1493     }
1494
1495     /* If we discovered pending tracks and we no longer have any, we can ensure
1496      * selected tracks are started */
1497     if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1498       GList *iter = demux->input_period->streams;
1499       for (; iter; iter = iter->next) {
1500         GstAdaptiveDemux2Stream *new_stream = iter->data;
1501
1502         /* The stream that posted this collection was already started. If a
1503          * different stream is now selected, start it */
1504         if (stream != new_stream
1505             && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1506           gst_adaptive_demux2_stream_start (new_stream);
1507       }
1508     }
1509   }
1510   TRACKS_UNLOCK (demux);
1511
1512 beach:
1513   GST_MANIFEST_UNLOCK (demux);
1514
1515   if (collection)
1516     gst_object_unref (collection);
1517   gst_message_unref (msg);
1518   msg = NULL;
1519 }
1520
1521 static void
1522 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1523 {
1524   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1525
1526   switch (GST_MESSAGE_TYPE (msg)) {
1527     case GST_MESSAGE_STREAM_COLLECTION:
1528     {
1529       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1530       return;
1531     }
1532     case GST_MESSAGE_ERROR:{
1533       GstAdaptiveDemux2Stream *stream = NULL;
1534       GError *err = NULL;
1535       gchar *debug = NULL;
1536       gchar *new_error = NULL;
1537       const GstStructure *details = NULL;
1538
1539       GST_MANIFEST_LOCK (demux);
1540
1541       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1542       if (stream == NULL) {
1543         GST_WARNING_OBJECT (demux,
1544             "Failed to locate stream for errored element");
1545         GST_MANIFEST_UNLOCK (demux);
1546         break;
1547       }
1548
1549       gst_message_parse_error (msg, &err, &debug);
1550
1551       GST_WARNING_OBJECT (demux,
1552           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1553           err->message, debug);
1554
1555       if (debug)
1556         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1557       if (new_error) {
1558         g_free (err->message);
1559         err->message = new_error;
1560       }
1561
1562       gst_message_parse_error_details (msg, &details);
1563       if (details) {
1564         gst_structure_get_uint (details, "http-status-code",
1565             &stream->last_status_code);
1566       }
1567
1568       /* error, but ask to retry */
1569       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1570         gst_adaptive_demux2_stream_parse_error (stream, err);
1571         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1572       }
1573
1574       g_error_free (err);
1575       g_free (debug);
1576
1577       GST_MANIFEST_UNLOCK (demux);
1578
1579       gst_message_unref (msg);
1580       msg = NULL;
1581     }
1582       break;
1583     default:
1584       break;
1585   }
1586
1587   if (msg)
1588     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1589 }
1590
1591 /* must be called with manifest_lock taken */
1592 GstClockTime
1593 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1594 {
1595   GstAdaptiveDemuxClass *klass;
1596
1597   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1598
1599   if (klass->get_period_start_time == NULL)
1600     return 0;
1601
1602   return klass->get_period_start_time (demux);
1603 }
1604
1605 /* must be called with manifest_lock taken */
1606 static gboolean
1607 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1608     gboolean first_and_live)
1609 {
1610   GList *iter;
1611   GstClockTime period_start;
1612   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1613   GList *new_streams;
1614
1615   g_return_val_if_fail (demux->input_period->streams, FALSE);
1616   g_assert (demux->input_period->prepared == FALSE);
1617
1618   new_streams = demux->input_period->streams;
1619
1620   if (!gst_adaptive_demux2_is_running (demux)) {
1621     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1622     return TRUE;
1623   }
1624
1625   GST_DEBUG_OBJECT (demux,
1626       "Preparing %d streams for period %d , first_and_live:%d",
1627       g_list_length (new_streams), demux->input_period->period_num,
1628       first_and_live);
1629
1630   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1631     GstAdaptiveDemux2Stream *stream = iter->data;
1632
1633     GST_DEBUG_OBJECT (stream, "Preparing stream");
1634
1635     stream->need_header = TRUE;
1636     stream->discont = TRUE;
1637
1638     /* Grab the first stream time for live streams
1639      * * If the stream is selected
1640      * * Or it provides dynamic tracks (in which case we need to force an update)
1641      */
1642     if (first_and_live
1643         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1644             || stream->pending_tracks)) {
1645       /* TODO we only need the first timestamp, maybe create a simple function to
1646        * get the current PTS of a fragment ? */
1647       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1648       gst_adaptive_demux2_stream_update_fragment_info (stream);
1649
1650       GST_DEBUG_OBJECT (stream,
1651           "Got stream time %" GST_STIME_FORMAT,
1652           GST_STIME_ARGS (stream->fragment.stream_time));
1653
1654       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1655         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1656       } else {
1657         min_stream_time = stream->fragment.stream_time;
1658       }
1659     }
1660   }
1661
1662   period_start = gst_adaptive_demux_get_period_start_time (demux);
1663
1664   /* For live streams, the subclass is supposed to seek to the current fragment
1665    * and then tell us its stream time in stream->fragment.stream_time.  We now
1666    * also have to seek our demuxer segment to reflect this.
1667    *
1668    * FIXME: This needs some refactoring at some point.
1669    */
1670   if (first_and_live) {
1671     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1672         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1673         GST_SEEK_TYPE_NONE, -1, NULL);
1674   }
1675
1676   GST_DEBUG_OBJECT (demux,
1677       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1678       " demux segment %" GST_SEGMENT_FORMAT,
1679       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1680       &demux->segment);
1681
1682   /* Synchronize stream start/current positions */
1683   if (min_stream_time == GST_CLOCK_STIME_NONE)
1684     min_stream_time = period_start;
1685   else
1686     min_stream_time += period_start;
1687   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1688     GstAdaptiveDemux2Stream *stream = iter->data;
1689     stream->start_position = stream->current_position = min_stream_time;
1690   }
1691
1692   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1693     GstAdaptiveDemux2Stream *stream = iter->data;
1694     stream->compute_segment = TRUE;
1695     stream->first_and_live = first_and_live;
1696   }
1697   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1698   demux->input_period->prepared = TRUE;
1699
1700   return TRUE;
1701 }
1702
1703 static GstAdaptiveDemuxTrack *
1704 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1705 {
1706   GList *tmp;
1707
1708   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1709     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1710     if (!g_strcmp0 (track->stream_id, stream_id))
1711       return track;
1712   }
1713
1714   return NULL;
1715 }
1716
1717 /* TRACKS_LOCK hold */
1718 void
1719 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1720 {
1721   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1722   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1723   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1724   GList *tmp;
1725   gint min_percent = -1, percent;
1726   gboolean all_eos = TRUE;
1727
1728   /* Go over all active tracks of the output period and update level */
1729
1730   /* Check that all tracks are above their respective low thresholds (different
1731    * tracks may have different fragment durations yielding different buffering
1732    * percentages) Overall buffering percent is the lowest. */
1733   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1734     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1735
1736     GST_LOG_OBJECT (demux,
1737         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1738         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1739         track->stream_id, track->period_num, track->active, track->selected,
1740         track->eos, GST_TIME_ARGS (track->level_time),
1741         GST_TIME_ARGS (track->buffering_threshold));
1742
1743     if (track->active && track->selected) {
1744       if (!track->eos) {
1745         gint cur_percent;
1746
1747         all_eos = FALSE;
1748         if (min_level_time == GST_CLOCK_TIME_NONE) {
1749           min_level_time = track->level_time;
1750         } else if (track->level_time < min_level_time) {
1751           min_level_time = track->level_time;
1752         }
1753
1754         if (track->type & GST_STREAM_TYPE_VIDEO
1755             && video_level_time > track->level_time)
1756           video_level_time = track->level_time;
1757
1758         if (track->type & GST_STREAM_TYPE_AUDIO
1759             && audio_level_time > track->level_time)
1760           audio_level_time = track->level_time;
1761
1762         if (track->level_time != GST_CLOCK_TIME_NONE
1763             && track->buffering_threshold != 0) {
1764           cur_percent =
1765               gst_util_uint64_scale (track->level_time, 100,
1766               track->buffering_threshold);
1767           if (min_percent < 0 || cur_percent < min_percent)
1768             min_percent = cur_percent;
1769         }
1770       }
1771     }
1772   }
1773
1774   GST_DEBUG_OBJECT (demux,
1775       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1776       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1777
1778   /* Update demuxer video/audio level properties */
1779   GST_OBJECT_LOCK (demux);
1780   demux->current_level_time_video = video_level_time;
1781   demux->current_level_time_audio = audio_level_time;
1782   GST_OBJECT_UNLOCK (demux);
1783
1784   if (min_percent < 0 && !all_eos)
1785     return;
1786
1787   if (min_percent > 100 || all_eos)
1788     percent = 100;
1789   else
1790     percent = MAX (0, min_percent);
1791
1792   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1793
1794   if (demux->priv->is_buffering) {
1795     if (percent >= 100)
1796       demux->priv->is_buffering = FALSE;
1797     if (demux->priv->percent != percent) {
1798       demux->priv->percent = percent;
1799       demux->priv->percent_changed = TRUE;
1800     }
1801   } else if (percent < 1) {
1802     demux->priv->is_buffering = TRUE;
1803     if (demux->priv->percent != percent) {
1804       demux->priv->percent = percent;
1805       demux->priv->percent_changed = TRUE;
1806     }
1807   }
1808
1809   if (demux->priv->percent_changed)
1810     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1811         demux->priv->is_buffering);
1812 }
1813
1814 /* With TRACKS_LOCK held */
1815 void
1816 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1817 {
1818   gint percent;
1819   GstMessage *msg;
1820
1821   if (!demux->priv->percent_changed)
1822     return;
1823
1824   BUFFERING_LOCK (demux);
1825   percent = demux->priv->percent;
1826   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1827   TRACKS_UNLOCK (demux);
1828   gst_element_post_message ((GstElement *) demux, msg);
1829
1830   BUFFERING_UNLOCK (demux);
1831   TRACKS_LOCK (demux);
1832   if (percent == demux->priv->percent)
1833     demux->priv->percent_changed = FALSE;
1834 }
1835
1836 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1837 GstAdaptiveDemux2Stream *
1838 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1839     GstAdaptiveDemuxTrack * track)
1840 {
1841   GList *iter;
1842
1843   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1844     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1845     if (g_list_find (stream->tracks, track))
1846       return stream;
1847   }
1848
1849   return NULL;
1850 }
1851
1852 /* Called from seek handler
1853  *
1854  * This function is used when a (flushing) seek caused a new period to be activated.
1855  *
1856  * This will ensure that:
1857  * * the current output period is marked as finished (EOS)
1858  * * Any potential intermediate (non-input/non-output) periods are removed
1859  * * That the new input period is prepared and ready
1860  */
1861 static void
1862 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1863 {
1864   GList *iter;
1865
1866   GST_DEBUG_OBJECT (demux,
1867       "Preparing new input period %u", demux->input_period->period_num);
1868
1869   /* Prepare the new input period */
1870   gst_adaptive_demux_update_collection (demux, demux->input_period);
1871
1872   /* Transfer the previous selection to the new input period */
1873   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1874       demux->output_period);
1875   gst_adaptive_demux_prepare_streams (demux, FALSE);
1876
1877   /* Remove all periods except for the input (last) and output (first) period */
1878   while (demux->priv->periods->length > 2) {
1879     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1880     /* Mark all tracks of the removed period as not selected and EOS so they
1881      * will be skipped / ignored */
1882     for (iter = period->tracks; iter; iter = iter->next) {
1883       GstAdaptiveDemuxTrack *track = iter->data;
1884       track->selected = FALSE;
1885       track->eos = TRUE;
1886     }
1887     gst_adaptive_demux_period_unref (period);
1888   }
1889
1890   /* Mark all tracks of the output period as EOS so that the output loop
1891    * will immediately move to the new period */
1892   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1893     GstAdaptiveDemuxTrack *track = iter->data;
1894     track->eos = TRUE;
1895   }
1896
1897   /* Go over all slots, and clear any pending track */
1898   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1899     OutputSlot *slot = (OutputSlot *) iter->data;
1900
1901     if (slot->pending_track != NULL) {
1902       GST_DEBUG_OBJECT (demux,
1903           "Removing track '%s' as pending from output of current track '%s'",
1904           slot->pending_track->stream_id, slot->track->stream_id);
1905       gst_adaptive_demux_track_unref (slot->pending_track);
1906       slot->pending_track = NULL;
1907     }
1908   }
1909 }
1910
1911 /* must be called with manifest_lock taken */
1912 gboolean
1913 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1914     gint64 * range_start, gint64 * range_stop)
1915 {
1916   GstAdaptiveDemuxClass *klass;
1917
1918   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1919
1920   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1921
1922   return klass->get_live_seek_range (demux, range_start, range_stop);
1923 }
1924
1925 /* must be called with manifest_lock taken */
1926 gboolean
1927 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1928     GstAdaptiveDemux2Stream * stream)
1929 {
1930   gint64 range_start, range_stop;
1931   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1932     GST_LOG_OBJECT (stream,
1933         "stream position %" GST_TIME_FORMAT "  live seek range %"
1934         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1935         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1936         GST_STIME_ARGS (range_stop));
1937     return (stream->current_position >= range_start
1938         && stream->current_position <= range_stop);
1939   }
1940
1941   return FALSE;
1942 }
1943
1944 /* must be called with manifest_lock taken */
1945 static gboolean
1946 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1947 {
1948   GstAdaptiveDemuxClass *klass;
1949
1950   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1951   if (gst_adaptive_demux_is_live (demux)) {
1952     return klass->get_live_seek_range != NULL;
1953   }
1954
1955   return klass->seek != NULL;
1956 }
1957
1958 static void
1959 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1960     GstSeekType start_type, GstSeekType stop_type)
1961 {
1962   GList *iter;
1963
1964   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1965     GstAdaptiveDemux2Stream *stream = iter->data;
1966
1967     /* Make sure the download loop clears and restarts on the next start,
1968      * which will recompute the stream segment */
1969     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1970         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1971     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1972     stream->start_position = 0;
1973
1974     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1975       stream->start_position = demux->segment.start;
1976     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1977       stream->start_position = demux->segment.stop;
1978   }
1979 }
1980
1981 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1982                               GST_SEEK_FLAG_SNAP_AFTER |          \
1983                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1984                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1985                               GST_SEEK_FLAG_KEY_UNIT))
1986 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1987                               GST_SEEK_FLAG_SNAP_AFTER | \
1988                               GST_SEEK_FLAG_SNAP_NEAREST))
1989
1990 static gboolean
1991 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
1992     GstEvent * event)
1993 {
1994   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1995   gdouble rate;
1996   GstFormat format;
1997   GstSeekFlags flags;
1998   GstSeekType start_type, stop_type;
1999   gint64 start, stop;
2000   guint32 seqnum;
2001   gboolean update;
2002   gboolean ret;
2003   GstSegment oldsegment;
2004   GstEvent *flush_event;
2005
2006   GST_INFO_OBJECT (demux, "Received seek event");
2007
2008   GST_API_LOCK (demux);
2009
2010   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2011       &stop_type, &stop);
2012
2013   if (format != GST_FORMAT_TIME) {
2014     GST_API_UNLOCK (demux);
2015     GST_WARNING_OBJECT (demux,
2016         "Adaptive demuxers only support TIME-based seeking");
2017     gst_event_unref (event);
2018     return FALSE;
2019   }
2020
2021   if (flags & GST_SEEK_FLAG_SEGMENT) {
2022     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2023     GST_API_UNLOCK (demux);
2024     gst_event_unref (event);
2025     return FALSE;
2026   }
2027
2028   seqnum = gst_event_get_seqnum (event);
2029
2030   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2031     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2032     return FALSE;
2033   }
2034
2035   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2036     /* For instant rate seeks, reply directly and update
2037      * our segment so the new rate is reflected in any future
2038      * fragments */
2039     GstEvent *ev;
2040     gdouble rate_multiplier;
2041
2042     /* instant rate change only supported if direction does not change. All
2043      * other requirements are already checked before creating the seek event
2044      * but let's double-check here to be sure */
2045     if ((demux->segment.rate > 0 && rate < 0) ||
2046         (demux->segment.rate < 0 && rate > 0) ||
2047         start_type != GST_SEEK_TYPE_NONE ||
2048         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2049       GST_ERROR_OBJECT (demux,
2050           "Instant rate change seeks only supported in the "
2051           "same direction, without flushing and position change");
2052       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2053       GST_API_UNLOCK (demux);
2054       return FALSE;
2055     }
2056
2057     rate_multiplier = rate / demux->segment.rate;
2058
2059     ev = gst_event_new_instant_rate_change (rate_multiplier,
2060         (GstSegmentFlags) flags);
2061     gst_event_set_seqnum (ev, seqnum);
2062
2063     ret = gst_adaptive_demux_push_src_event (demux, ev);
2064
2065     if (ret) {
2066       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2067       demux->instant_rate_multiplier = rate_multiplier;
2068       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2069     }
2070
2071     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2072     GST_API_UNLOCK (demux);
2073     gst_event_unref (event);
2074
2075     return ret;
2076   }
2077
2078   if (!gst_adaptive_demux_can_seek (demux)) {
2079     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2080
2081     GST_API_UNLOCK (demux);
2082     gst_event_unref (event);
2083     return FALSE;
2084   }
2085
2086   /* We can only accept flushing seeks from this point onward */
2087   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2088     GST_ERROR_OBJECT (demux,
2089         "Non-flushing non-instant-rate seeks are not possible");
2090
2091     GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2092
2093     GST_API_UNLOCK (demux);
2094     gst_event_unref (event);
2095     return FALSE;
2096   }
2097
2098   if (gst_adaptive_demux_is_live (demux)) {
2099     gint64 range_start, range_stop;
2100     gboolean changed = FALSE;
2101     gboolean start_valid = TRUE, stop_valid = TRUE;
2102
2103     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2104             &range_stop)) {
2105       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2106       GST_API_UNLOCK (demux);
2107       gst_event_unref (event);
2108       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2109       return FALSE;
2110     }
2111
2112     GST_DEBUG_OBJECT (demux,
2113         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2114         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2115
2116     /* Handle relative positioning for live streams (relative to the range_stop) */
2117     if (start_type == GST_SEEK_TYPE_END) {
2118       start = range_stop + start;
2119       start_type = GST_SEEK_TYPE_SET;
2120       changed = TRUE;
2121     }
2122     if (stop_type == GST_SEEK_TYPE_END) {
2123       stop = range_stop + stop;
2124       stop_type = GST_SEEK_TYPE_SET;
2125       changed = TRUE;
2126     }
2127
2128     /* Adjust the requested start/stop position if it falls beyond the live
2129      * seek range.
2130      * The only case where we don't adjust is for the starting point of
2131      * an accurate seek (start if forward and stop if backwards)
2132      */
2133     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2134         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2135       GST_DEBUG_OBJECT (demux,
2136           "seek before live stream start, setting to range start: %"
2137           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2138       start = range_start;
2139       changed = TRUE;
2140     }
2141     /* truncate stop position also if set */
2142     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2143         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2144       GST_DEBUG_OBJECT (demux,
2145           "seek ending after live start, adjusting to: %"
2146           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2147       stop = range_stop;
2148       changed = TRUE;
2149     }
2150
2151     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2152         (start < range_start || start > range_stop)) {
2153       GST_WARNING_OBJECT (demux,
2154           "Seek to invalid position start:%" GST_STIME_FORMAT
2155           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2156           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2157           GST_STIME_ARGS (range_stop));
2158       start_valid = FALSE;
2159     }
2160     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2161         (stop < range_start || stop > range_stop)) {
2162       GST_WARNING_OBJECT (demux,
2163           "Seek to invalid position stop:%" GST_STIME_FORMAT
2164           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2165           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2166           GST_STIME_ARGS (range_stop));
2167       stop_valid = FALSE;
2168     }
2169
2170     /* If the seek position is still outside of the seekable range, refuse the seek */
2171     if (!start_valid || !stop_valid) {
2172       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2173       GST_API_UNLOCK (demux);
2174       gst_event_unref (event);
2175       return FALSE;
2176     }
2177
2178     /* Re-create seek event with changed/updated values */
2179     if (changed) {
2180       gst_event_unref (event);
2181       event =
2182           gst_event_new_seek (rate, format, flags,
2183           start_type, start, stop_type, stop);
2184       gst_event_set_seqnum (event, seqnum);
2185     }
2186   }
2187
2188   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2189
2190   /* have a backup in case seek fails */
2191   gst_segment_copy_into (&demux->segment, &oldsegment);
2192
2193   GST_DEBUG_OBJECT (demux, "sending flush start");
2194   flush_event = gst_event_new_flush_start ();
2195   gst_event_set_seqnum (flush_event, seqnum);
2196
2197   gst_adaptive_demux_push_src_event (demux, flush_event);
2198
2199   gst_adaptive_demux_stop_tasks (demux, FALSE);
2200   gst_adaptive_demux_reset_tracks (demux);
2201
2202   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2203
2204   /*
2205    * Handle snap seeks as follows:
2206    * 1) do the snap seeking a (random) active stream
2207    * 2) use the final position on this stream to seek
2208    *    on the other streams to the same position
2209    *
2210    * We can't snap at all streams at the same time as they might end in
2211    * different positions, so just pick one and align all others to that
2212    * position.
2213    */
2214
2215   GstAdaptiveDemux2Stream *stream = NULL;
2216   GList *iter;
2217   /* Pick a random active stream on which to do the stream seek */
2218   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2219     GstAdaptiveDemux2Stream *cand = iter->data;
2220     if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2221       stream = cand;
2222       break;
2223     }
2224   }
2225
2226   if (stream && IS_SNAP_SEEK (flags)) {
2227     GstClockTimeDiff ts;
2228     GstSeekFlags stream_seek_flags = flags;
2229
2230     /* snap-seek on the chosen stream and then
2231      * use the resulting position to seek on all streams */
2232     if (rate >= 0) {
2233       if (start_type != GST_SEEK_TYPE_NONE)
2234         ts = start;
2235       else {
2236         ts = gst_segment_position_from_running_time (&demux->segment,
2237             GST_FORMAT_TIME, demux->priv->global_output_position);
2238         start_type = GST_SEEK_TYPE_SET;
2239       }
2240     } else {
2241       if (stop_type != GST_SEEK_TYPE_NONE)
2242         ts = stop;
2243       else {
2244         stop_type = GST_SEEK_TYPE_SET;
2245         ts = gst_segment_position_from_running_time (&demux->segment,
2246             GST_FORMAT_TIME, demux->priv->global_output_position);
2247       }
2248     }
2249
2250     if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2251             ts, &ts) != GST_FLOW_OK) {
2252       GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2253
2254       GST_API_UNLOCK (demux);
2255       gst_event_unref (event);
2256       return FALSE;
2257     }
2258
2259     /* replace event with a new one without snapping to seek on all streams */
2260     gst_event_unref (event);
2261     if (rate >= 0) {
2262       start = ts;
2263     } else {
2264       stop = ts;
2265     }
2266     event =
2267         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2268         start_type, start, stop_type, stop);
2269     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2270   }
2271
2272   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2273       start, stop_type, stop, &update);
2274
2275   if (ret) {
2276     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2277
2278     ret = demux_class->seek (demux, event);
2279   }
2280
2281   if (!ret) {
2282     /* Is there anything else we can do if it fails? */
2283     gst_segment_copy_into (&oldsegment, &demux->segment);
2284   } else {
2285     demux->priv->segment_seqnum = seqnum;
2286   }
2287   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2288
2289   /* Resetting flow combiner */
2290   gst_flow_combiner_reset (demux->priv->flowcombiner);
2291
2292   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2293   flush_event = gst_event_new_flush_stop (TRUE);
2294   gst_event_set_seqnum (flush_event, seqnum);
2295   gst_adaptive_demux_push_src_event (demux, flush_event);
2296
2297   /* If the seek generated a new period, prepare it */
2298   if (!demux->input_period->prepared) {
2299     /* This can only happen on flushing seeks */
2300     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2301     gst_adaptive_demux_seek_to_input_period (demux);
2302   }
2303
2304   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2305   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2306       &demux->segment);
2307   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2308   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2309
2310   /* Reset the global output position (running time) for when the output loop restarts */
2311   demux->priv->global_output_position = 0;
2312
2313   /* After a flushing seek, any instant-rate override is undone */
2314   demux->instant_rate_multiplier = 1.0;
2315
2316   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2317
2318   /* Restart the demux */
2319   gst_adaptive_demux_start_tasks (demux);
2320
2321   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2322   GST_API_UNLOCK (demux);
2323   gst_event_unref (event);
2324
2325   return ret;
2326 }
2327
2328 /* Returns TRUE if the stream has at least one selected track */
2329 static gboolean
2330 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2331     stream)
2332 {
2333   GList *tmp;
2334
2335   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2336     GstAdaptiveDemuxTrack *track = tmp->data;
2337
2338     if (track->selected)
2339       return TRUE;
2340   }
2341
2342   return FALSE;
2343 }
2344
2345 static gboolean
2346 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2347     guint32 seqnum)
2348 {
2349   gboolean selection_handled = TRUE;
2350   GList *iter;
2351   GList *tracks = NULL;
2352
2353   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2354     return FALSE;
2355
2356   TRACKS_LOCK (demux);
2357   /* Validate the streams and fill:
2358    * tracks : list of tracks corresponding to requested streams
2359    */
2360   for (iter = streams; iter; iter = iter->next) {
2361     gchar *stream_id = (gchar *) iter->data;
2362     GstAdaptiveDemuxTrack *track;
2363
2364     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2365     track = find_track_for_stream_id (demux->output_period, stream_id);
2366     if (!track) {
2367       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2368       selection_handled = FALSE;
2369       goto select_streams_done;
2370     }
2371     tracks = g_list_append (tracks, track);
2372     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2373   }
2374
2375   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2376    * SCHEDULING THREAD */
2377
2378   /* FIXME: We want to iterate all streams, mark them as deselected,
2379    * then iterate tracks and mark any streams that have at least 1
2380    * active output track, then loop over all streams again and start/stop
2381    * them as needed */
2382
2383   /* Go over all tracks present and (de)select based on current selection */
2384   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2385     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2386
2387     if (track->selected && !g_list_find (tracks, track)) {
2388       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2389           track->stream_id, track->active);
2390       track->selected = FALSE;
2391       track->draining = TRUE;
2392     } else if (!track->selected && g_list_find (tracks, track)) {
2393       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2394
2395       track->selected = TRUE;
2396     }
2397   }
2398
2399   /* Start or stop streams based on the updated track selection */
2400   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2401     GstAdaptiveDemux2Stream *stream = iter->data;
2402     GList *trackiter;
2403
2404     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2405     gboolean should_be_running =
2406         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2407
2408     if (!is_running && should_be_running) {
2409       GstClockTime output_running_ts = demux->priv->global_output_position;
2410       GstClockTime start_position;
2411
2412       /* Calculate where we should start the stream, and then
2413        * start it. */
2414       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2415
2416       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2417           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2418           GST_TIME_ARGS (output_running_ts), &demux->segment);
2419
2420       start_position =
2421           gst_segment_position_from_running_time (&demux->segment,
2422           GST_FORMAT_TIME, output_running_ts);
2423
2424       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2425
2426       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2427           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2428
2429       stream->current_position = stream->start_position = start_position;
2430       stream->compute_segment = TRUE;
2431
2432       /* If output has already begun, ensure we seek this segment
2433        * to the correct restart position when the download loop begins */
2434       if (output_running_ts != 0)
2435         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2436
2437       /* Activate track pads for this stream */
2438       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2439         GstAdaptiveDemuxTrack *track =
2440             (GstAdaptiveDemuxTrack *) trackiter->data;
2441         gst_pad_set_active (track->sinkpad, TRUE);
2442       }
2443
2444       gst_adaptive_demux2_stream_start (stream);
2445     } else if (is_running && !should_be_running) {
2446       /* Stream should not be running and needs stopping */
2447       gst_adaptive_demux2_stream_stop (stream);
2448
2449       /* Set all track sinkpads to inactive for this stream */
2450       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2451         GstAdaptiveDemuxTrack *track =
2452             (GstAdaptiveDemuxTrack *) trackiter->data;
2453         gst_pad_set_active (track->sinkpad, FALSE);
2454       }
2455     }
2456   }
2457
2458   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2459
2460 select_streams_done:
2461   demux_update_buffering_locked (demux);
2462   demux_post_buffering_locked (demux);
2463
2464   TRACKS_UNLOCK (demux);
2465   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2466
2467   if (tracks)
2468     g_list_free (tracks);
2469   return selection_handled;
2470 }
2471
2472 static gboolean
2473 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2474     GstEvent * event)
2475 {
2476   GList *streams;
2477   gboolean selection_handled;
2478
2479   if (GST_EVENT_SEQNUM (event) ==
2480       g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2481     GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2482         GST_EVENT_SEQNUM (event));
2483     return TRUE;
2484   }
2485
2486   gst_event_parse_select_streams (event, &streams);
2487   selection_handled =
2488       handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2489   g_list_free_full (streams, g_free);
2490
2491   gst_event_unref (event);
2492   return selection_handled;
2493 }
2494
2495 static gboolean
2496 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2497     GstEvent * event)
2498 {
2499   GstAdaptiveDemux *demux;
2500
2501   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2502
2503   switch (event->type) {
2504     case GST_EVENT_SEEK:
2505     {
2506       guint32 seqnum = gst_event_get_seqnum (event);
2507       if (seqnum == demux->priv->segment_seqnum) {
2508         GST_LOG_OBJECT (pad,
2509             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2510         gst_event_unref (event);
2511         return TRUE;
2512       }
2513       return gst_adaptive_demux_handle_seek_event (demux, event);
2514     }
2515     case GST_EVENT_LATENCY:{
2516       /* Upstream and our internal source are irrelevant
2517        * for latency, and we should not fail here to
2518        * configure the latency */
2519       gst_event_unref (event);
2520       return TRUE;
2521     }
2522     case GST_EVENT_QOS:{
2523       GstClockTimeDiff diff;
2524       GstClockTime timestamp;
2525       GstClockTime earliest_time;
2526
2527       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2528       /* Only take into account lateness if late */
2529       if (diff > 0)
2530         earliest_time = timestamp + 2 * diff;
2531       else
2532         earliest_time = timestamp;
2533
2534       GST_OBJECT_LOCK (demux);
2535       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2536           earliest_time > demux->priv->qos_earliest_time) {
2537         demux->priv->qos_earliest_time = earliest_time;
2538         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2539             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2540       }
2541       GST_OBJECT_UNLOCK (demux);
2542       break;
2543     }
2544     case GST_EVENT_SELECT_STREAMS:
2545     {
2546       return gst_adaptive_demux_handle_select_streams_event (demux, event);
2547     }
2548     default:
2549       break;
2550   }
2551
2552   return gst_pad_event_default (pad, parent, event);
2553 }
2554
2555 static gboolean
2556 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2557     GstQuery * query)
2558 {
2559   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2560   gboolean ret = FALSE;
2561
2562   if (query == NULL)
2563     return FALSE;
2564
2565
2566   switch (query->type) {
2567     case GST_QUERY_DURATION:{
2568       GstFormat fmt;
2569       GstClockTime duration = GST_CLOCK_TIME_NONE;
2570
2571       gst_query_parse_duration (query, &fmt, NULL);
2572
2573       if (gst_adaptive_demux_is_live (demux)) {
2574         /* We are able to answer this query: the duration is unknown */
2575         gst_query_set_duration (query, fmt, -1);
2576         ret = TRUE;
2577         break;
2578       }
2579
2580       if (fmt == GST_FORMAT_TIME
2581           && g_atomic_int_get (&demux->priv->have_manifest)) {
2582
2583         GST_MANIFEST_LOCK (demux);
2584         duration = demux->priv->duration;
2585         GST_MANIFEST_UNLOCK (demux);
2586
2587         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2588           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2589           ret = TRUE;
2590         }
2591       }
2592
2593       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2594           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2595       break;
2596     }
2597     case GST_QUERY_LATENCY:{
2598       gst_query_set_latency (query, FALSE, 0, -1);
2599       ret = TRUE;
2600       break;
2601     }
2602     case GST_QUERY_SEEKING:{
2603       GstFormat fmt;
2604       gint64 stop = -1;
2605       gint64 start = 0;
2606
2607       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2608         GST_INFO_OBJECT (demux,
2609             "Don't have manifest yet, can't answer seeking query");
2610         return FALSE;           /* can't answer without manifest */
2611       }
2612
2613       GST_MANIFEST_LOCK (demux);
2614
2615       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2616       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2617       if (fmt == GST_FORMAT_TIME) {
2618         GstClockTime duration;
2619         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2620
2621         ret = TRUE;
2622         if (can_seek) {
2623           if (gst_adaptive_demux_is_live (demux)) {
2624             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2625             if (!ret) {
2626               GST_MANIFEST_UNLOCK (demux);
2627               GST_INFO_OBJECT (demux, "can't answer seeking query");
2628               return FALSE;
2629             }
2630           } else {
2631             duration = demux->priv->duration;
2632             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2633               stop = duration;
2634           }
2635         }
2636         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2637         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2638             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2639             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2640       }
2641       GST_MANIFEST_UNLOCK (demux);
2642       break;
2643     }
2644     case GST_QUERY_URI:
2645
2646       GST_MANIFEST_LOCK (demux);
2647
2648       /* TODO HLS can answer this differently it seems */
2649       if (demux->manifest_uri) {
2650         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2651          * playlist or the the uri of the last downlowaded fragment? */
2652         gst_query_set_uri (query, demux->manifest_uri);
2653         ret = TRUE;
2654       }
2655
2656       GST_MANIFEST_UNLOCK (demux);
2657       break;
2658     case GST_QUERY_SELECTABLE:
2659       gst_query_set_selectable (query, TRUE);
2660       ret = TRUE;
2661       break;
2662     default:
2663       /* Don't forward queries upstream because of the special nature of this
2664        *  "demuxer", which relies on the upstream element only to be fed
2665        *  the Manifest
2666        */
2667       break;
2668   }
2669
2670   return ret;
2671 }
2672
2673 gboolean
2674 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2675 {
2676   GstEvent *seek;
2677
2678   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2679
2680   seek =
2681       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2682       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2683       GST_SEEK_TYPE_NONE, 0);
2684   gst_adaptive_demux_handle_seek_event (demux, seek);
2685   return FALSE;
2686 }
2687
2688
2689 /* Called when the scheduler starts, to kick off manifest updates
2690  * and stream downloads */
2691 static gboolean
2692 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2693 {
2694   GList *iter;
2695
2696   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2697
2698   iter = demux->input_period->streams;
2699
2700   for (; iter; iter = g_list_next (iter)) {
2701     GstAdaptiveDemux2Stream *stream = iter->data;
2702
2703     /* If we need to process this stream to discover tracks *OR* it has any
2704      * tracks which are selected, start it now */
2705     if ((stream->pending_tracks == TRUE)
2706         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2707       gst_adaptive_demux2_stream_start (stream);
2708   }
2709
2710   return FALSE;
2711 }
2712
2713 /* must be called with manifest_lock taken */
2714 static void
2715 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2716 {
2717   if (!gst_adaptive_demux2_is_running (demux)) {
2718     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2719     return;
2720   }
2721
2722   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2723   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2724       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2725
2726   TRACKS_LOCK (demux);
2727   demux->priv->flushing = FALSE;
2728   GST_DEBUG_OBJECT (demux, "Starting the output task");
2729   gst_task_start (demux->priv->output_task);
2730   TRACKS_UNLOCK (demux);
2731 }
2732
2733 /* must be called with manifest_lock taken */
2734 static void
2735 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2736 {
2737   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2738   if (demux->priv->manifest_updates_cb != 0) {
2739     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2740         demux->priv->manifest_updates_cb);
2741     demux->priv->manifest_updates_cb = 0;
2742   }
2743 }
2744
2745 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2746
2747 /* must be called with manifest_lock taken */
2748 static void
2749 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2750 {
2751   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2752
2753   if (gst_adaptive_demux_is_live (demux)) {
2754     /* Task to periodically update the manifest */
2755     if (demux_class->requires_periodical_playlist_update (demux)) {
2756       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2757       if (demux->priv->manifest_updates_cb == 0) {
2758         demux->priv->manifest_updates_cb =
2759             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2760             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2761       }
2762     }
2763   }
2764 }
2765
2766 /* must be called with manifest_lock taken
2767  * This function will temporarily release manifest_lock in order to join the
2768  * download threads.
2769  * The api_lock will still protect it against other threads trying to modify
2770  * the demux element.
2771  */
2772 static void
2773 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2774 {
2775   GST_LOG_OBJECT (demux, "Stopping tasks");
2776
2777   if (stop_updates)
2778     gst_adaptive_demux_stop_manifest_update_task (demux);
2779
2780   TRACKS_LOCK (demux);
2781   if (demux->input_period)
2782     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2783
2784   demux->priv->flushing = TRUE;
2785   g_cond_signal (&demux->priv->tracks_add);
2786   gst_task_stop (demux->priv->output_task);
2787   TRACKS_UNLOCK (demux);
2788
2789   gst_task_join (demux->priv->output_task);
2790
2791   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2792 }
2793
2794 /* must be called with manifest_lock taken */
2795 static gboolean
2796 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2797 {
2798   GList *iter;
2799   gboolean ret = TRUE;
2800
2801   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2802
2803   TRACKS_LOCK (demux);
2804   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2805     OutputSlot *slot = (OutputSlot *) iter->data;
2806     gst_event_ref (event);
2807     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2808     ret = ret & gst_pad_push_event (slot->pad, event);
2809     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2810       slot->pushed_timed_data = FALSE;
2811   }
2812   TRACKS_UNLOCK (demux);
2813   gst_event_unref (event);
2814   return ret;
2815 }
2816
2817 /* must be called with manifest_lock taken */
2818 void
2819 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2820     GstCaps * caps)
2821 {
2822   GST_DEBUG_OBJECT (stream,
2823       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2824   gst_caps_replace (&stream->pending_caps, caps);
2825   gst_caps_unref (caps);
2826 }
2827
2828 /* must be called with manifest_lock taken */
2829 void
2830 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2831     GstTagList * tags)
2832 {
2833   GST_DEBUG_OBJECT (stream,
2834       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2835   if (stream->pending_tags) {
2836     gst_tag_list_unref (stream->pending_tags);
2837   }
2838   stream->pending_tags = tags;
2839 }
2840
2841 /* must be called with manifest_lock taken */
2842 void
2843 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2844     GstEvent * event)
2845 {
2846   stream->pending_events = g_list_append (stream->pending_events, event);
2847 }
2848
2849 static gboolean
2850 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2851     * demux)
2852 {
2853   return TRUE;
2854 }
2855
2856 /* Called when a stream needs waking after the manifest is updated */
2857 void
2858 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2859 {
2860   demux->priv->stream_waiting_for_manifest = TRUE;
2861 }
2862
2863 static gboolean
2864 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2865 {
2866   GstFlowReturn ret = GST_FLOW_OK;
2867   gboolean schedule_again = TRUE;
2868
2869   GST_MANIFEST_LOCK (demux);
2870   demux->priv->manifest_updates_cb = 0;
2871
2872   /* Updating playlist only needed for live playlists */
2873   if (!gst_adaptive_demux_is_live (demux)) {
2874     GST_MANIFEST_UNLOCK (demux);
2875     return G_SOURCE_REMOVE;
2876   }
2877
2878   GST_DEBUG_OBJECT (demux, "Updating playlist");
2879   ret = gst_adaptive_demux_update_manifest (demux);
2880
2881   if (ret == GST_FLOW_EOS) {
2882     GST_MANIFEST_UNLOCK (demux);
2883     return G_SOURCE_REMOVE;
2884   }
2885
2886   if (ret == GST_FLOW_OK) {
2887     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2888     demux->priv->update_failed_count = 0;
2889
2890     /* Wake up download tasks */
2891     if (demux->priv->stream_waiting_for_manifest) {
2892       GList *iter;
2893
2894       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2895         GstAdaptiveDemux2Stream *stream = iter->data;
2896         gst_adaptive_demux2_stream_on_manifest_update (stream);
2897       }
2898       demux->priv->stream_waiting_for_manifest = FALSE;
2899     }
2900   } else {
2901     demux->priv->update_failed_count++;
2902
2903     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2904       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2905           gst_flow_get_name (ret));
2906     } else {
2907       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2908           (_("Internal data stream error.")), ("Could not update playlist"));
2909       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2910       schedule_again = FALSE;
2911     }
2912   }
2913
2914   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2915     gst_adaptive_demux_handle_lost_sync (demux);
2916
2917   if (schedule_again) {
2918     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2919
2920     demux->priv->manifest_updates_cb =
2921         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2922         klass->get_manifest_update_interval (demux) * GST_USECOND,
2923         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2924   }
2925
2926   GST_MANIFEST_UNLOCK (demux);
2927
2928   return G_SOURCE_REMOVE;
2929 }
2930
2931 static gboolean
2932 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2933 {
2934   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2935
2936   /* Loop for updating of the playlist. This periodically checks if
2937    * the playlist is updated and does so, then signals the streaming
2938    * thread in case it can continue downloading now. */
2939
2940   /* block until the next scheduled update or the signal to quit this thread */
2941   GST_DEBUG_OBJECT (demux, "Started updates task");
2942   demux->priv->manifest_updates_cb =
2943       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2944       klass->get_manifest_update_interval (demux) * GST_USECOND,
2945       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2946
2947   return G_SOURCE_REMOVE;
2948 }
2949
2950 static OutputSlot *
2951 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
2952     GstAdaptiveDemuxTrack * track)
2953 {
2954   GList *tmp;
2955
2956   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2957     OutputSlot *slot = (OutputSlot *) tmp->data;
2958     /* Incompatible output type */
2959     if (slot->type != track->type)
2960       continue;
2961
2962     /* Slot which is already assigned to this pending track */
2963     if (slot->pending_track == track)
2964       return slot;
2965
2966     /* slot already used for another pending track */
2967     if (slot->pending_track != NULL)
2968       continue;
2969
2970     /* Current output track is of the same type and is draining */
2971     if (slot->track && slot->track->draining)
2972       return slot;
2973   }
2974
2975   return NULL;
2976 }
2977
2978 /* TRACKS_LOCK taken */
2979 static OutputSlot *
2980 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
2981 {
2982   GList *tmp;
2983
2984   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
2985     OutputSlot *slot = (OutputSlot *) tmp->data;
2986
2987     if (slot->track == track)
2988       return slot;
2989   }
2990
2991   return NULL;
2992 }
2993
2994 /* TRACKS_LOCK held */
2995 static GstMessage *
2996 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
2997 {
2998   GList *tmp;
2999   GstMessage *msg;
3000
3001   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3002     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3003
3004     if (track->selected && !track->active)
3005       return NULL;
3006   }
3007
3008   /* All selected tracks are active, created message */
3009   msg =
3010       gst_message_new_streams_selected (GST_OBJECT (demux),
3011       demux->output_period->collection);
3012   GST_MESSAGE_SEQNUM (msg) = seqnum;
3013   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3014     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3015     if (track->active) {
3016       gst_message_streams_selected_add (msg, track->stream_object);
3017     }
3018   }
3019
3020   return msg;
3021 }
3022
3023 static void
3024 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3025     OutputSlot * slot)
3026 {
3027   GstAdaptiveDemuxTrack *track = slot->track;
3028   GstEvent *event;
3029
3030   /* Send EVENT_STREAM_START */
3031   event = gst_event_new_stream_start (track->stream_id);
3032   if (demux->have_group_id)
3033     gst_event_set_group_id (event, demux->group_id);
3034   gst_event_set_stream_flags (event, track->flags);
3035   gst_event_set_stream (event, track->stream_object);
3036   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3037       track->stream_id);
3038   gst_pad_push_event (slot->pad, event);
3039
3040   /* Send EVENT_STREAM_COLLECTION */
3041   event = gst_event_new_stream_collection (demux->output_period->collection);
3042   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3043       track->stream_id);
3044   gst_pad_push_event (slot->pad, event);
3045
3046   /* Mark all sticky events for re-sending */
3047   gst_event_store_mark_all_undelivered (&track->sticky_events);
3048 }
3049
3050 /*
3051  * Called with TRACKS_LOCK taken
3052  */
3053 static void
3054 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3055 {
3056   GList *tmp;
3057   guint requested_selection_seqnum;
3058   GstMessage *msg;
3059
3060   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3061      output slots vs active/draining tracks */
3062   requested_selection_seqnum =
3063       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3064
3065   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3066     return;
3067
3068   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3069
3070   /* Go over all slots, and if they have a pending track that's no longer
3071    * selected, clear it so the slot can be reused */
3072   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3073     OutputSlot *slot = (OutputSlot *) tmp->data;
3074
3075     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3076       GST_DEBUG_OBJECT (demux,
3077           "Removing deselected track '%s' as pending from output of current track '%s'",
3078           slot->pending_track->stream_id, slot->track->stream_id);
3079       gst_adaptive_demux_track_unref (slot->pending_track);
3080       slot->pending_track = NULL;
3081     }
3082   }
3083
3084   /* Go over all tracks and create/re-assign/remove slots */
3085   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3086     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3087
3088     if (track->selected) {
3089       OutputSlot *slot = find_slot_for_track (demux, track);
3090
3091       /* 0. Track is selected and has a slot. Nothing to do */
3092       if (slot) {
3093         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3094             track->stream_id);
3095         continue;
3096       }
3097
3098       slot = find_replacement_slot_for_track (demux, track);
3099       if (slot) {
3100         /* 1. There is an existing slot of the same type which is currently
3101          *    draining, assign this track as a replacement for it */
3102         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3103         if (slot->pending_track == NULL) {
3104           slot->pending_track = gst_adaptive_demux_track_ref (track);
3105           GST_DEBUG_OBJECT (demux,
3106               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3107               track->stream_id, track->period_num,
3108               slot->track->stream_id, slot->track->period_num);
3109         }
3110       } else {
3111         /* 2. There is no compatible replacement slot, create a new one */
3112         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3113         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3114             track->stream_id);
3115         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3116
3117         track->update_next_segment = TRUE;
3118
3119         slot->track = gst_adaptive_demux_track_ref (track);
3120         track->active = TRUE;
3121         gst_adaptive_demux_send_initial_events (demux, slot);
3122       }
3123
3124       /* If we were draining this track, we no longer are */
3125       track->draining = FALSE;
3126     }
3127   }
3128
3129   /* Finally check all slots have a current/pending track. If not remove it */
3130   for (tmp = demux->priv->outputs; tmp;) {
3131     OutputSlot *slot = (OutputSlot *) tmp->data;
3132     /* We should never has slots without target tracks */
3133     g_assert (slot->track);
3134     if (slot->track->draining && !slot->pending_track) {
3135       GstAdaptiveDemux2Stream *stream;
3136
3137       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3138           slot->track->stream_id);
3139       slot->track->active = FALSE;
3140
3141       /* If the stream feeding this track is stopped, flush and clear
3142        * the track now that it's going inactive. If the stream was not
3143        * found, it means we advanced past that period already (and the
3144        * stream was stopped and discarded) */
3145       stream = find_stream_for_track_locked (demux, slot->track);
3146       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3147         gst_adaptive_demux_track_flush (slot->track);
3148
3149       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3150       gst_adaptive_demux_output_slot_free (demux, slot);
3151     } else
3152       tmp = tmp->next;
3153   }
3154
3155   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3156   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3157   if (msg) {
3158     TRACKS_UNLOCK (demux);
3159     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3160     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3161     TRACKS_LOCK (demux);
3162   }
3163 }
3164
3165 /* TRACKS_LOCK held */
3166 static gboolean
3167 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3168 {
3169   GList *iter;
3170   GstAdaptiveDemuxPeriod *previous_period;
3171   GstStreamCollection *collection;
3172
3173   /* Grab the next period, should be demux->periods->next->data */
3174   previous_period = g_queue_pop_head (demux->priv->periods);
3175
3176   /* Remove ref held by demux->output_period */
3177   gst_adaptive_demux_period_unref (previous_period);
3178   demux->output_period =
3179       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3180
3181   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3182       demux->output_period->period_num);
3183
3184   /* We can now post the collection of the new period */
3185   collection = demux->output_period->collection;
3186   TRACKS_UNLOCK (demux);
3187   gst_element_post_message (GST_ELEMENT_CAST (demux),
3188       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3189   TRACKS_LOCK (demux);
3190
3191   /* Unselect all tracks of the previous period */
3192   for (iter = previous_period->tracks; iter; iter = iter->next) {
3193     GstAdaptiveDemuxTrack *track = iter->data;
3194     if (track->selected) {
3195       track->selected = FALSE;
3196       track->draining = TRUE;
3197     }
3198   }
3199
3200   /* Force a selection re-check */
3201   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3202   check_and_handle_selection_update_locked (demux);
3203
3204   /* Remove the final ref on the previous period now that we have done the switch */
3205   gst_adaptive_demux_period_unref (previous_period);
3206
3207   return TRUE;
3208 }
3209
3210 /* Called with TRACKS_LOCK taken */
3211 static void
3212 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3213     OutputSlot * slot)
3214 {
3215   GstAdaptiveDemuxTrack *track = slot->track;
3216   GstMessage *msg;
3217   gboolean pending_is_ready;
3218   GstAdaptiveDemux2Stream *stream;
3219
3220   /* If we have a pending track for this slot, the current track should be
3221    * draining and no longer selected */
3222   g_assert (track->draining && !track->selected);
3223
3224   /* If we're draining, check if the pending track has enough data *or* that
3225      we've already drained out entirely */
3226   pending_is_ready =
3227       (slot->pending_track->level_time >=
3228       slot->pending_track->buffering_threshold);
3229   pending_is_ready |= slot->pending_track->eos;
3230
3231   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3232     GST_DEBUG_OBJECT (demux,
3233         "Replacement track '%s' doesn't have enough data for switching yet",
3234         slot->pending_track->stream_id);
3235     return;
3236   }
3237
3238   GST_DEBUG_OBJECT (demux,
3239       "Pending replacement track has enough data, switching");
3240   track->active = FALSE;
3241   track->draining = FALSE;
3242
3243   /* If the stream feeding this track is stopped, flush and clear
3244    * the track now that it's going inactive. If the stream was not
3245    * found, it means we advanced past that period already (and the
3246    * stream was stopped and discarded) */
3247   stream = find_stream_for_track_locked (demux, track);
3248   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3249     gst_adaptive_demux_track_flush (track);
3250
3251   gst_adaptive_demux_track_unref (track);
3252   /* We steal the reference of pending_track */
3253   track = slot->track = slot->pending_track;
3254   slot->pending_track = NULL;
3255   slot->track->active = TRUE;
3256
3257   /* Make sure the track segment will start at the current position */
3258   track->update_next_segment = TRUE;
3259
3260   /* Send stream start and collection, and schedule sticky events */
3261   gst_adaptive_demux_send_initial_events (demux, slot);
3262
3263   /* Can we emit the streams-selected message now ? */
3264   msg =
3265       all_selected_tracks_are_active (demux,
3266       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3267   if (msg) {
3268     TRACKS_UNLOCK (demux);
3269     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3270     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3271     TRACKS_LOCK (demux);
3272   }
3273
3274 }
3275
3276 static void
3277 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3278 {
3279   GList *tmp;
3280   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3281   gboolean wait_for_data = FALSE;
3282   gboolean all_tracks_empty;
3283   GstFlowReturn ret;
3284
3285   GST_DEBUG_OBJECT (demux, "enter");
3286
3287   TRACKS_LOCK (demux);
3288
3289   /* Check if stopping */
3290   if (demux->priv->flushing) {
3291     ret = GST_FLOW_FLUSHING;
3292     goto pause;
3293   }
3294
3295   /* If the selection changed, handle it */
3296   check_and_handle_selection_update_locked (demux);
3297
3298 restart:
3299   ret = GST_FLOW_OK;
3300   global_output_position = GST_CLOCK_STIME_NONE;
3301   all_tracks_empty = TRUE;
3302
3303   if (wait_for_data) {
3304     GST_DEBUG_OBJECT (demux, "Waiting for data");
3305     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3306     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3307     if (demux->priv->flushing) {
3308       ret = GST_FLOW_FLUSHING;
3309       goto pause;
3310     }
3311     wait_for_data = FALSE;
3312   }
3313
3314   /* Grab/Recalculate current global output position
3315    * This is the minimum pending output position of all tracks used for output
3316    *
3317    * If there is a track which is empty and not EOS, wait for it to receive data
3318    * then recalculate global output position.
3319    *
3320    * This also pushes downstream all non-timed data that might be present.
3321    *
3322    * IF all tracks are EOS : stop task
3323    */
3324   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3325   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3326     OutputSlot *slot = (OutputSlot *) tmp->data;
3327     GstAdaptiveDemuxTrack *track;
3328
3329     /* If there is a pending track, Check if it's time to switch to it */
3330     if (slot->pending_track)
3331       handle_slot_pending_track_switch_locked (demux, slot);
3332
3333     track = slot->track;
3334
3335     if (!track->active) {
3336       /* Note: Edward: I can't see in what cases we would end up with inactive
3337          tracks assigned to slots. */
3338       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3339       g_assert (track->active);
3340       continue;
3341     }
3342
3343     if (track->next_position == GST_CLOCK_STIME_NONE) {
3344       gst_adaptive_demux_track_update_next_position (track);
3345     }
3346
3347     GST_TRACE_OBJECT (demux,
3348         "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3349         track->stream_id, track->period_num,
3350         GST_STIME_ARGS (track->next_position));
3351
3352     if (track->next_position != GST_CLOCK_STIME_NONE) {
3353       if (global_output_position == GST_CLOCK_STIME_NONE)
3354         global_output_position = track->next_position;
3355       else
3356         global_output_position =
3357             MIN (global_output_position, track->next_position);
3358       track->waiting_add = FALSE;
3359       all_tracks_empty = FALSE;
3360     } else if (!track->eos) {
3361       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3362           track->stream_id, track->period_num);
3363       all_tracks_empty = FALSE;
3364       wait_for_data = track->waiting_add = TRUE;
3365     } else {
3366       GST_DEBUG_OBJECT (demux,
3367           "Track %s (period %u) is EOS, not waiting for timed data",
3368           track->stream_id, track->period_num);
3369
3370       if (gst_queue_array_get_length (track->queue) > 0) {
3371         all_tracks_empty = FALSE;
3372       }
3373     }
3374   }
3375
3376   if (wait_for_data)
3377     goto restart;
3378
3379   if (all_tracks_empty && demux->output_period->has_next_period) {
3380     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3381         demux->output_period->period_num);
3382     if (!gst_adaptive_demux_advance_output_period (demux)) {
3383       /* Failed to move to next period, error out */
3384       ret = GST_FLOW_ERROR;
3385       goto pause;
3386     }
3387     /* Restart the loop */
3388     goto restart;
3389   }
3390
3391   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3392       GST_STIME_ARGS (global_output_position));
3393
3394   /* For each track:
3395    *
3396    * We know all active tracks have pending timed data
3397    * * while track next_position <= global output position
3398    *   * push pending data
3399    *   * Update track next_position
3400    *     * recalculate global output position
3401    *   * Pop next pending data from track and update pending position
3402    *
3403    */
3404   gboolean need_restart = FALSE;
3405
3406   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3407     OutputSlot *slot = (OutputSlot *) tmp->data;
3408     GstAdaptiveDemuxTrack *track = slot->track;
3409
3410     GST_LOG_OBJECT (track->element,
3411         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3412         " global_output_position:%" GST_STIME_FORMAT, track->active,
3413         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3414         GST_STIME_ARGS (global_output_position));
3415
3416     if (!track->active)
3417       continue;
3418
3419     while (global_output_position == GST_CLOCK_STIME_NONE
3420         || !slot->pushed_timed_data
3421         || ((track->next_position != GST_CLOCK_STIME_NONE)
3422             && track->next_position <= global_output_position)
3423         || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3424       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3425
3426       if (!mo) {
3427         GST_DEBUG_OBJECT (demux,
3428             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3429             track->stream_id, track->period_num, track->eos,
3430             slot->pushed_timed_data);
3431         /* This should only happen if the track is EOS, or exactly in between
3432          * the parser outputting segment/caps before buffers. */
3433         g_assert (track->eos || !slot->pushed_timed_data);
3434
3435         /* If we drained the track, but there's a pending track on the slot
3436          * loop again to activate it */
3437         if (slot->pending_track) {
3438           GST_DEBUG_OBJECT (demux,
3439               "Track '%s' (period %u) drained, but has a pending track to activate",
3440               track->stream_id, track->period_num);
3441           goto restart;
3442         }
3443         break;
3444       }
3445
3446       demux_update_buffering_locked (demux);
3447       demux_post_buffering_locked (demux);
3448       TRACKS_UNLOCK (demux);
3449
3450       GST_DEBUG_OBJECT (demux,
3451           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3452           track->period_num, mo);
3453
3454       if (GST_IS_EVENT (mo)) {
3455         GstEvent *event = (GstEvent *) mo;
3456         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3457           slot->pushed_timed_data = TRUE;
3458         } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3459           /* If there is a pending next period, don't send the EOS */
3460           if (demux->output_period->has_next_period) {
3461             GST_LOG_OBJECT (demux,
3462                 "Dropping EOS on track '%s' (period %u) before next period",
3463                 track->stream_id, track->period_num);
3464             gst_event_store_mark_delivered (&track->sticky_events, event);
3465             gst_event_unref (event);
3466             event = NULL;
3467             /* We'll need to re-check if all tracks are empty again above */
3468             need_restart = TRUE;
3469           }
3470         }
3471
3472         if (event != NULL) {
3473           gst_pad_push_event (slot->pad, gst_event_ref (event));
3474
3475           if (GST_EVENT_IS_STICKY (event))
3476             gst_event_store_mark_delivered (&track->sticky_events, event);
3477           gst_event_unref (event);
3478         }
3479       } else if (GST_IS_BUFFER (mo)) {
3480         GstBuffer *buffer = (GstBuffer *) mo;
3481
3482         if (track->output_discont) {
3483           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3484             buffer = gst_buffer_make_writable (buffer);
3485             GST_DEBUG_OBJECT (slot->pad,
3486                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3487                 buffer);
3488             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3489           }
3490           track->output_discont = FALSE;
3491         }
3492         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3493         ret =
3494             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3495             slot->pad, slot->flow_ret);
3496         GST_DEBUG_OBJECT (slot->pad,
3497             "track %s (period %u) push returned %s (combined %s)",
3498             track->stream_id, track->period_num,
3499             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3500         slot->pushed_timed_data = TRUE;
3501       } else {
3502         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3503       }
3504
3505       TRACKS_LOCK (demux);
3506       gst_adaptive_demux_track_update_next_position (track);
3507
3508       if (ret != GST_FLOW_OK)
3509         goto pause;
3510     }
3511   }
3512
3513   /* Store global output position */
3514   if (global_output_position != GST_CLOCK_STIME_NONE) {
3515     demux->priv->global_output_position = global_output_position;
3516
3517     /* And see if any streams need to be woken for more input */
3518     gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3519         global_output_position);
3520   }
3521
3522   if (need_restart)
3523     goto restart;
3524
3525   if (global_output_position == GST_CLOCK_STIME_NONE) {
3526     if (!demux->priv->flushing) {
3527       GST_DEBUG_OBJECT (demux,
3528           "Pausing output task after reaching NONE global_output_position");
3529       gst_task_pause (demux->priv->output_task);
3530     }
3531   }
3532
3533   TRACKS_UNLOCK (demux);
3534   GST_DEBUG_OBJECT (demux, "leave");
3535   return;
3536
3537 pause:
3538   {
3539     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3540     /* If the flushing flag is set, then the task is being
3541      * externally stopped, so don't go to pause(), otherwise we
3542      * should so we don't keep spinning */
3543     if (!demux->priv->flushing) {
3544       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3545           gst_flow_get_name (ret));
3546       gst_task_pause (demux->priv->output_task);
3547     }
3548
3549     TRACKS_UNLOCK (demux);
3550
3551     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3552       GstEvent *eos = gst_event_new_eos ();
3553
3554       if (ret != GST_FLOW_EOS) {
3555         GST_ELEMENT_FLOW_ERROR (demux, ret);
3556       }
3557
3558       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3559       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3560         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3561       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3562
3563       gst_adaptive_demux_push_src_event (demux, eos);
3564     }
3565
3566     return;
3567   }
3568 }
3569
3570 /* must be called from the scheduler */
3571 gboolean
3572 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3573 {
3574   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3575
3576   if (klass->is_live)
3577     return klass->is_live (demux);
3578   return FALSE;
3579 }
3580
3581 static void
3582 handle_manifest_download_complete (DownloadRequest * request,
3583     DownloadRequestState state, GstAdaptiveDemux * demux)
3584 {
3585   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3586   GstBuffer *buffer;
3587   GstFlowReturn result;
3588
3589   g_free (demux->manifest_base_uri);
3590   g_free (demux->manifest_uri);
3591
3592   if (request->redirect_permanent && request->redirect_uri) {
3593     demux->manifest_uri = g_strdup (request->redirect_uri);
3594     demux->manifest_base_uri = NULL;
3595   } else {
3596     demux->manifest_uri = g_strdup (request->uri);
3597     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3598   }
3599
3600   buffer = download_request_take_buffer (request);
3601
3602   /* We should always have a buffer since this function is the non-error
3603    * callback for the download */
3604   g_assert (buffer);
3605
3606   result = klass->update_manifest_data (demux, buffer);
3607   gst_buffer_unref (buffer);
3608
3609   /* FIXME: Should the manifest uri vars be reverted to original
3610    * values if updating fails? */
3611
3612   if (result == GST_FLOW_OK) {
3613     GstClockTime duration;
3614     /* Send an updated duration message */
3615     duration = klass->get_duration (demux);
3616     if (duration != GST_CLOCK_TIME_NONE) {
3617       GST_DEBUG_OBJECT (demux,
3618           "Sending duration message : %" GST_TIME_FORMAT,
3619           GST_TIME_ARGS (duration));
3620       gst_element_post_message (GST_ELEMENT (demux),
3621           gst_message_new_duration_changed (GST_OBJECT (demux)));
3622     } else {
3623       GST_DEBUG_OBJECT (demux,
3624           "Duration unknown, can not send the duration message");
3625     }
3626
3627     /* If a manifest changes it's liveness or periodic updateness, we need
3628      * to start/stop the manifest update task appropriately */
3629     /* Keep this condition in sync with the one in
3630      * gst_adaptive_demux_start_manifest_update_task()
3631      */
3632     if (gst_adaptive_demux_is_live (demux) &&
3633         klass->requires_periodical_playlist_update (demux)) {
3634       gst_adaptive_demux_start_manifest_update_task (demux);
3635     } else {
3636       gst_adaptive_demux_stop_manifest_update_task (demux);
3637     }
3638   }
3639 }
3640
3641 static void
3642 handle_manifest_download_failure (DownloadRequest * request,
3643     DownloadRequestState state, GstAdaptiveDemux * demux)
3644 {
3645   GST_FIXME_OBJECT (demux, "Manifest download failed.");
3646   /* Retry or error out here */
3647 }
3648
3649 static GstFlowReturn
3650 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3651 {
3652   DownloadRequest *request;
3653   GstFlowReturn ret = GST_FLOW_OK;
3654   GError *error = NULL;
3655
3656   request = download_request_new_uri (demux->manifest_uri);
3657
3658   download_request_set_callbacks (request,
3659       (DownloadRequestEventCallback) handle_manifest_download_complete,
3660       (DownloadRequestEventCallback) handle_manifest_download_failure,
3661       NULL, NULL, demux);
3662
3663   if (!downloadhelper_submit_request (demux->download_helper, NULL,
3664           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3665           &error)) {
3666     if (error) {
3667       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3668           ("Failed to download manifest: %s", error->message), (NULL));
3669       g_clear_error (&error);
3670     }
3671     ret = GST_FLOW_NOT_LINKED;
3672   }
3673
3674   return ret;
3675 }
3676
3677 /* must be called with manifest_lock taken */
3678 GstFlowReturn
3679 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3680 {
3681   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3682   GstFlowReturn ret;
3683
3684   ret = klass->update_manifest (demux);
3685
3686   return ret;
3687 }
3688
3689 /* must be called with manifest_lock taken */
3690 gboolean
3691 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3692 {
3693   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3694   gboolean ret = FALSE;
3695
3696   if (klass->has_next_period)
3697     ret = klass->has_next_period (demux);
3698   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3699   return ret;
3700 }
3701
3702 /* must be called with manifest_lock taken */
3703 void
3704 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3705 {
3706   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3707   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3708
3709   g_return_if_fail (klass->advance_period != NULL);
3710
3711   GST_DEBUG_OBJECT (demux, "Advancing to next period");
3712   /* FIXME : no return value ? What if it fails ? */
3713   klass->advance_period (demux);
3714
3715   if (previous_period == demux->input_period) {
3716     GST_ERROR_OBJECT (demux, "Advancing period failed");
3717     return;
3718   }
3719
3720   /* Stop the previous period stream tasks */
3721   gst_adaptive_demux_period_stop_tasks (previous_period);
3722
3723   gst_adaptive_demux_update_collection (demux, demux->input_period);
3724   /* Figure out a pre-emptive selection based on the output period selection */
3725   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3726       demux->output_period);
3727
3728   gst_adaptive_demux_prepare_streams (demux, FALSE);
3729   gst_adaptive_demux_start_tasks (demux);
3730 }
3731
3732 /**
3733  * gst_adaptive_demux_get_monotonic_time:
3734  * Returns: a monotonically increasing time, using the system realtime clock
3735  */
3736 GstClockTime
3737 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3738 {
3739   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3740   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3741 }
3742
3743 /**
3744  * gst_adaptive_demux_get_client_now_utc:
3745  * @demux: #GstAdaptiveDemux
3746  * Returns: the client's estimate of UTC
3747  *
3748  * Used to find the client's estimate of UTC, using the system realtime clock.
3749  */
3750 GDateTime *
3751 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3752 {
3753   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3754 }
3755
3756 /**
3757  * gst_adaptive_demux_is_running
3758  * @demux: #GstAdaptiveDemux
3759  * Returns: whether the demuxer is processing data
3760  *
3761  * Returns FALSE if shutdown has started (transitioning down from
3762  * PAUSED), otherwise TRUE.
3763  */
3764 gboolean
3765 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3766 {
3767   return g_atomic_int_get (&demux->running);
3768 }
3769
3770 /**
3771  * gst_adaptive_demux_get_qos_earliest_time:
3772  *
3773  * Returns: The QOS earliest time
3774  *
3775  * Since: 1.20
3776  */
3777 GstClockTime
3778 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3779 {
3780   GstClockTime earliest;
3781
3782   GST_OBJECT_LOCK (demux);
3783   earliest = demux->priv->qos_earliest_time;
3784   GST_OBJECT_UNLOCK (demux);
3785
3786   return earliest;
3787 }
3788
3789 gboolean
3790 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3791     GstAdaptiveDemux2Stream * stream)
3792 {
3793   g_return_val_if_fail (demux && stream, FALSE);
3794
3795   /* FIXME : Migrate to parent */
3796   g_return_val_if_fail (stream->demux == NULL, FALSE);
3797
3798   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3799
3800   TRACKS_LOCK (demux);
3801   if (demux->input_period->prepared) {
3802     GST_ERROR_OBJECT (demux,
3803         "Attempted to add streams but no new period was created");
3804     TRACKS_UNLOCK (demux);
3805     return FALSE;
3806   }
3807   stream->demux = demux;
3808   stream->period = demux->input_period;
3809   demux->input_period->streams =
3810       g_list_append (demux->input_period->streams, stream);
3811
3812   if (stream->tracks) {
3813     GList *iter;
3814     for (iter = stream->tracks; iter; iter = iter->next)
3815       if (!gst_adaptive_demux_period_add_track (demux->input_period,
3816               (GstAdaptiveDemuxTrack *) iter->data)) {
3817         GST_ERROR_OBJECT (demux, "Failed to add track elements");
3818         TRACKS_UNLOCK (demux);
3819         return FALSE;
3820       }
3821   }
3822   TRACKS_UNLOCK (demux);
3823   return TRUE;
3824 }
3825
3826 /* Return the current playback rate including any instant rate multiplier */
3827 gdouble
3828 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3829 {
3830   gdouble rate;
3831   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3832   rate = demux->segment.rate * demux->instant_rate_multiplier;
3833   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3834   return rate;
3835 }