rtsp-server: fix memory leak
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * Copyright (C) 2021-2022 Centricular Ltd
7  *   Author: Edward Hervey <edward@centricular.com>
8  *   Author: Jan Schmidt <jan@centricular.com>
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25
26 /**
27  * SECTION:plugin-adaptivedemux2
28  * @short_description: Next Generation adaptive demuxers
29  *
30  * What is an adaptive demuxer?  Adaptive demuxers are special demuxers in the
31  * sense that they don't actually demux data received from upstream but download
32  * the data themselves.
33  *
34  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35  * of fragments. The manifest describes the available media and the sequence of
36  * fragments to use. Each fragment contains a small part of the media (typically
37  * only a few seconds). It is possible for the manifest to have the same media
38  * available in different configurations (bitrates for example) so that the
39  * client can select the one that best suits its scenario (network fluctuation,
40  * hardware requirements...).
41  *
42  * Furthermore, that manifest can also specify alternative medias (such as audio
43  * or subtitle tracks in different languages). Only the fragments for the
44  * requested selection will be download.
45  *
46  * These elements can therefore "adapt" themselves to the network conditions (as
47  * opposed to the server doing that adaptation) and user choices, which is why
48  * they are called "adaptive" demuxers.
49  *
50  * Note: These elements require a "streams-aware" container to work
51  * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52  * GST_BIN_FLAG_STREAMS_AWARE flag set).
53  *
54  * Subclasses:
55  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56  * about the intrinsics of the subclass formats, so the subclasses are
57  * responsible for maintaining the manifest data structures and stream
58  * information.
59  *
60  * Since: 1.22
61  */
62
63 /*
64 See the adaptive-demuxer.md design documentation for more information
65
66 MT safety.
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
72
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
80
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
97
98 By using the api_lock a thread is protected against other API calls.
99 */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
107
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
111
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
114
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
118
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
121 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
122 #define DEFAULT_MAX_RESOLUTION 0
123 #endif
124
125 #define DEFAULT_MAX_BUFFERING_TIME (30 *  GST_SECOND)
126
127 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
128 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME 0  /* Automatic */
129 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
130 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
131
132 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
133 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
134
135 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
136 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
137 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
138
139 enum
140 {
141   PROP_0,
142   PROP_CONNECTION_SPEED,
143   PROP_BANDWIDTH_TARGET_RATIO,
144   PROP_CONNECTION_BITRATE,
145   PROP_MIN_BITRATE,
146   PROP_MAX_BITRATE,
147   PROP_CURRENT_BANDWIDTH,
148   PROP_MAX_BUFFERING_TIME,
149   PROP_BUFFERING_HIGH_WATERMARK_TIME,
150   PROP_BUFFERING_LOW_WATERMARK_TIME,
151   PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
152   PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
153   PROP_CURRENT_LEVEL_TIME_VIDEO,
154   PROP_CURRENT_LEVEL_TIME_AUDIO,
155 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
156   PROP_MAX_WIDTH,
157   PROP_MAX_HEIGHT,
158 #endif
159   PROP_LAST
160 };
161
162 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
163 GST_STATIC_PAD_TEMPLATE ("video_%02u",
164     GST_PAD_SRC,
165     GST_PAD_SOMETIMES,
166     GST_STATIC_CAPS_ANY);
167
168 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
170     GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     GST_STATIC_CAPS_ANY);
173
174 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
175 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
176     GST_PAD_SRC,
177     GST_PAD_SOMETIMES,
178     GST_STATIC_CAPS_ANY);
179
180 /* Private structure for a track being outputted */
181 typedef struct _OutputSlot
182 {
183   /* Output pad */
184   GstPad *pad;
185
186   /* Last flow return */
187   GstFlowReturn flow_ret;
188
189   /* Stream Type */
190   GstStreamType type;
191
192   /* Target track (reference) */
193   GstAdaptiveDemuxTrack *track;
194
195   /* Pending track (which will replace track) */
196   GstAdaptiveDemuxTrack *pending_track;
197
198   /* TRUE if a buffer or a gap event was pushed through this slot. */
199   gboolean pushed_timed_data;
200 } OutputSlot;
201
202 static GstBinClass *parent_class = NULL;
203 static gint private_offset = 0;
204
205 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
206 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
207     GstAdaptiveDemuxClass * klass);
208 static void gst_adaptive_demux_finalize (GObject * object);
209 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
210     element, GstStateChange transition);
211 static gboolean gst_adaptive_demux_query (GstElement * element,
212     GstQuery * query);
213 static gboolean gst_adaptive_demux_send_event (GstElement * element,
214     GstEvent * event);
215
216 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
217
218 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
219     GstEvent * event);
220 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
221     GstObject * parent, GstBuffer * buffer);
222 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
223     GstQuery * query);
224 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
225     GstEvent * event);
226 static gboolean gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
227     GstEvent * event);
228 static gboolean gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux
229     * demux, GstEvent * event);
230
231 static gboolean
232 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
233
234 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
235 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
236 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
237     gboolean first_and_live);
238
239 static GstFlowReturn
240 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
241
242 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
243     demux);
244 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
245     demux);
246
247 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
248 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
249     gboolean stop_updates);
250
251 static gboolean
252 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
253     * demux);
254
255 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
256  * method to get to the padtemplates */
257 GType
258 gst_adaptive_demux_ng_get_type (void)
259 {
260   static gsize type = 0;
261
262   if (g_once_init_enter (&type)) {
263     GType _type;
264     static const GTypeInfo info = {
265       sizeof (GstAdaptiveDemuxClass),
266       NULL,
267       NULL,
268       (GClassInitFunc) gst_adaptive_demux_class_init,
269       NULL,
270       NULL,
271       sizeof (GstAdaptiveDemux),
272       0,
273       (GInstanceInitFunc) gst_adaptive_demux_init,
274     };
275
276     _type = g_type_register_static (GST_TYPE_BIN,
277         "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
278
279     private_offset =
280         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
281
282     g_once_init_leave (&type, _type);
283   }
284   return type;
285 }
286
287 static inline GstAdaptiveDemuxPrivate *
288 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
289 {
290   return (G_STRUCT_MEMBER_P (self, private_offset));
291 }
292
293 static void
294 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
295     const GValue * value, GParamSpec * pspec)
296 {
297   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
298
299   GST_OBJECT_LOCK (demux);
300
301   switch (prop_id) {
302     case PROP_CONNECTION_SPEED:
303       demux->connection_speed = g_value_get_uint (value) * 1000;
304       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
305           demux->connection_speed);
306       break;
307     case PROP_BANDWIDTH_TARGET_RATIO:
308       demux->bandwidth_target_ratio = g_value_get_float (value);
309       break;
310     case PROP_MIN_BITRATE:
311       demux->min_bitrate = g_value_get_uint (value);
312       break;
313     case PROP_MAX_BITRATE:
314       demux->max_bitrate = g_value_get_uint (value);
315       break;
316     case PROP_CONNECTION_BITRATE:
317       demux->connection_speed = g_value_get_uint (value);
318       break;
319       /* FIXME: Recalculate track and buffering levels
320        * when watermarks change? */
321     case PROP_MAX_BUFFERING_TIME:
322       demux->max_buffering_time = g_value_get_uint64 (value);
323       break;
324     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
325       demux->buffering_high_watermark_time = g_value_get_uint64 (value);
326       break;
327     case PROP_BUFFERING_LOW_WATERMARK_TIME:
328       demux->buffering_low_watermark_time = g_value_get_uint64 (value);
329       break;
330     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
331       demux->buffering_high_watermark_fragments = g_value_get_double (value);
332       break;
333     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
334       demux->buffering_low_watermark_fragments = g_value_get_double (value);
335       break;
336 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
337     case PROP_MAX_WIDTH:
338       demux->max_width = g_value_get_uint (value);
339       break;
340     case PROP_MAX_HEIGHT:
341       demux->max_height = g_value_get_uint (value);
342       break;
343 #endif
344     default:
345       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
346       break;
347   }
348
349   GST_OBJECT_UNLOCK (demux);
350 }
351
352 static void
353 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
354     GValue * value, GParamSpec * pspec)
355 {
356   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
357
358   GST_OBJECT_LOCK (demux);
359
360   switch (prop_id) {
361     case PROP_CONNECTION_SPEED:
362       g_value_set_uint (value, demux->connection_speed / 1000);
363       break;
364     case PROP_BANDWIDTH_TARGET_RATIO:
365       g_value_set_float (value, demux->bandwidth_target_ratio);
366       break;
367     case PROP_MIN_BITRATE:
368       g_value_set_uint (value, demux->min_bitrate);
369       break;
370     case PROP_MAX_BITRATE:
371       g_value_set_uint (value, demux->max_bitrate);
372       break;
373     case PROP_CONNECTION_BITRATE:
374       g_value_set_uint (value, demux->connection_speed);
375       break;
376     case PROP_CURRENT_BANDWIDTH:
377       g_value_set_uint (value, demux->current_download_rate);
378       break;
379     case PROP_MAX_BUFFERING_TIME:
380       g_value_set_uint64 (value, demux->max_buffering_time);
381       break;
382     case PROP_BUFFERING_HIGH_WATERMARK_TIME:
383       g_value_set_uint64 (value, demux->buffering_high_watermark_time);
384       break;
385     case PROP_BUFFERING_LOW_WATERMARK_TIME:
386       g_value_set_uint64 (value, demux->buffering_low_watermark_time);
387       break;
388     case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
389       g_value_set_double (value, demux->buffering_high_watermark_fragments);
390       break;
391     case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
392       g_value_set_double (value, demux->buffering_low_watermark_fragments);
393       break;
394     case PROP_CURRENT_LEVEL_TIME_VIDEO:
395       g_value_set_uint64 (value, demux->current_level_time_video);
396       break;
397     case PROP_CURRENT_LEVEL_TIME_AUDIO:
398       g_value_set_uint64 (value, demux->current_level_time_audio);
399       break;
400 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
401     case PROP_MAX_WIDTH:
402       g_value_set_uint (value, demux->max_width);
403       break;
404     case PROP_MAX_HEIGHT:
405       g_value_set_uint (value, demux->max_height);
406       break;
407 #endif
408     default:
409       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
410       break;
411   }
412
413   GST_OBJECT_UNLOCK (demux);
414 }
415
416 static void
417 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
418 {
419   GObjectClass *gobject_class;
420   GstElementClass *gstelement_class;
421   GstBinClass *gstbin_class;
422
423   gobject_class = G_OBJECT_CLASS (klass);
424   gstelement_class = GST_ELEMENT_CLASS (klass);
425   gstbin_class = GST_BIN_CLASS (klass);
426
427   GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
428       "Base Adaptive Demux (ng)");
429
430   parent_class = g_type_class_peek_parent (klass);
431
432   if (private_offset != 0)
433     g_type_class_adjust_private_offset (klass, &private_offset);
434
435   gobject_class->set_property = gst_adaptive_demux_set_property;
436   gobject_class->get_property = gst_adaptive_demux_get_property;
437   gobject_class->finalize = gst_adaptive_demux_finalize;
438
439   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
440       g_param_spec_uint ("connection-speed", "Connection Speed",
441           "Network connection speed to use in kbps (0 = calculate from downloaded"
442           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
446       g_param_spec_float ("bandwidth-target-ratio",
447           "Ratio of target bandwidth / available bandwidth",
448           "Limit of the available bitrate to use when switching to alternates",
449           0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
450           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451
452   g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
453       g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
454           "Network connection speed to use (0 = automatic) (bits/s)",
455           0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
459       g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
460           "Minimum bitrate to use when switching to alternates (bits/s)",
461           0, G_MAXUINT, DEFAULT_MIN_BITRATE,
462           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
463
464   g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
465       g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
466           "Maximum bitrate to use when switching to alternates (bits/s)",
467           0, G_MAXUINT, DEFAULT_MAX_BITRATE,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469
470 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
471   g_object_class_install_property (gobject_class, PROP_MAX_WIDTH,
472       g_param_spec_uint ("max-video-width",
473           "Max video width limit",
474           "Maximum limit of the available video width to use when switching to alternates. (0 = no limit)",
475           0, G_MAXUINT, DEFAULT_MAX_RESOLUTION,
476           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
477
478   g_object_class_install_property (gobject_class, PROP_MAX_HEIGHT,
479       g_param_spec_uint ("max-video-height",
480           "Max video height limit",
481           "Maximum limit of the available video height to use when switching to alternates. (0 = no limit)",
482           0, G_MAXUINT, DEFAULT_MAX_RESOLUTION,
483           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
484 #endif
485
486   g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
487       g_param_spec_uint ("current-bandwidth",
488           "Current download bandwidth (bits/s)",
489           "Report of current download bandwidth (based on arriving data) (bits/s)",
490           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
491
492   g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
493       g_param_spec_uint64 ("max-buffering-time",
494           "Buffering maximum size (ns)",
495           "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
496           0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
497           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
498           G_PARAM_STATIC_STRINGS));
499
500   g_object_class_install_property (gobject_class,
501       PROP_BUFFERING_HIGH_WATERMARK_TIME,
502       g_param_spec_uint64 ("high-watermark-time",
503           "High buffering watermark size (ns)",
504           "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
505           0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
506           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
507           G_PARAM_STATIC_STRINGS));
508
509   g_object_class_install_property (gobject_class,
510       PROP_BUFFERING_LOW_WATERMARK_TIME,
511       g_param_spec_uint64 ("low-watermark-time",
512           "Low buffering watermark size (ns)",
513           "Low watermark for parsed data below which downloads are resumed (in ns, 0=automatic)",
514           0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
515           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
516           G_PARAM_STATIC_STRINGS));
517
518   g_object_class_install_property (gobject_class,
519       PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
520       g_param_spec_double ("high-watermark-fragments",
521           "High buffering watermark size (fragments)",
522           "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
523           0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
524           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
525           G_PARAM_STATIC_STRINGS));
526
527   g_object_class_install_property (gobject_class,
528       PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
529       g_param_spec_double ("low-watermark-fragments",
530           "Low buffering watermark size (fragments)",
531           "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
532           0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
533           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
534           G_PARAM_STATIC_STRINGS));
535
536   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
537       g_param_spec_uint64 ("current-level-time-video",
538           "Currently buffered level of video (ns)",
539           "Currently buffered level of video track(s) (ns)",
540           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
541           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
542           G_PARAM_STATIC_STRINGS));
543
544   g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
545       g_param_spec_uint64 ("current-level-time-audio",
546           "Currently buffered level of audio (ns)",
547           "Currently buffered level of audio track(s) (ns)",
548           0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
549           G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
550           G_PARAM_STATIC_STRINGS));
551
552   gst_element_class_add_static_pad_template (gstelement_class,
553       &gst_adaptive_demux_audiosrc_template);
554   gst_element_class_add_static_pad_template (gstelement_class,
555       &gst_adaptive_demux_videosrc_template);
556   gst_element_class_add_static_pad_template (gstelement_class,
557       &gst_adaptive_demux_subtitlesrc_template);
558
559   gstelement_class->change_state = gst_adaptive_demux_change_state;
560   gstelement_class->query = gst_adaptive_demux_query;
561   gstelement_class->send_event = gst_adaptive_demux_send_event;
562
563   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
564
565   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
566   klass->requires_periodical_playlist_update =
567       gst_adaptive_demux_requires_periodical_playlist_update_default;
568   gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
569 }
570
571 static void
572 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
573     GstAdaptiveDemuxClass * klass)
574 {
575   GstPadTemplate *pad_template;
576
577   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
578
579   demux->priv = gst_adaptive_demux_get_instance_private (demux);
580   demux->priv->input_adapter = gst_adapter_new ();
581   demux->realtime_clock = gst_adaptive_demux_clock_new ();
582
583   demux->download_helper = downloadhelper_new (demux->realtime_clock);
584   demux->priv->segment_seqnum = gst_util_seqnum_next ();
585   demux->have_group_id = FALSE;
586   demux->group_id = G_MAXUINT;
587
588   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
589   demux->instant_rate_multiplier = 1.0;
590
591   GST_OBJECT_FLAG_SET (demux, GST_BIN_FLAG_STREAMS_AWARE);
592   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
593       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
594
595   g_rec_mutex_init (&demux->priv->manifest_lock);
596
597   demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
598
599   g_mutex_init (&demux->priv->api_lock);
600   g_mutex_init (&demux->priv->segment_lock);
601
602   g_mutex_init (&demux->priv->tracks_lock);
603   g_cond_init (&demux->priv->tracks_add);
604
605   g_mutex_init (&demux->priv->buffering_lock);
606
607   demux->priv->periods = g_queue_new ();
608
609   pad_template =
610       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
611   g_return_if_fail (pad_template != NULL);
612
613   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
614   gst_pad_set_event_function (demux->sinkpad,
615       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
616   gst_pad_set_chain_function (demux->sinkpad,
617       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
618
619   /* Properties */
620   demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
621   demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
622   demux->min_bitrate = DEFAULT_MIN_BITRATE;
623   demux->max_bitrate = DEFAULT_MAX_BITRATE;
624 #ifdef TIZEN_FEATURE_ADAPTIVE_VARIANT_LIMIT
625   demux->max_width = DEFAULT_MAX_RESOLUTION;
626   demux->max_height = DEFAULT_MAX_RESOLUTION;
627 #endif
628
629   demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
630   demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
631   demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
632   demux->buffering_high_watermark_fragments =
633       DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
634   demux->buffering_low_watermark_fragments =
635       DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
636
637   demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
638   demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
639
640   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
641
642   demux->priv->duration = GST_CLOCK_TIME_NONE;
643
644   /* Output combiner */
645   demux->priv->flowcombiner = gst_flow_combiner_new ();
646
647   /* Output task */
648   g_rec_mutex_init (&demux->priv->output_lock);
649   demux->priv->output_task =
650       gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
651       NULL);
652   gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
653 }
654
655 static void
656 gst_adaptive_demux_finalize (GObject * object)
657 {
658   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
659   GstAdaptiveDemuxPrivate *priv = demux->priv;
660
661   GST_DEBUG_OBJECT (object, "finalize");
662
663   g_object_unref (priv->input_adapter);
664
665   downloadhelper_free (demux->download_helper);
666
667   g_rec_mutex_clear (&demux->priv->manifest_lock);
668   g_mutex_clear (&demux->priv->api_lock);
669   g_mutex_clear (&demux->priv->segment_lock);
670
671   g_mutex_clear (&demux->priv->buffering_lock);
672
673   gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
674
675   /* The input period is present after a reset, clear it now */
676   if (demux->input_period)
677     gst_adaptive_demux_period_unref (demux->input_period);
678
679   if (demux->realtime_clock) {
680     gst_adaptive_demux_clock_unref (demux->realtime_clock);
681     demux->realtime_clock = NULL;
682   }
683   g_object_unref (priv->output_task);
684   g_rec_mutex_clear (&priv->output_lock);
685
686   gst_flow_combiner_free (priv->flowcombiner);
687
688   g_queue_free (priv->periods);
689
690   G_OBJECT_CLASS (parent_class)->finalize (object);
691 }
692
693 static gboolean
694 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
695 {
696   gboolean ret = FALSE;
697   GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
698
699   if (parent) {
700     ret = GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE);
701     gst_object_unref (parent);
702   }
703
704   return ret;
705 }
706
707 static GstStateChangeReturn
708 gst_adaptive_demux_change_state (GstElement * element,
709     GstStateChange transition)
710 {
711   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
712   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
713
714   switch (transition) {
715     case GST_STATE_CHANGE_NULL_TO_READY:
716       if (!gst_adaptive_demux_check_streams_aware (demux)) {
717         GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
718             (_("Element requires a streams-aware context.")), (NULL));
719         goto fail_out;
720       }
721       break;
722     case GST_STATE_CHANGE_PAUSED_TO_READY:
723       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
724         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
725
726       gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
727       downloadhelper_stop (demux->download_helper);
728
729       TRACKS_LOCK (demux);
730       demux->priv->flushing = TRUE;
731       g_cond_signal (&demux->priv->tracks_add);
732       gst_task_stop (demux->priv->output_task);
733       TRACKS_UNLOCK (demux);
734
735       gst_task_join (demux->priv->output_task);
736
737       GST_API_LOCK (demux);
738       gst_adaptive_demux_reset (demux);
739       GST_API_UNLOCK (demux);
740       break;
741     case GST_STATE_CHANGE_READY_TO_PAUSED:
742       GST_API_LOCK (demux);
743       gst_adaptive_demux_reset (demux);
744
745       gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
746       if (g_atomic_int_get (&demux->priv->have_manifest))
747         gst_adaptive_demux_start_manifest_update_task (demux);
748       GST_API_UNLOCK (demux);
749       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
750         GST_DEBUG_OBJECT (demux, "demuxer has started running");
751       /* gst_task_start (demux->priv->output_task); */
752       break;
753     default:
754       break;
755   }
756
757   /* this must be run with the scheduler and output tasks stopped. */
758   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
759
760   switch (transition) {
761     case GST_STATE_CHANGE_READY_TO_PAUSED:
762       /* Start download task */
763       downloadhelper_start (demux->download_helper);
764       break;
765     default:
766       break;
767   }
768
769 fail_out:
770   return result;
771 }
772
773 static void
774 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
775     OutputSlot * slot)
776 {
777   GstEvent *eos = gst_event_new_eos ();
778   GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
779
780   /* FIXME: The slot might not have output any data, caps or segment yet */
781   gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
782   gst_pad_push_event (slot->pad, eos);
783   gst_pad_set_active (slot->pad, FALSE);
784   gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
785   gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
786   if (slot->track)
787     gst_adaptive_demux_track_unref (slot->track);
788   if (slot->pending_track)
789     gst_adaptive_demux_track_unref (slot->pending_track);
790
791   g_slice_free (OutputSlot, slot);
792 }
793
794 static OutputSlot *
795 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
796     GstStreamType streamtype)
797 {
798   OutputSlot *slot;
799   GstPadTemplate *tmpl;
800   gchar *name;
801
802   switch (streamtype) {
803     case GST_STREAM_TYPE_AUDIO:
804       name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
805       tmpl =
806           gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
807       break;
808     case GST_STREAM_TYPE_VIDEO:
809       name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
810       tmpl =
811           gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
812       break;
813     case GST_STREAM_TYPE_TEXT:
814       name =
815           g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
816       tmpl =
817           gst_static_pad_template_get
818           (&gst_adaptive_demux_subtitlesrc_template);
819       break;
820     default:
821       g_assert_not_reached ();
822       return NULL;
823   }
824
825   slot = g_slice_new0 (OutputSlot);
826   slot->type = streamtype;
827   slot->pushed_timed_data = FALSE;
828
829   /* Create and activate new pads */
830   slot->pad = gst_pad_new_from_template (tmpl, name);
831   g_free (name);
832   gst_object_unref (tmpl);
833
834   gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
835   gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
836   gst_pad_set_active (slot->pad, TRUE);
837
838   gst_pad_set_query_function (slot->pad,
839       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
840   gst_pad_set_event_function (slot->pad,
841       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
842
843   gst_pad_set_element_private (slot->pad, slot);
844
845   GST_INFO_OBJECT (demux, "Created output slot %s:%s",
846       GST_DEBUG_PAD_NAME (slot->pad));
847   return slot;
848 }
849
850 /* Called:
851  * * After `process_manifest` or when a period starts
852  * * Or when all tracks have been created
853  *
854  * Goes over tracks and creates the collection
855  *
856  * Returns TRUE if the collection was fully created.
857  *
858  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
859  * */
860 static gboolean
861 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
862     GstAdaptiveDemuxPeriod * period)
863 {
864   GstStreamCollection *collection;
865   GList *iter;
866
867   GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
868
869   if (!period->tracks_changed) {
870     GST_DEBUG_OBJECT (demux, "Tracks didn't change");
871     return TRUE;
872   }
873
874   if (!period->tracks) {
875     GST_WARNING_OBJECT (demux, "No tracks registered/present");
876     return FALSE;
877   }
878
879   if (gst_adaptive_demux_period_has_pending_tracks (period)) {
880     GST_DEBUG_OBJECT (demux,
881         "Streams still have pending tracks, not creating/updating collection");
882     return FALSE;
883   }
884
885   /* Update collection */
886   collection = gst_stream_collection_new ("adaptivedemux");
887
888   for (iter = period->tracks; iter; iter = iter->next) {
889     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
890
891     GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
892     gst_stream_collection_add_stream (collection,
893         gst_object_ref (track->stream_object));
894   }
895
896   if (period->collection)
897     gst_object_unref (period->collection);
898   period->collection = collection;
899
900   return TRUE;
901 }
902
903 /*
904  * Called for the output period:
905  * * after `update_collection()` if the input period is the same as the output period
906  * * When the output period changes
907  *
908  * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
909  */
910 static gboolean
911 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
912 {
913   GstStreamCollection *collection;
914   GstAdaptiveDemuxPeriod *period = demux->output_period;
915   guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
916
917   g_return_val_if_fail (period, FALSE);
918   if (!period->collection) {
919     GST_DEBUG_OBJECT (demux, "No collection available yet");
920     return TRUE;
921   }
922
923   collection = period->collection;
924
925   GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
926       period->period_num);
927
928   /* Post collection */
929   TRACKS_UNLOCK (demux);
930   GST_MANIFEST_UNLOCK (demux);
931
932   gst_element_post_message (GST_ELEMENT_CAST (demux),
933       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
934
935   GST_MANIFEST_LOCK (demux);
936   TRACKS_LOCK (demux);
937
938   /* If no stream selection was handled, make a default selection */
939   if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
940     gst_adaptive_demux_period_select_default_tracks (demux,
941         demux->output_period);
942   }
943
944   /* Make sure the output task is running */
945   if (gst_adaptive_demux2_is_running (demux)) {
946     demux->priv->flushing = FALSE;
947     GST_DEBUG_OBJECT (demux, "Starting the output task");
948     gst_task_start (demux->priv->output_task);
949   }
950
951   return TRUE;
952 }
953
954 static gboolean
955 handle_incoming_manifest (GstAdaptiveDemux * demux)
956 {
957   GstAdaptiveDemuxClass *demux_class;
958   GstQuery *query;
959   gboolean query_res;
960   gboolean ret = TRUE;
961   gsize available;
962   GstBuffer *manifest_buffer;
963
964   GST_API_LOCK (demux);
965   GST_MANIFEST_LOCK (demux);
966
967   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
968
969   available = gst_adapter_available (demux->priv->input_adapter);
970
971   if (available == 0)
972     goto eos_without_data;
973
974   GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
975
976   /* Need to get the URI to use it as a base to generate the fragment's
977    * uris */
978   query = gst_query_new_uri ();
979   query_res = gst_pad_peer_query (demux->sinkpad, query);
980   if (query_res) {
981     gchar *uri, *redirect_uri;
982     gboolean permanent;
983
984     gst_query_parse_uri (query, &uri);
985     gst_query_parse_uri_redirection (query, &redirect_uri);
986     gst_query_parse_uri_redirection_permanent (query, &permanent);
987
988     if (permanent && redirect_uri) {
989       demux->manifest_uri = redirect_uri;
990       demux->manifest_base_uri = NULL;
991       g_free (uri);
992     } else {
993       demux->manifest_uri = uri;
994       demux->manifest_base_uri = redirect_uri;
995     }
996
997     GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
998         demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
999
1000     if (!g_str_has_prefix (demux->manifest_uri, "data:")
1001         && !g_str_has_prefix (demux->manifest_uri, "http://")
1002         && !g_str_has_prefix (demux->manifest_uri, "https://")) {
1003       GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1004           (_("Invalid manifest URI")),
1005           ("Manifest URI needs to use either data:, http:// or https://"));
1006       gst_query_unref (query);
1007       ret = FALSE;
1008       goto unlock_out;
1009     }
1010   } else {
1011     GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
1012   }
1013   gst_query_unref (query);
1014
1015   /* If somehow we didn't receive a stream-start with a group_id, pick one now */
1016   if (!demux->have_group_id) {
1017     demux->have_group_id = TRUE;
1018     demux->group_id = gst_util_group_id_next ();
1019   }
1020
1021   /* Let the subclass parse the manifest */
1022   manifest_buffer =
1023       gst_adapter_take_buffer (demux->priv->input_adapter, available);
1024   ret = demux_class->process_manifest (demux, manifest_buffer);
1025   gst_buffer_unref (manifest_buffer);
1026
1027   gst_element_post_message (GST_ELEMENT_CAST (demux),
1028       gst_message_new_element (GST_OBJECT_CAST (demux),
1029           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
1030               "manifest-uri", G_TYPE_STRING,
1031               demux->manifest_uri, "uri", G_TYPE_STRING,
1032               demux->manifest_uri,
1033               "manifest-download-start", GST_TYPE_CLOCK_TIME,
1034               GST_CLOCK_TIME_NONE,
1035               "manifest-download-stop", GST_TYPE_CLOCK_TIME,
1036               gst_util_get_timestamp (), NULL)));
1037
1038   if (!ret)
1039     goto invalid_manifest;
1040
1041   /* Streams should have been added to the input period if the manifest parsing
1042    * succeeded */
1043   if (!demux->input_period->streams)
1044     goto no_streams;
1045
1046   g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1047
1048   GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1049   /* Send duration message */
1050   if (!gst_adaptive_demux_is_live (demux)) {
1051     GstClockTime duration = demux_class->get_duration (demux);
1052
1053     demux->priv->duration = duration;
1054     if (duration != GST_CLOCK_TIME_NONE) {
1055       GST_DEBUG_OBJECT (demux,
1056           "Sending duration message : %" GST_TIME_FORMAT,
1057           GST_TIME_ARGS (duration));
1058       gst_element_post_message (GST_ELEMENT (demux),
1059           gst_message_new_duration_changed (GST_OBJECT (demux)));
1060     } else {
1061       GST_DEBUG_OBJECT (demux,
1062           "media duration unknown, can not send the duration message");
1063     }
1064   }
1065
1066   TRACKS_LOCK (demux);
1067   /* New streams/tracks will have been added to the input period */
1068   /* The input period has streams, make it the active output period */
1069   /* FIXME : Factorize this into a function to make a period active */
1070   demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1071   ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1072       gst_adaptive_demux_post_collection (demux);
1073   TRACKS_UNLOCK (demux);
1074
1075   gst_adaptive_demux_prepare_streams (demux,
1076       gst_adaptive_demux_is_live (demux));
1077   gst_adaptive_demux_start_tasks (demux);
1078   gst_adaptive_demux_start_manifest_update_task (demux);
1079
1080 unlock_out:
1081   GST_MANIFEST_UNLOCK (demux);
1082   GST_API_UNLOCK (demux);
1083
1084   return ret;
1085
1086   /* ERRORS */
1087 eos_without_data:
1088   {
1089     GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1090     ret = FALSE;
1091     goto unlock_out;
1092   }
1093
1094 no_streams:
1095   {
1096     /* no streams */
1097     GST_WARNING_OBJECT (demux, "No streams created from manifest");
1098     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1099         (_("This file contains no playable streams.")),
1100         ("No known stream formats found at the Manifest"));
1101     ret = FALSE;
1102     goto unlock_out;
1103   }
1104
1105 invalid_manifest:
1106   {
1107     GST_MANIFEST_UNLOCK (demux);
1108     GST_API_UNLOCK (demux);
1109
1110     /* In most cases, this will happen if we set a wrong url in the
1111      * source element and we have received the 404 HTML response instead of
1112      * the manifest */
1113     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1114     return FALSE;
1115   }
1116 }
1117
1118 struct http_headers_collector
1119 {
1120   GstAdaptiveDemux *demux;
1121   gchar **cookies;
1122 };
1123
1124 static gboolean
1125 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1126     const GValue * value, gpointer userdata)
1127 {
1128   struct http_headers_collector *hdr_data = userdata;
1129   GstAdaptiveDemux *demux = hdr_data->demux;
1130   const gchar *field_name = g_quark_to_string (field_id);
1131
1132   if (G_UNLIKELY (value == NULL))
1133     return TRUE;                /* This should not happen */
1134
1135   if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1136     const gchar *user_agent = g_value_get_string (value);
1137
1138     GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1139     downloadhelper_set_user_agent (demux->download_helper, user_agent);
1140   }
1141
1142   if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1143       g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1144     guint i = 0, prev_len = 0, total_len = 0;
1145     gchar **cookies = NULL;
1146
1147     if (hdr_data->cookies != NULL)
1148       prev_len = g_strv_length (hdr_data->cookies);
1149
1150     if (GST_VALUE_HOLDS_ARRAY (value)) {
1151       total_len = gst_value_array_get_size (value) + prev_len;
1152       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1153
1154       for (i = 0; i < gst_value_array_get_size (value); i++) {
1155         GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1156             g_value_get_string (gst_value_array_get_value (value, i)));
1157         cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1158       }
1159     } else if (G_VALUE_HOLDS_STRING (value)) {
1160       total_len = 1 + prev_len;
1161       cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1162
1163       GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1164           g_value_get_string (value));
1165       cookies[0] = g_value_dup_string (value);
1166     } else {
1167       GST_WARNING_OBJECT (demux, "%s field is not string or array",
1168           g_quark_to_string (field_id));
1169     }
1170
1171     if (cookies) {
1172       if (prev_len) {
1173         guint j;
1174         for (j = 0; j < prev_len; j++) {
1175           GST_DEBUG_OBJECT (demux,
1176               "Append existing cookie %s", hdr_data->cookies[j]);
1177           cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1178         }
1179       }
1180       cookies[total_len] = NULL;
1181
1182       g_strfreev (hdr_data->cookies);
1183       hdr_data->cookies = cookies;
1184     }
1185   }
1186
1187   if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1188     const gchar *referer = g_value_get_string (value);
1189     GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1190
1191     downloadhelper_set_referer (demux->download_helper, referer);
1192   }
1193
1194   /* Date header can be used to estimate server offset */
1195   if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1196     const gchar *http_date = g_value_get_string (value);
1197
1198     if (http_date) {
1199       GstDateTime *datetime =
1200           gst_adaptive_demux_util_parse_http_head_date (http_date);
1201
1202       if (datetime) {
1203         GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1204         gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1205
1206         GST_INFO_OBJECT (demux,
1207             "HTTP response Date %s", GST_STR_NULL (date_string));
1208         g_free (date_string);
1209
1210         gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1211
1212         g_date_time_unref (utc_now);
1213         gst_date_time_unref (datetime);
1214       }
1215     }
1216   }
1217
1218   return TRUE;
1219 }
1220
1221 static gboolean
1222 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1223     GstEvent * event)
1224 {
1225   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1226   gboolean ret;
1227
1228   switch (event->type) {
1229     case GST_EVENT_FLUSH_STOP:{
1230       GST_API_LOCK (demux);
1231       GST_MANIFEST_LOCK (demux);
1232
1233       gst_adaptive_demux_reset (demux);
1234
1235       ret = gst_pad_event_default (pad, parent, event);
1236
1237       GST_MANIFEST_UNLOCK (demux);
1238       GST_API_UNLOCK (demux);
1239
1240       return ret;
1241     }
1242     case GST_EVENT_EOS:
1243     {
1244       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1245         if (!handle_incoming_manifest (demux)) {
1246           GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1247           return gst_pad_event_default (pad, parent, event);
1248         }
1249         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1250       } else {
1251         GST_ERROR_OBJECT (demux,
1252             "Failed to acquire scheduler to handle manifest");
1253         return gst_pad_event_default (pad, parent, event);
1254       }
1255       gst_event_unref (event);
1256       return TRUE;
1257     }
1258     case GST_EVENT_STREAM_START:
1259       if (gst_event_parse_group_id (event, &demux->group_id))
1260         demux->have_group_id = TRUE;
1261       else
1262         demux->have_group_id = FALSE;
1263       /* Swallow stream-start, we'll push our own */
1264       gst_event_unref (event);
1265       return TRUE;
1266     case GST_EVENT_SEGMENT:
1267       /* Swallow newsegments, we'll push our own */
1268       gst_event_unref (event);
1269       return TRUE;
1270     case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1271       const GstStructure *structure = gst_event_get_structure (event);
1272       struct http_headers_collector c = { demux, NULL };
1273
1274       if (gst_structure_has_name (structure, "http-headers")) {
1275         if (gst_structure_has_field (structure, "request-headers")) {
1276           GstStructure *req_headers = NULL;
1277           gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1278               &req_headers, NULL);
1279           if (req_headers) {
1280             gst_structure_foreach (req_headers,
1281                 gst_adaptive_demux_handle_upstream_http_header, &c);
1282             gst_structure_free (req_headers);
1283           }
1284         }
1285         if (gst_structure_has_field (structure, "response-headers")) {
1286           GstStructure *res_headers = NULL;
1287           gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1288               &res_headers, NULL);
1289           if (res_headers) {
1290             gst_structure_foreach (res_headers,
1291                 gst_adaptive_demux_handle_upstream_http_header, &c);
1292             gst_structure_free (res_headers);
1293           }
1294         }
1295
1296         if (c.cookies)
1297           downloadhelper_set_cookies (demux->download_helper, c.cookies);
1298       }
1299       break;
1300     }
1301     default:
1302       break;
1303   }
1304
1305   return gst_pad_event_default (pad, parent, event);
1306 }
1307
1308 static GstFlowReturn
1309 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1310     GstBuffer * buffer)
1311 {
1312   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1313
1314   GST_MANIFEST_LOCK (demux);
1315
1316   gst_adapter_push (demux->priv->input_adapter, buffer);
1317
1318   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1319       (gint) gst_adapter_available (demux->priv->input_adapter));
1320
1321   GST_MANIFEST_UNLOCK (demux);
1322   return GST_FLOW_OK;
1323 }
1324
1325
1326 /* Called with TRACKS_LOCK taken */
1327 static void
1328 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1329 {
1330   GList *tmp;
1331
1332   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1333     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1334
1335     gst_adaptive_demux_track_flush (track);
1336     if (gst_pad_is_active (track->sinkpad)) {
1337       gst_pad_set_active (track->sinkpad, FALSE);
1338       gst_pad_set_active (track->sinkpad, TRUE);
1339     }
1340   }
1341 }
1342
1343 /* Resets all tracks to their initial state, ready to receive new data. */
1344 static void
1345 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1346 {
1347   TRACKS_LOCK (demux);
1348   g_queue_foreach (demux->priv->periods,
1349       (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1350   TRACKS_UNLOCK (demux);
1351 }
1352
1353 /* Subclasses will call this function to ensure that a new input period is
1354  * available to receive new streams and tracks */
1355 gboolean
1356 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1357 {
1358   if (demux->input_period && !demux->input_period->prepared) {
1359     GST_DEBUG_OBJECT (demux, "Using existing input period");
1360     return TRUE;
1361   }
1362
1363   if (demux->input_period) {
1364     GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1365     demux->input_period->has_next_period = TRUE;
1366   }
1367   GST_DEBUG_OBJECT (demux, "Setting up new period");
1368
1369   demux->input_period = gst_adaptive_demux_period_new (demux);
1370
1371   return TRUE;
1372 }
1373
1374 /* must be called with manifest_lock taken */
1375 static void
1376 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1377 {
1378   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1379   GList *iter;
1380
1381   gst_adaptive_demux_stop_tasks (demux, TRUE);
1382
1383   if (klass->reset)
1384     klass->reset (demux);
1385
1386   /* Disable and remove all outputs */
1387   GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1388   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1389     gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1390   }
1391   g_list_free (demux->priv->outputs);
1392   demux->priv->outputs = NULL;
1393
1394   g_queue_clear_full (demux->priv->periods,
1395       (GDestroyNotify) gst_adaptive_demux_period_unref);
1396
1397   /* The output period always has an extra ref taken on it */
1398   if (demux->output_period)
1399     gst_adaptive_demux_period_unref (demux->output_period);
1400   demux->output_period = NULL;
1401   /* The input period doesn't have an extra ref taken on it */
1402   demux->input_period = NULL;
1403
1404   gst_adaptive_demux_start_new_period (demux);
1405
1406   g_free (demux->manifest_uri);
1407   g_free (demux->manifest_base_uri);
1408   demux->manifest_uri = NULL;
1409   demux->manifest_base_uri = NULL;
1410
1411   gst_adapter_clear (demux->priv->input_adapter);
1412   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1413
1414   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1415   demux->instant_rate_multiplier = 1.0;
1416
1417   demux->priv->duration = GST_CLOCK_TIME_NONE;
1418
1419   demux->priv->percent = -1;
1420   demux->priv->is_buffering = TRUE;
1421
1422   demux->have_group_id = FALSE;
1423   demux->group_id = G_MAXUINT;
1424   demux->priv->segment_seqnum = gst_util_seqnum_next ();
1425
1426   demux->priv->global_output_position = 0;
1427
1428   demux->priv->n_audio_streams = 0;
1429   demux->priv->n_video_streams = 0;
1430   demux->priv->n_subtitle_streams = 0;
1431
1432   gst_flow_combiner_reset (demux->priv->flowcombiner);
1433 }
1434
1435 static gboolean
1436 gst_adaptive_demux_send_event (GstElement * element, GstEvent * event)
1437 {
1438   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1439   gboolean res = FALSE;
1440
1441   GST_DEBUG_OBJECT (demux, "Received event %" GST_PTR_FORMAT, event);
1442
1443   switch (GST_EVENT_TYPE (event)) {
1444     case GST_EVENT_SEEK:
1445     {
1446       res = gst_adaptive_demux_handle_seek_event (demux, event);
1447       break;
1448     }
1449     case GST_EVENT_SELECT_STREAMS:
1450     {
1451       res = gst_adaptive_demux_handle_select_streams_event (demux, event);
1452       break;
1453     }
1454     default:
1455       res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
1456       break;
1457   }
1458   return res;
1459 }
1460
1461 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1462 static GstAdaptiveDemux2Stream *
1463 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1464 {
1465   GList *iter;
1466
1467   /* We only look in the streams of the input period (i.e. with active streams) */
1468   for (iter = demux->input_period->streams; iter; iter = iter->next) {
1469     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1470     if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1471       return stream;
1472     }
1473   }
1474
1475   return NULL;
1476 }
1477
1478 static void
1479 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1480     GstMessage * msg)
1481 {
1482   GstAdaptiveDemux2Stream *stream;
1483   GstStreamCollection *collection = NULL;
1484   gboolean pending_tracks_activated = FALSE;
1485
1486   GST_MANIFEST_LOCK (demux);
1487
1488   stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1489   if (stream == NULL) {
1490     GST_WARNING_OBJECT (demux,
1491         "Failed to locate stream for collection message");
1492     goto beach;
1493   }
1494
1495   gst_message_parse_stream_collection (msg, &collection);
1496   if (!collection)
1497     goto beach;
1498
1499   TRACKS_LOCK (demux);
1500
1501   if (!gst_adaptive_demux2_stream_handle_collection (stream, collection,
1502           &pending_tracks_activated)) {
1503     TRACKS_UNLOCK (demux);
1504
1505     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1506         (_("Stream format can't be handled")),
1507         ("The streams provided by the multiplex are ambiguous"));
1508     goto beach;
1509   }
1510
1511   if (pending_tracks_activated) {
1512     /* If pending tracks were handled, then update the demuxer collection */
1513     if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1514         demux->input_period == demux->output_period) {
1515       gst_adaptive_demux_post_collection (demux);
1516     }
1517
1518     /* If we discovered pending tracks and we no longer have any, we can ensure
1519      * selected tracks are started */
1520     if (!gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1521       GList *iter = demux->input_period->streams;
1522       for (; iter; iter = iter->next) {
1523         GstAdaptiveDemux2Stream *new_stream = iter->data;
1524
1525         /* The stream that posted this collection was already started. If a
1526          * different stream is now selected, start it */
1527         if (stream != new_stream
1528             && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1529           gst_adaptive_demux2_stream_start (new_stream);
1530       }
1531     }
1532   }
1533   TRACKS_UNLOCK (demux);
1534
1535 beach:
1536   GST_MANIFEST_UNLOCK (demux);
1537
1538   if (collection)
1539     gst_object_unref (collection);
1540   gst_message_unref (msg);
1541   msg = NULL;
1542 }
1543
1544 static void
1545 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1546 {
1547   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1548
1549   switch (GST_MESSAGE_TYPE (msg)) {
1550     case GST_MESSAGE_STREAM_COLLECTION:
1551     {
1552       gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1553       return;
1554     }
1555     case GST_MESSAGE_ERROR:{
1556       GstAdaptiveDemux2Stream *stream = NULL;
1557       GError *err = NULL;
1558       gchar *debug = NULL;
1559       gchar *new_error = NULL;
1560       const GstStructure *details = NULL;
1561
1562       GST_MANIFEST_LOCK (demux);
1563
1564       stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1565       if (stream == NULL) {
1566         GST_WARNING_OBJECT (demux,
1567             "Failed to locate stream for errored element");
1568         GST_MANIFEST_UNLOCK (demux);
1569         break;
1570       }
1571
1572       gst_message_parse_error (msg, &err, &debug);
1573
1574       GST_WARNING_OBJECT (demux,
1575           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1576           err->message, debug);
1577
1578       if (debug)
1579         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1580       if (new_error) {
1581         g_free (err->message);
1582         err->message = new_error;
1583       }
1584
1585       gst_message_parse_error_details (msg, &details);
1586       if (details) {
1587         gst_structure_get_uint (details, "http-status-code",
1588             &stream->last_status_code);
1589       }
1590
1591       /* error, but ask to retry */
1592       if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1593         gst_adaptive_demux2_stream_parse_error (stream, err);
1594         GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1595       }
1596
1597       g_error_free (err);
1598       g_free (debug);
1599
1600       GST_MANIFEST_UNLOCK (demux);
1601
1602       gst_message_unref (msg);
1603       msg = NULL;
1604     }
1605       break;
1606     default:
1607       break;
1608   }
1609
1610   if (msg)
1611     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1612 }
1613
1614 /* must be called with manifest_lock taken */
1615 GstClockTime
1616 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1617 {
1618   GstAdaptiveDemuxClass *klass;
1619
1620   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1621
1622   if (klass->get_period_start_time == NULL)
1623     return 0;
1624
1625   return klass->get_period_start_time (demux);
1626 }
1627
1628 /* must be called with manifest_lock taken */
1629 static gboolean
1630 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1631     gboolean first_and_live)
1632 {
1633   GList *iter;
1634   GstClockTime period_start;
1635   GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1636   GList *new_streams;
1637
1638   g_return_val_if_fail (demux->input_period->streams, FALSE);
1639   g_assert (demux->input_period->prepared == FALSE);
1640
1641   new_streams = demux->input_period->streams;
1642
1643   if (!gst_adaptive_demux2_is_running (demux)) {
1644     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1645     return TRUE;
1646   }
1647
1648   GST_DEBUG_OBJECT (demux,
1649       "Preparing %d streams for period %d , first_and_live:%d",
1650       g_list_length (new_streams), demux->input_period->period_num,
1651       first_and_live);
1652
1653   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1654     GstAdaptiveDemux2Stream *stream = iter->data;
1655
1656     GST_DEBUG_OBJECT (stream, "Preparing stream");
1657
1658     stream->need_header = TRUE;
1659     stream->discont = TRUE;
1660
1661     /* Grab the first stream time for live streams
1662      * * If the stream is selected
1663      * * Or it provides dynamic tracks (in which case we need to force an update)
1664      */
1665     if (first_and_live
1666         && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1667             || stream->pending_tracks)) {
1668       /* TODO we only need the first timestamp, maybe create a simple function to
1669        * get the current PTS of a fragment ? */
1670       GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1671       gst_adaptive_demux2_stream_update_fragment_info (stream);
1672
1673       GST_DEBUG_OBJECT (stream,
1674           "Got stream time %" GST_STIME_FORMAT,
1675           GST_STIME_ARGS (stream->fragment.stream_time));
1676
1677       if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1678         min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1679       } else {
1680         min_stream_time = stream->fragment.stream_time;
1681       }
1682     }
1683   }
1684
1685   period_start = gst_adaptive_demux_get_period_start_time (demux);
1686
1687   /* For live streams, the subclass is supposed to seek to the current fragment
1688    * and then tell us its stream time in stream->fragment.stream_time.  We now
1689    * also have to seek our demuxer segment to reflect this.
1690    *
1691    * FIXME: This needs some refactoring at some point.
1692    */
1693   if (first_and_live) {
1694     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1695         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1696         GST_SEEK_TYPE_NONE, -1, NULL);
1697   }
1698
1699   GST_DEBUG_OBJECT (demux,
1700       "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1701       " demux segment %" GST_SEGMENT_FORMAT,
1702       GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1703       &demux->segment);
1704
1705   /* Synchronize stream start/current positions */
1706   if (min_stream_time == GST_CLOCK_STIME_NONE)
1707     min_stream_time = period_start;
1708   else
1709     min_stream_time += period_start;
1710   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1711     GstAdaptiveDemux2Stream *stream = iter->data;
1712     stream->start_position = stream->current_position = min_stream_time;
1713   }
1714
1715   for (iter = new_streams; iter; iter = g_list_next (iter)) {
1716     GstAdaptiveDemux2Stream *stream = iter->data;
1717     stream->compute_segment = TRUE;
1718     stream->first_and_live = first_and_live;
1719   }
1720   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1721   demux->input_period->prepared = TRUE;
1722
1723   return TRUE;
1724 }
1725
1726 static GstAdaptiveDemuxTrack *
1727 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1728 {
1729   GList *tmp;
1730
1731   for (tmp = period->tracks; tmp; tmp = tmp->next) {
1732     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1733     if (!g_strcmp0 (track->stream_id, stream_id))
1734       return track;
1735   }
1736
1737   return NULL;
1738 }
1739
1740 /* TRACKS_LOCK hold */
1741 void
1742 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1743 {
1744   GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1745   GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1746   GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1747   GList *tmp;
1748   gint min_percent = -1, percent;
1749   gboolean all_eos = TRUE;
1750
1751   /* Go over all active tracks of the output period and update level */
1752
1753   /* Check that all tracks are above their respective low thresholds (different
1754    * tracks may have different fragment durations yielding different buffering
1755    * percentages) Overall buffering percent is the lowest. */
1756   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1757     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1758
1759     GST_LOG_OBJECT (demux,
1760         "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1761         GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1762         track->stream_id, track->period_num, track->active, track->selected,
1763         track->eos, GST_TIME_ARGS (track->level_time),
1764         GST_TIME_ARGS (track->buffering_threshold));
1765
1766     if (track->active && track->selected) {
1767       if (!track->eos) {
1768         gint cur_percent;
1769
1770         all_eos = FALSE;
1771         if (min_level_time == GST_CLOCK_TIME_NONE) {
1772           min_level_time = track->level_time;
1773         } else if (track->level_time < min_level_time) {
1774           min_level_time = track->level_time;
1775         }
1776
1777         if (track->type & GST_STREAM_TYPE_VIDEO
1778             && video_level_time > track->level_time)
1779           video_level_time = track->level_time;
1780
1781         if (track->type & GST_STREAM_TYPE_AUDIO
1782             && audio_level_time > track->level_time)
1783           audio_level_time = track->level_time;
1784
1785         if (track->level_time != GST_CLOCK_TIME_NONE
1786             && track->buffering_threshold != 0) {
1787           cur_percent =
1788               gst_util_uint64_scale (track->level_time, 100,
1789               track->buffering_threshold);
1790           if (min_percent < 0 || cur_percent < min_percent)
1791             min_percent = cur_percent;
1792         }
1793       }
1794     }
1795   }
1796
1797   GST_DEBUG_OBJECT (demux,
1798       "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1799       GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1800
1801   /* Update demuxer video/audio level properties */
1802   GST_OBJECT_LOCK (demux);
1803   demux->current_level_time_video = video_level_time;
1804   demux->current_level_time_audio = audio_level_time;
1805   GST_OBJECT_UNLOCK (demux);
1806
1807   if (min_percent < 0 && !all_eos)
1808     return;
1809
1810   if (min_percent > 100 || all_eos)
1811     percent = 100;
1812   else
1813     percent = MAX (0, min_percent);
1814
1815   GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1816
1817   if (demux->priv->is_buffering) {
1818     if (percent >= 100)
1819       demux->priv->is_buffering = FALSE;
1820     if (demux->priv->percent != percent) {
1821       demux->priv->percent = percent;
1822       demux->priv->percent_changed = TRUE;
1823     }
1824   } else if (percent < 1) {
1825     demux->priv->is_buffering = TRUE;
1826     if (demux->priv->percent != percent) {
1827       demux->priv->percent = percent;
1828       demux->priv->percent_changed = TRUE;
1829     }
1830   }
1831
1832   if (demux->priv->percent_changed)
1833     GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1834         demux->priv->is_buffering);
1835 }
1836
1837 /* With TRACKS_LOCK held */
1838 void
1839 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1840 {
1841   gint percent;
1842   GstMessage *msg;
1843
1844   if (!demux->priv->percent_changed)
1845     return;
1846
1847   BUFFERING_LOCK (demux);
1848   percent = demux->priv->percent;
1849   msg = gst_message_new_buffering ((GstObject *) demux, percent);
1850   TRACKS_UNLOCK (demux);
1851   gst_element_post_message ((GstElement *) demux, msg);
1852
1853   BUFFERING_UNLOCK (demux);
1854   TRACKS_LOCK (demux);
1855   if (percent == demux->priv->percent)
1856     demux->priv->percent_changed = FALSE;
1857 }
1858
1859 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1860 GstAdaptiveDemux2Stream *
1861 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1862     GstAdaptiveDemuxTrack * track)
1863 {
1864   GList *iter;
1865
1866   for (iter = demux->output_period->streams; iter; iter = iter->next) {
1867     GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1868     if (g_list_find (stream->tracks, track))
1869       return stream;
1870   }
1871
1872   return NULL;
1873 }
1874
1875 /* Called from seek handler
1876  *
1877  * This function is used when a (flushing) seek caused a new period to be activated.
1878  *
1879  * This will ensure that:
1880  * * the current output period is marked as finished (EOS)
1881  * * Any potential intermediate (non-input/non-output) periods are removed
1882  * * That the new input period is prepared and ready
1883  */
1884 static void
1885 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1886 {
1887   GList *iter;
1888
1889   GST_DEBUG_OBJECT (demux,
1890       "Preparing new input period %u", demux->input_period->period_num);
1891
1892   /* Prepare the new input period */
1893   gst_adaptive_demux_update_collection (demux, demux->input_period);
1894
1895   /* Transfer the previous selection to the new input period */
1896   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
1897       demux->output_period);
1898   gst_adaptive_demux_prepare_streams (demux, FALSE);
1899
1900   /* Remove all periods except for the input (last) and output (first) period */
1901   while (demux->priv->periods->length > 2) {
1902     GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
1903     /* Mark all tracks of the removed period as not selected and EOS so they
1904      * will be skipped / ignored */
1905     for (iter = period->tracks; iter; iter = iter->next) {
1906       GstAdaptiveDemuxTrack *track = iter->data;
1907       track->selected = FALSE;
1908       track->eos = TRUE;
1909     }
1910     gst_adaptive_demux_period_unref (period);
1911   }
1912
1913   /* Mark all tracks of the output period as EOS so that the output loop
1914    * will immediately move to the new period */
1915   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
1916     GstAdaptiveDemuxTrack *track = iter->data;
1917     track->eos = TRUE;
1918   }
1919
1920   /* Go over all slots, and clear any pending track */
1921   for (iter = demux->priv->outputs; iter; iter = iter->next) {
1922     OutputSlot *slot = (OutputSlot *) iter->data;
1923
1924     if (slot->pending_track != NULL) {
1925       GST_DEBUG_OBJECT (demux,
1926           "Removing track '%s' as pending from output of current track '%s'",
1927           slot->pending_track->stream_id, slot->track->stream_id);
1928       gst_adaptive_demux_track_unref (slot->pending_track);
1929       slot->pending_track = NULL;
1930     }
1931   }
1932 }
1933
1934 /* must be called with manifest_lock taken */
1935 gboolean
1936 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1937     gint64 * range_start, gint64 * range_stop)
1938 {
1939   GstAdaptiveDemuxClass *klass;
1940
1941   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1942
1943   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1944
1945   return klass->get_live_seek_range (demux, range_start, range_stop);
1946 }
1947
1948 /* must be called with manifest_lock taken */
1949 gboolean
1950 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1951     GstAdaptiveDemux2Stream * stream)
1952 {
1953   gint64 range_start, range_stop;
1954   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1955     GST_LOG_OBJECT (stream,
1956         "stream position %" GST_TIME_FORMAT "  live seek range %"
1957         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1958         GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
1959         GST_STIME_ARGS (range_stop));
1960     return (stream->current_position >= range_start
1961         && stream->current_position <= range_stop);
1962   }
1963
1964   return FALSE;
1965 }
1966
1967 /* must be called with manifest_lock taken */
1968 static gboolean
1969 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1970 {
1971   GstAdaptiveDemuxClass *klass;
1972
1973   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1974   if (gst_adaptive_demux_is_live (demux)) {
1975     return klass->get_live_seek_range != NULL;
1976   }
1977
1978   return klass->seek != NULL;
1979 }
1980
1981 static void
1982 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
1983     GstSeekType start_type, GstSeekType stop_type)
1984 {
1985   GList *iter;
1986
1987   for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
1988     GstAdaptiveDemux2Stream *stream = iter->data;
1989
1990     /* Make sure the download loop clears and restarts on the next start,
1991      * which will recompute the stream segment */
1992     g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
1993         stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
1994     stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
1995     stream->start_position = 0;
1996
1997     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1998       stream->start_position = demux->segment.start;
1999     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2000       stream->start_position = demux->segment.stop;
2001   }
2002 }
2003
2004 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
2005                               GST_SEEK_FLAG_SNAP_AFTER |          \
2006                               GST_SEEK_FLAG_SNAP_NEAREST |        \
2007                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2008                               GST_SEEK_FLAG_KEY_UNIT))
2009 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2010                               GST_SEEK_FLAG_SNAP_AFTER | \
2011                               GST_SEEK_FLAG_SNAP_NEAREST))
2012
2013 static gboolean
2014 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2015     GstEvent * event)
2016 {
2017   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2018   gdouble rate;
2019   GstFormat format;
2020   GstSeekFlags flags;
2021   GstSeekType start_type, stop_type;
2022   gint64 start, stop;
2023   guint32 seqnum;
2024   gboolean update;
2025   gboolean ret = FALSE;
2026   GstSegment oldsegment;
2027   GstEvent *flush_event;
2028
2029   GST_INFO_OBJECT (demux, "Received seek event");
2030
2031   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2032       &stop_type, &stop);
2033
2034   if (format != GST_FORMAT_TIME) {
2035     GST_WARNING_OBJECT (demux,
2036         "Adaptive demuxers only support TIME-based seeking");
2037     gst_event_unref (event);
2038     return FALSE;
2039   }
2040
2041   if (flags & GST_SEEK_FLAG_SEGMENT) {
2042     GST_FIXME_OBJECT (demux, "Handle segment seeks");
2043     gst_event_unref (event);
2044     return FALSE;
2045   }
2046
2047   seqnum = gst_event_get_seqnum (event);
2048
2049   GST_API_LOCK (demux);
2050   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2051     GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2052     GST_API_UNLOCK (demux);
2053     return FALSE;
2054   }
2055
2056   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2057     /* For instant rate seeks, reply directly and update
2058      * our segment so the new rate is reflected in any future
2059      * fragments */
2060     GstEvent *ev;
2061     gdouble rate_multiplier;
2062
2063     /* instant rate change only supported if direction does not change. All
2064      * other requirements are already checked before creating the seek event
2065      * but let's double-check here to be sure */
2066     if ((demux->segment.rate > 0 && rate < 0) ||
2067         (demux->segment.rate < 0 && rate > 0) ||
2068         start_type != GST_SEEK_TYPE_NONE ||
2069         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2070       GST_ERROR_OBJECT (demux,
2071           "Instant rate change seeks only supported in the "
2072           "same direction, without flushing and position change");
2073       goto unlock_return;
2074     }
2075
2076     rate_multiplier = rate / demux->segment.rate;
2077
2078     ev = gst_event_new_instant_rate_change (rate_multiplier,
2079         (GstSegmentFlags) flags);
2080     gst_event_set_seqnum (ev, seqnum);
2081
2082     ret = gst_adaptive_demux_push_src_event (demux, ev);
2083
2084     if (ret) {
2085       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2086       demux->instant_rate_multiplier = rate_multiplier;
2087       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2088     }
2089     goto unlock_return;
2090   }
2091
2092   if (!gst_adaptive_demux_can_seek (demux))
2093     goto unlock_return;
2094
2095   /* We can only accept flushing seeks from this point onward */
2096   if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2097     GST_ERROR_OBJECT (demux,
2098         "Non-flushing non-instant-rate seeks are not possible");
2099     goto unlock_return;
2100   }
2101
2102   if (gst_adaptive_demux_is_live (demux)) {
2103     gint64 range_start, range_stop;
2104     gboolean changed = FALSE;
2105     gboolean start_valid = TRUE, stop_valid = TRUE;
2106
2107     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2108             &range_stop)) {
2109       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2110       goto unlock_return;
2111     }
2112
2113     GST_DEBUG_OBJECT (demux,
2114         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2115         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2116
2117     /* Handle relative positioning for live streams (relative to the range_stop) */
2118     if (start_type == GST_SEEK_TYPE_END) {
2119       start = range_stop + start;
2120       start_type = GST_SEEK_TYPE_SET;
2121       changed = TRUE;
2122     }
2123     if (stop_type == GST_SEEK_TYPE_END) {
2124       stop = range_stop + stop;
2125       stop_type = GST_SEEK_TYPE_SET;
2126       changed = TRUE;
2127     }
2128
2129     /* Adjust the requested start/stop position if it falls beyond the live
2130      * seek range.
2131      * The only case where we don't adjust is for the starting point of
2132      * an accurate seek (start if forward and stop if backwards)
2133      */
2134     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2135         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2136       GST_DEBUG_OBJECT (demux,
2137           "seek before live stream start, setting to range start: %"
2138           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2139       start = range_start;
2140       changed = TRUE;
2141     }
2142     /* truncate stop position also if set */
2143     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2144         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2145       GST_DEBUG_OBJECT (demux,
2146           "seek ending after live start, adjusting to: %"
2147           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2148       stop = range_stop;
2149       changed = TRUE;
2150     }
2151
2152     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2153         (start < range_start || start > range_stop)) {
2154       GST_WARNING_OBJECT (demux,
2155           "Seek to invalid position start:%" GST_STIME_FORMAT
2156           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2157           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2158           GST_STIME_ARGS (range_stop));
2159       start_valid = FALSE;
2160     }
2161     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2162         (stop < range_start || stop > range_stop)) {
2163       GST_WARNING_OBJECT (demux,
2164           "Seek to invalid position stop:%" GST_STIME_FORMAT
2165           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2166           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2167           GST_STIME_ARGS (range_stop));
2168       stop_valid = FALSE;
2169     }
2170
2171     /* If the seek position is still outside of the seekable range, refuse the seek */
2172     if (!start_valid || !stop_valid)
2173       goto unlock_return;
2174
2175     /* Re-create seek event with changed/updated values */
2176     if (changed) {
2177       gst_event_unref (event);
2178       event =
2179           gst_event_new_seek (rate, format, flags,
2180           start_type, start, stop_type, stop);
2181       gst_event_set_seqnum (event, seqnum);
2182     }
2183   }
2184
2185   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2186
2187   /* have a backup in case seek fails */
2188   gst_segment_copy_into (&demux->segment, &oldsegment);
2189
2190   GST_DEBUG_OBJECT (demux, "sending flush start");
2191   flush_event = gst_event_new_flush_start ();
2192   gst_event_set_seqnum (flush_event, seqnum);
2193
2194   gst_adaptive_demux_push_src_event (demux, flush_event);
2195
2196   gst_adaptive_demux_stop_tasks (demux, FALSE);
2197   gst_adaptive_demux_reset_tracks (demux);
2198
2199   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2200
2201   if (!IS_SNAP_SEEK (flags) && !(flags & GST_SEEK_FLAG_ACCURATE)) {
2202     /* If no accurate seeking was specified, we want to default to seeking to
2203      * the previous segment for efficient/fast playback. */
2204     flags |= GST_SEEK_FLAG_KEY_UNIT;
2205   }
2206
2207   if (IS_SNAP_SEEK (flags)) {
2208     GstAdaptiveDemux2Stream *default_stream = NULL;
2209     GstAdaptiveDemux2Stream *stream = NULL;
2210     GList *iter;
2211     /*
2212      * Handle snap seeks as follows:
2213      * 1) do the snap seeking a (random) active stream
2214      * 1.1) If none are active yet (early-seek), pick a random default one
2215      * 2) use the final position on this stream to seek
2216      *    on the other streams to the same position
2217      *
2218      * We can't snap at all streams at the same time as they might end in
2219      * different positions, so just pick one and align all others to that
2220      * position.
2221      */
2222
2223     /* Pick a random active stream on which to do the stream seek */
2224     for (iter = demux->output_period->streams; iter; iter = iter->next) {
2225       GstAdaptiveDemux2Stream *cand = iter->data;
2226       if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2227         stream = cand;
2228         break;
2229       }
2230       if (default_stream == NULL
2231           && gst_adaptive_demux2_stream_is_default_locked (cand))
2232         default_stream = cand;
2233     }
2234
2235     if (stream == NULL)
2236       stream = default_stream;
2237
2238     if (stream) {
2239       GstClockTimeDiff ts;
2240       GstSeekFlags stream_seek_flags = flags;
2241
2242       /* snap-seek on the chosen stream and then
2243        * use the resulting position to seek on all streams */
2244       if (rate >= 0) {
2245         if (start_type != GST_SEEK_TYPE_NONE)
2246           ts = start;
2247         else {
2248           ts = gst_segment_position_from_running_time (&demux->segment,
2249               GST_FORMAT_TIME, demux->priv->global_output_position);
2250           start_type = GST_SEEK_TYPE_SET;
2251         }
2252       } else {
2253         if (stop_type != GST_SEEK_TYPE_NONE)
2254           ts = stop;
2255         else {
2256           stop_type = GST_SEEK_TYPE_SET;
2257           ts = gst_segment_position_from_running_time (&demux->segment,
2258               GST_FORMAT_TIME, demux->priv->global_output_position);
2259         }
2260       }
2261
2262       if (gst_adaptive_demux2_stream_seek (stream, rate >= 0, stream_seek_flags,
2263               ts, &ts) != GST_FLOW_OK) {
2264         GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2265         goto unlock_return;
2266       }
2267       /* replace event with a new one without snapping to seek on all streams */
2268       gst_event_unref (event);
2269       if (rate >= 0) {
2270         start = ts;
2271       } else {
2272         stop = ts;
2273       }
2274       event =
2275           gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2276           start_type, start, stop_type, stop);
2277       GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2278     }
2279   }
2280
2281   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2282       start, stop_type, stop, &update);
2283
2284   if (ret) {
2285     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2286
2287     ret = demux_class->seek (demux, event);
2288   }
2289
2290   if (!ret) {
2291     /* Is there anything else we can do if it fails? */
2292     gst_segment_copy_into (&oldsegment, &demux->segment);
2293   } else {
2294     demux->priv->segment_seqnum = seqnum;
2295   }
2296   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2297
2298   /* Resetting flow combiner */
2299   gst_flow_combiner_reset (demux->priv->flowcombiner);
2300
2301   GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2302   flush_event = gst_event_new_flush_stop (TRUE);
2303   gst_event_set_seqnum (flush_event, seqnum);
2304   gst_adaptive_demux_push_src_event (demux, flush_event);
2305
2306   /* If the seek generated a new period, prepare it */
2307   if (!demux->input_period->prepared) {
2308     /* This can only happen on flushing seeks */
2309     g_assert (flags & GST_SEEK_FLAG_FLUSH);
2310     gst_adaptive_demux_seek_to_input_period (demux);
2311   }
2312
2313   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2314   GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2315       &demux->segment);
2316   gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2317   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2318
2319   /* Reset the global output position (running time) for when the output loop restarts */
2320   demux->priv->global_output_position = 0;
2321
2322   /* After a flushing seek, any instant-rate override is undone */
2323   demux->instant_rate_multiplier = 1.0;
2324
2325   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2326
2327   /* Restart the demux */
2328   gst_adaptive_demux_start_tasks (demux);
2329
2330 unlock_return:
2331   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2332   GST_API_UNLOCK (demux);
2333   gst_event_unref (event);
2334
2335   return ret;
2336 }
2337
2338 /* Returns TRUE if the stream has at least one selected track */
2339 static gboolean
2340 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2341     stream)
2342 {
2343   GList *tmp;
2344
2345   for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2346     GstAdaptiveDemuxTrack *track = tmp->data;
2347
2348     if (track->selected)
2349       return TRUE;
2350   }
2351
2352   return FALSE;
2353 }
2354
2355 static gboolean
2356 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2357     guint32 seqnum)
2358 {
2359   gboolean selection_handled = TRUE;
2360   GList *iter;
2361   GList *tracks = NULL;
2362
2363   if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2364     return FALSE;
2365
2366   TRACKS_LOCK (demux);
2367   /* We can't do stream selection if we are migrating between periods */
2368   if (demux->input_period && demux->output_period != demux->input_period) {
2369     GST_WARNING_OBJECT (demux,
2370         "Stream selection while migrating between periods is not possible");
2371     TRACKS_UNLOCK (demux);
2372     return FALSE;
2373   }
2374   /* Validate the streams and fill:
2375    * tracks : list of tracks corresponding to requested streams
2376    */
2377   for (iter = streams; iter; iter = iter->next) {
2378     gchar *stream_id = (gchar *) iter->data;
2379     GstAdaptiveDemuxTrack *track;
2380
2381     GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2382     track = find_track_for_stream_id (demux->output_period, stream_id);
2383     if (!track) {
2384       GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2385       selection_handled = FALSE;
2386       goto select_streams_done;
2387     }
2388     tracks = g_list_append (tracks, track);
2389     GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2390   }
2391
2392   /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2393    * SCHEDULING THREAD */
2394
2395   /* FIXME: We want to iterate all streams, mark them as deselected,
2396    * then iterate tracks and mark any streams that have at least 1
2397    * active output track, then loop over all streams again and start/stop
2398    * them as needed */
2399
2400   /* Go over all tracks present and (de)select based on current selection */
2401   for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2402     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2403
2404     if (track->selected && !g_list_find (tracks, track)) {
2405       GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2406           track->stream_id, track->active);
2407       track->selected = FALSE;
2408       track->draining = TRUE;
2409     } else if (!track->selected && g_list_find (tracks, track)) {
2410       GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2411
2412       track->selected = TRUE;
2413     }
2414   }
2415
2416   /* Start or stop streams based on the updated track selection */
2417   for (iter = demux->output_period->streams; iter; iter = iter->next) {
2418     GstAdaptiveDemux2Stream *stream = iter->data;
2419     GList *trackiter;
2420
2421     gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2422     gboolean should_be_running =
2423         gst_adaptive_demux2_stream_has_selected_tracks (stream);
2424
2425     if (!is_running && should_be_running) {
2426       GstClockTime output_running_ts = demux->priv->global_output_position;
2427       GstClockTime start_position;
2428
2429       /* Calculate where we should start the stream, and then
2430        * start it. */
2431       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2432
2433       GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2434           GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2435           GST_TIME_ARGS (output_running_ts), &demux->segment);
2436
2437       start_position =
2438           gst_segment_position_from_running_time (&demux->segment,
2439           GST_FORMAT_TIME, output_running_ts);
2440
2441       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2442
2443       GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2444           GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2445
2446       stream->current_position = stream->start_position = start_position;
2447       stream->compute_segment = TRUE;
2448
2449       /* If output has already begun, ensure we seek this segment
2450        * to the correct restart position when the download loop begins */
2451       if (output_running_ts != 0)
2452         stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2453
2454       /* Activate track pads for this stream */
2455       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2456         GstAdaptiveDemuxTrack *track =
2457             (GstAdaptiveDemuxTrack *) trackiter->data;
2458         gst_pad_set_active (track->sinkpad, TRUE);
2459       }
2460
2461       gst_adaptive_demux2_stream_start (stream);
2462     } else if (is_running && !should_be_running) {
2463       /* Stream should not be running and needs stopping */
2464       gst_adaptive_demux2_stream_stop (stream);
2465
2466       /* Set all track sinkpads to inactive for this stream */
2467       for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2468         GstAdaptiveDemuxTrack *track =
2469             (GstAdaptiveDemuxTrack *) trackiter->data;
2470         gst_pad_set_active (track->sinkpad, FALSE);
2471       }
2472     }
2473   }
2474
2475   g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2476
2477 select_streams_done:
2478   demux_update_buffering_locked (demux);
2479   demux_post_buffering_locked (demux);
2480
2481   TRACKS_UNLOCK (demux);
2482   GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2483
2484   if (tracks)
2485     g_list_free (tracks);
2486   return selection_handled;
2487 }
2488
2489 static gboolean
2490 gst_adaptive_demux_handle_select_streams_event (GstAdaptiveDemux * demux,
2491     GstEvent * event)
2492 {
2493   GList *streams;
2494   gboolean selection_handled;
2495
2496   if (GST_EVENT_SEQNUM (event) ==
2497       g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2498     GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2499         GST_EVENT_SEQNUM (event));
2500     return TRUE;
2501   }
2502
2503   gst_event_parse_select_streams (event, &streams);
2504   selection_handled =
2505       handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2506   g_list_free_full (streams, g_free);
2507
2508   gst_event_unref (event);
2509   return selection_handled;
2510 }
2511
2512 static gboolean
2513 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2514     GstEvent * event)
2515 {
2516   GstAdaptiveDemux *demux;
2517
2518   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2519
2520   switch (event->type) {
2521     case GST_EVENT_SEEK:
2522     {
2523       guint32 seqnum = gst_event_get_seqnum (event);
2524       if (seqnum == demux->priv->segment_seqnum) {
2525         GST_LOG_OBJECT (pad,
2526             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2527         gst_event_unref (event);
2528         return TRUE;
2529       }
2530       return gst_adaptive_demux_handle_seek_event (demux, event);
2531     }
2532     case GST_EVENT_LATENCY:{
2533       /* Upstream and our internal source are irrelevant
2534        * for latency, and we should not fail here to
2535        * configure the latency */
2536       gst_event_unref (event);
2537       return TRUE;
2538     }
2539     case GST_EVENT_QOS:{
2540       GstClockTimeDiff diff;
2541       GstClockTime timestamp;
2542       GstClockTime earliest_time;
2543
2544       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2545       /* Only take into account lateness if late */
2546       if (diff > 0)
2547         earliest_time = timestamp + 2 * diff;
2548       else
2549         earliest_time = timestamp;
2550
2551       GST_OBJECT_LOCK (demux);
2552       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2553           earliest_time > demux->priv->qos_earliest_time) {
2554         demux->priv->qos_earliest_time = earliest_time;
2555         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2556             GST_TIME_ARGS (demux->priv->qos_earliest_time));
2557       }
2558       GST_OBJECT_UNLOCK (demux);
2559       break;
2560     }
2561     case GST_EVENT_SELECT_STREAMS:
2562     {
2563       return gst_adaptive_demux_handle_select_streams_event (demux, event);
2564     }
2565     default:
2566       break;
2567   }
2568
2569   return gst_pad_event_default (pad, parent, event);
2570 }
2571
2572 static gboolean
2573 gst_adaptive_demux_handle_query_seeking (GstAdaptiveDemux * demux,
2574     GstQuery * query)
2575 {
2576   GstFormat fmt = GST_FORMAT_UNDEFINED;
2577   gint64 stop = -1;
2578   gint64 start = 0;
2579   gboolean ret = FALSE;
2580
2581   if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2582     GST_INFO_OBJECT (demux,
2583         "Don't have manifest yet, can't answer seeking query");
2584     return FALSE;               /* can't answer without manifest */
2585   }
2586
2587   GST_MANIFEST_LOCK (demux);
2588
2589   gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2590   GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2591   if (fmt == GST_FORMAT_TIME) {
2592     GstClockTime duration;
2593     gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2594
2595     ret = TRUE;
2596     if (can_seek) {
2597       if (gst_adaptive_demux_is_live (demux)) {
2598         ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2599
2600         if (!ret) {
2601           GST_MANIFEST_UNLOCK (demux);
2602           GST_INFO_OBJECT (demux, "can't answer seeking query");
2603           return FALSE;
2604         }
2605       } else {
2606         duration = demux->priv->duration;
2607         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2608           stop = duration;
2609       }
2610     }
2611     gst_query_set_seeking (query, fmt, can_seek, start, stop);
2612     GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2613         GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2614         GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2615   }
2616   GST_MANIFEST_UNLOCK (demux);
2617   return ret;
2618 }
2619
2620 static gboolean
2621 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2622     GstQuery * query)
2623 {
2624   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2625   gboolean ret = FALSE;
2626
2627   if (query == NULL)
2628     return FALSE;
2629
2630   switch (query->type) {
2631     case GST_QUERY_DURATION:{
2632       GstFormat fmt;
2633       GstClockTime duration = GST_CLOCK_TIME_NONE;
2634
2635       gst_query_parse_duration (query, &fmt, NULL);
2636
2637       if (gst_adaptive_demux_is_live (demux)) {
2638         /* We are able to answer this query: the duration is unknown */
2639         gst_query_set_duration (query, fmt, -1);
2640         ret = TRUE;
2641         break;
2642       }
2643
2644       if (fmt == GST_FORMAT_TIME
2645           && g_atomic_int_get (&demux->priv->have_manifest)) {
2646
2647         GST_MANIFEST_LOCK (demux);
2648         duration = demux->priv->duration;
2649         GST_MANIFEST_UNLOCK (demux);
2650
2651         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2652           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2653           ret = TRUE;
2654         }
2655       }
2656
2657       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2658           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2659       break;
2660     }
2661     case GST_QUERY_LATENCY:{
2662       gst_query_set_latency (query, FALSE, 0, -1);
2663       ret = TRUE;
2664       break;
2665     }
2666     case GST_QUERY_SEEKING:
2667       ret = gst_adaptive_demux_handle_query_seeking (demux, query);
2668       break;
2669     case GST_QUERY_URI:
2670
2671       GST_MANIFEST_LOCK (demux);
2672
2673       /* TODO HLS can answer this differently it seems */
2674       if (demux->manifest_uri) {
2675         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2676          * playlist or the the uri of the last downlowaded fragment? */
2677         gst_query_set_uri (query, demux->manifest_uri);
2678         ret = TRUE;
2679       }
2680
2681       GST_MANIFEST_UNLOCK (demux);
2682       break;
2683     case GST_QUERY_SELECTABLE:
2684       gst_query_set_selectable (query, TRUE);
2685       ret = TRUE;
2686       break;
2687     default:
2688       /* Don't forward queries upstream because of the special nature of this
2689        *  "demuxer", which relies on the upstream element only to be fed
2690        *  the Manifest
2691        */
2692       break;
2693   }
2694
2695   return ret;
2696 }
2697
2698 static gboolean
2699 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
2700 {
2701   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
2702
2703   GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
2704
2705   switch (GST_QUERY_TYPE (query)) {
2706     case GST_QUERY_BUFFERING:
2707     {
2708       GstFormat format;
2709       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
2710
2711       if (!demux->output_period) {
2712         if (format != GST_FORMAT_TIME) {
2713           GST_DEBUG_OBJECT (demux,
2714               "No period setup yet, can't answer non-TIME buffering queries");
2715           return FALSE;
2716         }
2717
2718         GST_DEBUG_OBJECT (demux,
2719             "No period setup yet, but still answering buffering query");
2720         return TRUE;
2721       }
2722     }
2723     case GST_QUERY_SEEKING:
2724     {
2725       /* Source pads might not be present early on which would cause the default
2726        * element query handler to fail, yet we can answer this query */
2727       return gst_adaptive_demux_handle_query_seeking (demux, query);
2728     }
2729     default:
2730       break;
2731   }
2732
2733   return GST_ELEMENT_CLASS (parent_class)->query (element, query);
2734 }
2735
2736 gboolean
2737 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2738 {
2739   GstEvent *seek;
2740
2741   GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2742
2743   seek =
2744       gst_event_new_seek (1.0, GST_FORMAT_TIME,
2745       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2746       GST_SEEK_TYPE_NONE, 0);
2747   gst_adaptive_demux_handle_seek_event (demux, seek);
2748   return FALSE;
2749 }
2750
2751
2752 /* Called when the scheduler starts, to kick off manifest updates
2753  * and stream downloads */
2754 static gboolean
2755 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2756 {
2757   GList *iter;
2758
2759   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2760
2761   iter = demux->input_period->streams;
2762
2763   for (; iter; iter = g_list_next (iter)) {
2764     GstAdaptiveDemux2Stream *stream = iter->data;
2765
2766     /* If we need to process this stream to discover tracks *OR* it has any
2767      * tracks which are selected, start it now */
2768     if ((stream->pending_tracks == TRUE)
2769         || gst_adaptive_demux2_stream_is_selected_locked (stream))
2770       gst_adaptive_demux2_stream_start (stream);
2771   }
2772
2773   return FALSE;
2774 }
2775
2776 /* must be called with manifest_lock taken */
2777 static void
2778 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2779 {
2780   if (!gst_adaptive_demux2_is_running (demux)) {
2781     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2782     return;
2783   }
2784
2785   GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2786   gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2787       (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2788
2789   TRACKS_LOCK (demux);
2790   demux->priv->flushing = FALSE;
2791   GST_DEBUG_OBJECT (demux, "Starting the output task");
2792   gst_task_start (demux->priv->output_task);
2793   TRACKS_UNLOCK (demux);
2794 }
2795
2796 /* must be called with manifest_lock taken */
2797 static void
2798 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2799 {
2800   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2801   if (demux->priv->manifest_updates_cb != 0) {
2802     gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2803         demux->priv->manifest_updates_cb);
2804     demux->priv->manifest_updates_cb = 0;
2805   }
2806 }
2807
2808 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2809
2810 /* must be called with manifest_lock taken */
2811 static void
2812 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2813 {
2814   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2815
2816   if (gst_adaptive_demux_is_live (demux)) {
2817     /* Task to periodically update the manifest */
2818     if (demux_class->requires_periodical_playlist_update (demux)) {
2819       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2820       if (demux->priv->manifest_updates_cb == 0) {
2821         demux->priv->manifest_updates_cb =
2822             gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2823             (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2824       }
2825     }
2826   }
2827 }
2828
2829 /* must be called with manifest_lock taken
2830  * This function will temporarily release manifest_lock in order to join the
2831  * download threads.
2832  * The api_lock will still protect it against other threads trying to modify
2833  * the demux element.
2834  */
2835 static void
2836 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2837 {
2838   GST_LOG_OBJECT (demux, "Stopping tasks");
2839
2840   if (stop_updates)
2841     gst_adaptive_demux_stop_manifest_update_task (demux);
2842
2843   TRACKS_LOCK (demux);
2844   if (demux->input_period)
2845     gst_adaptive_demux_period_stop_tasks (demux->input_period);
2846
2847   demux->priv->flushing = TRUE;
2848   g_cond_signal (&demux->priv->tracks_add);
2849   gst_task_stop (demux->priv->output_task);
2850   TRACKS_UNLOCK (demux);
2851
2852   gst_task_join (demux->priv->output_task);
2853
2854   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2855 }
2856
2857 /* must be called with manifest_lock taken */
2858 static gboolean
2859 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2860 {
2861   GList *iter;
2862   gboolean ret = TRUE;
2863
2864   GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2865
2866   TRACKS_LOCK (demux);
2867   for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2868     OutputSlot *slot = (OutputSlot *) iter->data;
2869     gst_event_ref (event);
2870     GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2871     ret = ret & gst_pad_push_event (slot->pad, event);
2872     if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2873       slot->pushed_timed_data = FALSE;
2874   }
2875   TRACKS_UNLOCK (demux);
2876   gst_event_unref (event);
2877   return ret;
2878 }
2879
2880 /* must be called with manifest_lock taken */
2881 void
2882 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2883     GstCaps * caps)
2884 {
2885   GST_DEBUG_OBJECT (stream,
2886       "setting new caps for stream %" GST_PTR_FORMAT, caps);
2887   gst_caps_replace (&stream->pending_caps, caps);
2888   gst_caps_unref (caps);
2889 }
2890
2891 /* must be called with manifest_lock taken */
2892 void
2893 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2894     GstTagList * tags)
2895 {
2896   GST_DEBUG_OBJECT (stream,
2897       "setting new tags for stream %" GST_PTR_FORMAT, tags);
2898   gst_clear_tag_list (&stream->pending_tags);
2899   stream->pending_tags = tags;
2900 }
2901
2902 /* must be called with manifest_lock taken */
2903 void
2904 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2905     GstEvent * event)
2906 {
2907   stream->pending_events = g_list_append (stream->pending_events, event);
2908 }
2909
2910 static gboolean
2911 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2912     * demux)
2913 {
2914   return TRUE;
2915 }
2916
2917 /* Called when a stream needs waking after the manifest is updated */
2918 void
2919 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
2920 {
2921   demux->priv->stream_waiting_for_manifest = TRUE;
2922 }
2923
2924 static gboolean
2925 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
2926 {
2927   GstFlowReturn ret = GST_FLOW_OK;
2928   gboolean schedule_again = TRUE;
2929
2930   GST_MANIFEST_LOCK (demux);
2931   demux->priv->manifest_updates_cb = 0;
2932
2933   /* Updating playlist only needed for live playlists */
2934   if (!gst_adaptive_demux_is_live (demux)) {
2935     GST_MANIFEST_UNLOCK (demux);
2936     return G_SOURCE_REMOVE;
2937   }
2938
2939   GST_DEBUG_OBJECT (demux, "Updating playlist");
2940   ret = gst_adaptive_demux_update_manifest (demux);
2941
2942   if (ret == GST_FLOW_EOS) {
2943     GST_MANIFEST_UNLOCK (demux);
2944     return G_SOURCE_REMOVE;
2945   }
2946
2947   if (ret == GST_FLOW_OK) {
2948     GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
2949     demux->priv->update_failed_count = 0;
2950
2951     /* Wake up download tasks */
2952     if (demux->priv->stream_waiting_for_manifest) {
2953       GList *iter;
2954
2955       for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2956         GstAdaptiveDemux2Stream *stream = iter->data;
2957         gst_adaptive_demux2_stream_on_manifest_update (stream);
2958       }
2959       demux->priv->stream_waiting_for_manifest = FALSE;
2960     }
2961   } else {
2962     demux->priv->update_failed_count++;
2963
2964     if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
2965       GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
2966           gst_flow_get_name (ret));
2967     } else {
2968       GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2969           (_("Internal data stream error.")), ("Could not update playlist"));
2970       GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
2971       schedule_again = FALSE;
2972     }
2973   }
2974
2975   if (ret == GST_ADAPTIVE_DEMUX_FLOW_LOST_SYNC)
2976     gst_adaptive_demux_handle_lost_sync (demux);
2977
2978   if (schedule_again) {
2979     GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2980
2981     demux->priv->manifest_updates_cb =
2982         gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
2983         klass->get_manifest_update_interval (demux) * GST_USECOND,
2984         (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
2985   }
2986
2987   GST_MANIFEST_UNLOCK (demux);
2988
2989   return G_SOURCE_REMOVE;
2990 }
2991
2992 static gboolean
2993 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
2994 {
2995   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2996
2997   /* Loop for updating of the playlist. This periodically checks if
2998    * the playlist is updated and does so, then signals the streaming
2999    * thread in case it can continue downloading now. */
3000
3001   /* block until the next scheduled update or the signal to quit this thread */
3002   GST_DEBUG_OBJECT (demux, "Started updates task");
3003   demux->priv->manifest_updates_cb =
3004       gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3005       klass->get_manifest_update_interval (demux) * GST_USECOND,
3006       (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3007
3008   return G_SOURCE_REMOVE;
3009 }
3010
3011 static OutputSlot *
3012 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3013     GstAdaptiveDemuxTrack * track)
3014 {
3015   GList *tmp;
3016
3017   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3018     OutputSlot *slot = (OutputSlot *) tmp->data;
3019     /* Incompatible output type */
3020     if (slot->type != track->type)
3021       continue;
3022
3023     /* Slot which is already assigned to this pending track */
3024     if (slot->pending_track == track)
3025       return slot;
3026
3027     /* slot already used for another pending track */
3028     if (slot->pending_track != NULL)
3029       continue;
3030
3031     /* Current output track is of the same type and is draining */
3032     if (slot->track && slot->track->draining)
3033       return slot;
3034   }
3035
3036   return NULL;
3037 }
3038
3039 /* TRACKS_LOCK taken */
3040 static OutputSlot *
3041 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3042 {
3043   GList *tmp;
3044
3045   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3046     OutputSlot *slot = (OutputSlot *) tmp->data;
3047
3048     if (slot->track == track)
3049       return slot;
3050   }
3051
3052   return NULL;
3053 }
3054
3055 /* TRACKS_LOCK held */
3056 static GstMessage *
3057 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3058 {
3059   GList *tmp;
3060   GstMessage *msg;
3061
3062   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3063     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3064
3065     if (track->selected && !track->active)
3066       return NULL;
3067   }
3068
3069   /* All selected tracks are active, created message */
3070   msg =
3071       gst_message_new_streams_selected (GST_OBJECT (demux),
3072       demux->output_period->collection);
3073   GST_MESSAGE_SEQNUM (msg) = seqnum;
3074   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3075     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3076     if (track->active) {
3077       gst_message_streams_selected_add (msg, track->stream_object);
3078     }
3079   }
3080
3081   return msg;
3082 }
3083
3084 static void
3085 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3086     OutputSlot * slot)
3087 {
3088   GstAdaptiveDemuxTrack *track = slot->track;
3089   GstEvent *event;
3090
3091   /* Send EVENT_STREAM_START */
3092   event = gst_event_new_stream_start (track->stream_id);
3093   if (demux->have_group_id)
3094     gst_event_set_group_id (event, demux->group_id);
3095   gst_event_set_stream_flags (event, track->flags);
3096   gst_event_set_stream (event, track->stream_object);
3097   GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3098       track->stream_id);
3099   gst_pad_push_event (slot->pad, event);
3100
3101   /* Send EVENT_STREAM_COLLECTION */
3102   event = gst_event_new_stream_collection (demux->output_period->collection);
3103   GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3104       track->stream_id);
3105   gst_pad_push_event (slot->pad, event);
3106
3107   /* Mark all sticky events for re-sending */
3108   gst_event_store_mark_all_undelivered (&track->sticky_events);
3109 }
3110
3111 /*
3112  * Called with TRACKS_LOCK taken
3113  */
3114 static void
3115 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3116 {
3117   GList *tmp;
3118   guint requested_selection_seqnum;
3119   GstMessage *msg;
3120
3121   /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3122      output slots vs active/draining tracks */
3123   requested_selection_seqnum =
3124       g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3125
3126   if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3127     return;
3128
3129   GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3130
3131   /* Go over all slots, and if they have a pending track that's no longer
3132    * selected, clear it so the slot can be reused */
3133   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3134     OutputSlot *slot = (OutputSlot *) tmp->data;
3135
3136     if (slot->pending_track != NULL && !slot->pending_track->selected) {
3137       GST_DEBUG_OBJECT (demux,
3138           "Removing deselected track '%s' as pending from output of current track '%s'",
3139           slot->pending_track->stream_id, slot->track->stream_id);
3140       gst_adaptive_demux_track_unref (slot->pending_track);
3141       slot->pending_track = NULL;
3142     }
3143   }
3144
3145   /* Go over all tracks and create/re-assign/remove slots */
3146   for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3147     GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3148
3149     if (track->selected) {
3150       OutputSlot *slot = find_slot_for_track (demux, track);
3151
3152       /* 0. Track is selected and has a slot. Nothing to do */
3153       if (slot) {
3154         GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3155             track->stream_id);
3156         continue;
3157       }
3158
3159       slot = find_replacement_slot_for_track (demux, track);
3160       if (slot) {
3161         /* 1. There is an existing slot of the same type which is currently
3162          *    draining, assign this track as a replacement for it */
3163         g_assert (slot->pending_track == NULL || slot->pending_track == track);
3164         if (slot->pending_track == NULL) {
3165           slot->pending_track = gst_adaptive_demux_track_ref (track);
3166           GST_DEBUG_OBJECT (demux,
3167               "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3168               track->stream_id, track->period_num,
3169               slot->track->stream_id, slot->track->period_num);
3170         }
3171       } else {
3172         /* 2. There is no compatible replacement slot, create a new one */
3173         slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3174         GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3175             track->stream_id);
3176         demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3177
3178         track->update_next_segment = TRUE;
3179
3180         slot->track = gst_adaptive_demux_track_ref (track);
3181         track->active = TRUE;
3182         gst_adaptive_demux_send_initial_events (demux, slot);
3183       }
3184
3185       /* If we were draining this track, we no longer are */
3186       track->draining = FALSE;
3187     }
3188   }
3189
3190   /* Finally check all slots have a current/pending track. If not remove it */
3191   for (tmp = demux->priv->outputs; tmp;) {
3192     OutputSlot *slot = (OutputSlot *) tmp->data;
3193     /* We should never has slots without target tracks */
3194     g_assert (slot->track);
3195     if (slot->track->draining && !slot->pending_track) {
3196       GstAdaptiveDemux2Stream *stream;
3197
3198       GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3199           slot->track->stream_id);
3200       slot->track->active = FALSE;
3201
3202       /* If the stream feeding this track is stopped, flush and clear
3203        * the track now that it's going inactive. If the stream was not
3204        * found, it means we advanced past that period already (and the
3205        * stream was stopped and discarded) */
3206       stream = find_stream_for_track_locked (demux, slot->track);
3207       if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3208         gst_adaptive_demux_track_flush (slot->track);
3209
3210       tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3211       gst_adaptive_demux_output_slot_free (demux, slot);
3212     } else
3213       tmp = tmp->next;
3214   }
3215
3216   demux->priv->current_selection_seqnum = requested_selection_seqnum;
3217   msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3218   if (msg) {
3219     TRACKS_UNLOCK (demux);
3220     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3221     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3222     TRACKS_LOCK (demux);
3223   }
3224 }
3225
3226 /* TRACKS_LOCK held */
3227 static gboolean
3228 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3229 {
3230   GList *iter;
3231   GstAdaptiveDemuxPeriod *previous_period;
3232   GstStreamCollection *collection;
3233
3234   /* Grab the next period, should be demux->periods->next->data */
3235   previous_period = g_queue_pop_head (demux->priv->periods);
3236
3237   /* Remove ref held by demux->output_period */
3238   gst_adaptive_demux_period_unref (previous_period);
3239   demux->output_period =
3240       gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3241
3242   GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3243       demux->output_period->period_num);
3244
3245   /* We can now post the collection of the new period */
3246   collection = demux->output_period->collection;
3247   TRACKS_UNLOCK (demux);
3248   gst_element_post_message (GST_ELEMENT_CAST (demux),
3249       gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3250   TRACKS_LOCK (demux);
3251
3252   /* Unselect all tracks of the previous period */
3253   for (iter = previous_period->tracks; iter; iter = iter->next) {
3254     GstAdaptiveDemuxTrack *track = iter->data;
3255     if (track->selected) {
3256       track->selected = FALSE;
3257       track->draining = TRUE;
3258     }
3259   }
3260
3261   /* Force a selection re-check */
3262   g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3263   check_and_handle_selection_update_locked (demux);
3264
3265   /* Remove the final ref on the previous period now that we have done the switch */
3266   gst_adaptive_demux_period_unref (previous_period);
3267
3268   return TRUE;
3269 }
3270
3271 /* Called with TRACKS_LOCK taken */
3272 static void
3273 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3274     OutputSlot * slot)
3275 {
3276   GstAdaptiveDemuxTrack *track = slot->track;
3277   GstMessage *msg;
3278   gboolean pending_is_ready;
3279   GstAdaptiveDemux2Stream *stream;
3280
3281   /* If we have a pending track for this slot, the current track should be
3282    * draining and no longer selected */
3283   g_assert (track->draining && !track->selected);
3284
3285   /* If we're draining, check if the pending track has enough data *or* that
3286      we've already drained out entirely */
3287   pending_is_ready =
3288       (slot->pending_track->level_time >=
3289       slot->pending_track->buffering_threshold);
3290   pending_is_ready |= slot->pending_track->eos;
3291
3292   if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3293     GST_DEBUG_OBJECT (demux,
3294         "Replacement track '%s' doesn't have enough data for switching yet",
3295         slot->pending_track->stream_id);
3296     return;
3297   }
3298
3299   GST_DEBUG_OBJECT (demux,
3300       "Pending replacement track has enough data, switching");
3301   track->active = FALSE;
3302   track->draining = FALSE;
3303
3304   /* If the stream feeding this track is stopped, flush and clear
3305    * the track now that it's going inactive. If the stream was not
3306    * found, it means we advanced past that period already (and the
3307    * stream was stopped and discarded) */
3308   stream = find_stream_for_track_locked (demux, track);
3309   if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3310     gst_adaptive_demux_track_flush (track);
3311
3312   gst_adaptive_demux_track_unref (track);
3313   /* We steal the reference of pending_track */
3314   track = slot->track = slot->pending_track;
3315   slot->pending_track = NULL;
3316   slot->track->active = TRUE;
3317
3318   /* Make sure the track segment will start at the current position */
3319   track->update_next_segment = TRUE;
3320
3321   /* Send stream start and collection, and schedule sticky events */
3322   gst_adaptive_demux_send_initial_events (demux, slot);
3323
3324   /* Can we emit the streams-selected message now ? */
3325   msg =
3326       all_selected_tracks_are_active (demux,
3327       g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3328   if (msg) {
3329     TRACKS_UNLOCK (demux);
3330     GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3331     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3332     TRACKS_LOCK (demux);
3333   }
3334
3335 }
3336
3337 static void
3338 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3339 {
3340   GList *tmp;
3341   GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3342   gboolean wait_for_data = FALSE;
3343   gboolean all_tracks_empty;
3344   GstFlowReturn ret;
3345
3346   GST_DEBUG_OBJECT (demux, "enter");
3347
3348   TRACKS_LOCK (demux);
3349
3350   /* Check if stopping */
3351   if (demux->priv->flushing) {
3352     ret = GST_FLOW_FLUSHING;
3353     goto pause;
3354   }
3355
3356   /* If the selection changed, handle it */
3357   check_and_handle_selection_update_locked (demux);
3358
3359 restart:
3360   ret = GST_FLOW_OK;
3361   global_output_position = GST_CLOCK_STIME_NONE;
3362   all_tracks_empty = TRUE;
3363
3364   if (wait_for_data) {
3365     GST_DEBUG_OBJECT (demux, "Waiting for data");
3366     g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3367     GST_DEBUG_OBJECT (demux, "Done waiting for data");
3368     if (demux->priv->flushing) {
3369       ret = GST_FLOW_FLUSHING;
3370       goto pause;
3371     }
3372     wait_for_data = FALSE;
3373   }
3374
3375   /* Grab/Recalculate current global output position
3376    * This is the minimum pending output position of all tracks used for output
3377    *
3378    * If there is a track which is empty and not EOS, wait for it to receive data
3379    * then recalculate global output position.
3380    *
3381    * This also pushes downstream all non-timed data that might be present.
3382    *
3383    * IF all tracks are EOS : stop task
3384    */
3385   GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3386   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3387     OutputSlot *slot = (OutputSlot *) tmp->data;
3388     GstAdaptiveDemuxTrack *track;
3389
3390     /* If there is a pending track, Check if it's time to switch to it */
3391     if (slot->pending_track)
3392       handle_slot_pending_track_switch_locked (demux, slot);
3393
3394     track = slot->track;
3395
3396     if (!track->active) {
3397       /* Note: Edward: I can't see in what cases we would end up with inactive
3398          tracks assigned to slots. */
3399       GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3400       g_assert (track->active);
3401       continue;
3402     }
3403
3404     if (track->next_position == GST_CLOCK_STIME_NONE) {
3405       gst_adaptive_demux_track_update_next_position (track);
3406     }
3407
3408     GST_TRACE_OBJECT (demux,
3409         "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3410         track->stream_id, track->period_num,
3411         GST_STIME_ARGS (track->next_position));
3412
3413     if (track->next_position != GST_CLOCK_STIME_NONE) {
3414       if (global_output_position == GST_CLOCK_STIME_NONE)
3415         global_output_position = track->next_position;
3416       else
3417         global_output_position =
3418             MIN (global_output_position, track->next_position);
3419       track->waiting_add = FALSE;
3420       all_tracks_empty = FALSE;
3421     } else if (!track->eos) {
3422       GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3423           track->stream_id, track->period_num);
3424       all_tracks_empty = FALSE;
3425       wait_for_data = track->waiting_add = TRUE;
3426     } else {
3427       GST_DEBUG_OBJECT (demux,
3428           "Track %s (period %u) is EOS, not waiting for timed data",
3429           track->stream_id, track->period_num);
3430
3431       if (gst_queue_array_get_length (track->queue) > 0) {
3432         all_tracks_empty = FALSE;
3433       }
3434     }
3435   }
3436
3437   if (wait_for_data)
3438     goto restart;
3439
3440   if (all_tracks_empty && demux->output_period->has_next_period) {
3441     GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3442         demux->output_period->period_num);
3443     if (!gst_adaptive_demux_advance_output_period (demux)) {
3444       /* Failed to move to next period, error out */
3445       ret = GST_FLOW_ERROR;
3446       goto pause;
3447     }
3448     /* Restart the loop */
3449     goto restart;
3450   }
3451
3452   GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3453       GST_STIME_ARGS (global_output_position));
3454
3455   /* For each track:
3456    *
3457    * We know all active tracks have pending timed data
3458    * * while track next_position <= global output position
3459    *   * push pending data
3460    *   * Update track next_position
3461    *     * recalculate global output position
3462    *   * Pop next pending data from track and update pending position
3463    *
3464    */
3465   gboolean need_restart = FALSE;
3466
3467   for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3468     OutputSlot *slot = (OutputSlot *) tmp->data;
3469     GstAdaptiveDemuxTrack *track = slot->track;
3470
3471     GST_LOG_OBJECT (track->element,
3472         "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3473         " global_output_position:%" GST_STIME_FORMAT, track->active,
3474         track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3475         GST_STIME_ARGS (global_output_position));
3476
3477     if (!track->active)
3478       continue;
3479
3480     while (global_output_position == GST_CLOCK_STIME_NONE
3481         || !slot->pushed_timed_data
3482         || ((track->next_position != GST_CLOCK_STIME_NONE)
3483             && track->next_position <= global_output_position)
3484         || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3485       GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3486
3487       if (!mo) {
3488         GST_DEBUG_OBJECT (demux,
3489             "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3490             track->stream_id, track->period_num, track->eos,
3491             slot->pushed_timed_data);
3492         /* This should only happen if the track is EOS, or exactly in between
3493          * the parser outputting segment/caps before buffers. */
3494         g_assert (track->eos || !slot->pushed_timed_data);
3495
3496         /* If we drained the track, but there's a pending track on the slot
3497          * loop again to activate it */
3498         if (slot->pending_track) {
3499           GST_DEBUG_OBJECT (demux,
3500               "Track '%s' (period %u) drained, but has a pending track to activate",
3501               track->stream_id, track->period_num);
3502           goto restart;
3503         }
3504         break;
3505       }
3506
3507       demux_update_buffering_locked (demux);
3508       demux_post_buffering_locked (demux);
3509       TRACKS_UNLOCK (demux);
3510
3511       GST_DEBUG_OBJECT (demux,
3512           "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3513           track->period_num, mo);
3514
3515       if (GST_IS_EVENT (mo)) {
3516         GstEvent *event = (GstEvent *) mo;
3517         if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3518           slot->pushed_timed_data = TRUE;
3519         } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3520           /* If there is a pending next period, don't send the EOS */
3521           if (demux->output_period->has_next_period) {
3522             GST_LOG_OBJECT (demux,
3523                 "Dropping EOS on track '%s' (period %u) before next period",
3524                 track->stream_id, track->period_num);
3525             gst_event_store_mark_delivered (&track->sticky_events, event);
3526             gst_event_unref (event);
3527             event = NULL;
3528             /* We'll need to re-check if all tracks are empty again above */
3529             need_restart = TRUE;
3530           }
3531         }
3532
3533         if (event != NULL) {
3534           gst_pad_push_event (slot->pad, gst_event_ref (event));
3535
3536           if (GST_EVENT_IS_STICKY (event))
3537             gst_event_store_mark_delivered (&track->sticky_events, event);
3538           gst_event_unref (event);
3539         }
3540       } else if (GST_IS_BUFFER (mo)) {
3541         GstBuffer *buffer = (GstBuffer *) mo;
3542
3543         if (track->output_discont) {
3544           if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3545             buffer = gst_buffer_make_writable (buffer);
3546             GST_DEBUG_OBJECT (slot->pad,
3547                 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3548                 buffer);
3549             GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3550           }
3551           track->output_discont = FALSE;
3552         }
3553         slot->flow_ret = gst_pad_push (slot->pad, buffer);
3554         ret =
3555             gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3556             slot->pad, slot->flow_ret);
3557         GST_DEBUG_OBJECT (slot->pad,
3558             "track %s (period %u) push returned %s (combined %s)",
3559             track->stream_id, track->period_num,
3560             gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3561         slot->pushed_timed_data = TRUE;
3562       } else {
3563         GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3564       }
3565
3566       TRACKS_LOCK (demux);
3567       gst_adaptive_demux_track_update_next_position (track);
3568
3569       if (ret != GST_FLOW_OK)
3570         goto pause;
3571     }
3572   }
3573
3574   /* Store global output position */
3575   if (global_output_position != GST_CLOCK_STIME_NONE) {
3576     demux->priv->global_output_position = global_output_position;
3577
3578     /* And see if any streams need to be woken for more input */
3579     gst_adaptive_demux_period_check_input_wakeup_locked (demux->input_period,
3580         global_output_position);
3581   }
3582
3583   if (need_restart)
3584     goto restart;
3585
3586   if (global_output_position == GST_CLOCK_STIME_NONE) {
3587     if (!demux->priv->flushing) {
3588       GST_DEBUG_OBJECT (demux,
3589           "Pausing output task after reaching NONE global_output_position");
3590       gst_task_pause (demux->priv->output_task);
3591     }
3592   }
3593
3594   TRACKS_UNLOCK (demux);
3595   GST_DEBUG_OBJECT (demux, "leave");
3596   return;
3597
3598 pause:
3599   {
3600     GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3601     /* If the flushing flag is set, then the task is being
3602      * externally stopped, so don't go to pause(), otherwise we
3603      * should so we don't keep spinning */
3604     if (!demux->priv->flushing) {
3605       GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3606           gst_flow_get_name (ret));
3607       gst_task_pause (demux->priv->output_task);
3608     }
3609
3610     TRACKS_UNLOCK (demux);
3611
3612     if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3613       GstEvent *eos = gst_event_new_eos ();
3614
3615       if (ret != GST_FLOW_EOS) {
3616         GST_ELEMENT_FLOW_ERROR (demux, ret);
3617       }
3618
3619       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3620       if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3621         gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3622       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3623
3624       gst_adaptive_demux_push_src_event (demux, eos);
3625     }
3626
3627     return;
3628   }
3629 }
3630
3631 /* must be called from the scheduler */
3632 gboolean
3633 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3634 {
3635   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3636
3637   if (klass->is_live)
3638     return klass->is_live (demux);
3639   return FALSE;
3640 }
3641
3642 static void
3643 handle_manifest_download_complete (DownloadRequest * request,
3644     DownloadRequestState state, GstAdaptiveDemux * demux)
3645 {
3646   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3647   GstBuffer *buffer;
3648   GstFlowReturn result;
3649
3650   g_free (demux->manifest_base_uri);
3651   g_free (demux->manifest_uri);
3652
3653   if (request->redirect_permanent && request->redirect_uri) {
3654     demux->manifest_uri = g_strdup (request->redirect_uri);
3655     demux->manifest_base_uri = NULL;
3656   } else {
3657     demux->manifest_uri = g_strdup (request->uri);
3658     demux->manifest_base_uri = g_strdup (request->redirect_uri);
3659   }
3660
3661   buffer = download_request_take_buffer (request);
3662
3663   /* We should always have a buffer since this function is the non-error
3664    * callback for the download */
3665   g_assert (buffer);
3666
3667   result = klass->update_manifest_data (demux, buffer);
3668   gst_buffer_unref (buffer);
3669
3670   /* FIXME: Should the manifest uri vars be reverted to original
3671    * values if updating fails? */
3672
3673   if (result == GST_FLOW_OK) {
3674     GstClockTime duration;
3675     /* Send an updated duration message */
3676     duration = klass->get_duration (demux);
3677     if (duration != GST_CLOCK_TIME_NONE) {
3678       GST_DEBUG_OBJECT (demux,
3679           "Sending duration message : %" GST_TIME_FORMAT,
3680           GST_TIME_ARGS (duration));
3681       gst_element_post_message (GST_ELEMENT (demux),
3682           gst_message_new_duration_changed (GST_OBJECT (demux)));
3683     } else {
3684       GST_DEBUG_OBJECT (demux,
3685           "Duration unknown, can not send the duration message");
3686     }
3687
3688     /* If a manifest changes it's liveness or periodic updateness, we need
3689      * to start/stop the manifest update task appropriately */
3690     /* Keep this condition in sync with the one in
3691      * gst_adaptive_demux_start_manifest_update_task()
3692      */
3693     if (gst_adaptive_demux_is_live (demux) &&
3694         klass->requires_periodical_playlist_update (demux)) {
3695       gst_adaptive_demux_start_manifest_update_task (demux);
3696     } else {
3697       gst_adaptive_demux_stop_manifest_update_task (demux);
3698     }
3699   }
3700 }
3701
3702 static void
3703 handle_manifest_download_failure (DownloadRequest * request,
3704     DownloadRequestState state, GstAdaptiveDemux * demux)
3705 {
3706   GST_FIXME_OBJECT (demux, "Manifest download failed.");
3707   /* Retry or error out here */
3708 }
3709
3710 static GstFlowReturn
3711 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3712 {
3713   DownloadRequest *request;
3714   GstFlowReturn ret = GST_FLOW_OK;
3715   GError *error = NULL;
3716
3717   request = download_request_new_uri (demux->manifest_uri);
3718
3719   download_request_set_callbacks (request,
3720       (DownloadRequestEventCallback) handle_manifest_download_complete,
3721       (DownloadRequestEventCallback) handle_manifest_download_failure,
3722       NULL, NULL, demux);
3723
3724   if (!downloadhelper_submit_request (demux->download_helper, NULL,
3725           DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
3726           &error)) {
3727     if (error) {
3728       GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
3729           ("Failed to download manifest: %s", error->message), (NULL));
3730       g_clear_error (&error);
3731     }
3732     ret = GST_FLOW_NOT_LINKED;
3733   }
3734
3735   return ret;
3736 }
3737
3738 /* must be called with manifest_lock taken */
3739 GstFlowReturn
3740 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3741 {
3742   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3743   GstFlowReturn ret;
3744
3745   ret = klass->update_manifest (demux);
3746
3747   return ret;
3748 }
3749
3750 /* must be called with manifest_lock taken */
3751 gboolean
3752 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3753 {
3754   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3755   gboolean ret = FALSE;
3756
3757   if (klass->has_next_period)
3758     ret = klass->has_next_period (demux);
3759   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3760   return ret;
3761 }
3762
3763 /* must be called with manifest_lock taken */
3764 void
3765 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3766 {
3767   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3768   GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
3769
3770   g_return_if_fail (klass->advance_period != NULL);
3771
3772   GST_DEBUG_OBJECT (demux, "Advancing to next period");
3773   /* FIXME : no return value ? What if it fails ? */
3774   klass->advance_period (demux);
3775
3776   if (previous_period == demux->input_period) {
3777     GST_ERROR_OBJECT (demux, "Advancing period failed");
3778     return;
3779   }
3780
3781   /* Stop the previous period stream tasks */
3782   gst_adaptive_demux_period_stop_tasks (previous_period);
3783
3784   gst_adaptive_demux_update_collection (demux, demux->input_period);
3785   /* Figure out a pre-emptive selection based on the output period selection */
3786   gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
3787       demux->output_period);
3788
3789   gst_adaptive_demux_prepare_streams (demux, FALSE);
3790   gst_adaptive_demux_start_tasks (demux);
3791 }
3792
3793 /**
3794  * gst_adaptive_demux_get_monotonic_time:
3795  * Returns: a monotonically increasing time, using the system realtime clock
3796  */
3797 GstClockTime
3798 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
3799 {
3800   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3801   return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
3802 }
3803
3804 /**
3805  * gst_adaptive_demux_get_client_now_utc:
3806  * @demux: #GstAdaptiveDemux
3807  * Returns: the client's estimate of UTC
3808  *
3809  * Used to find the client's estimate of UTC, using the system realtime clock.
3810  */
3811 GDateTime *
3812 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
3813 {
3814   return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
3815 }
3816
3817 /**
3818  * gst_adaptive_demux_is_running
3819  * @demux: #GstAdaptiveDemux
3820  * Returns: whether the demuxer is processing data
3821  *
3822  * Returns FALSE if shutdown has started (transitioning down from
3823  * PAUSED), otherwise TRUE.
3824  */
3825 gboolean
3826 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
3827 {
3828   return g_atomic_int_get (&demux->running);
3829 }
3830
3831 /**
3832  * gst_adaptive_demux_get_qos_earliest_time:
3833  *
3834  * Returns: The QOS earliest time
3835  *
3836  * Since: 1.20
3837  */
3838 GstClockTime
3839 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
3840 {
3841   GstClockTime earliest;
3842
3843   GST_OBJECT_LOCK (demux);
3844   earliest = demux->priv->qos_earliest_time;
3845   GST_OBJECT_UNLOCK (demux);
3846
3847   return earliest;
3848 }
3849
3850 gboolean
3851 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
3852     GstAdaptiveDemux2Stream * stream)
3853 {
3854   g_return_val_if_fail (demux && stream, FALSE);
3855
3856   /* FIXME : Migrate to parent */
3857   g_return_val_if_fail (stream->demux == NULL, FALSE);
3858
3859   GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
3860
3861   TRACKS_LOCK (demux);
3862   if (demux->input_period->prepared) {
3863     GST_ERROR_OBJECT (demux,
3864         "Attempted to add streams but no new period was created");
3865     TRACKS_UNLOCK (demux);
3866     return FALSE;
3867   }
3868   stream->demux = demux;
3869   stream->period = demux->input_period;
3870   demux->input_period->streams =
3871       g_list_append (demux->input_period->streams, stream);
3872
3873   if (stream->tracks) {
3874     GList *iter;
3875     for (iter = stream->tracks; iter; iter = iter->next)
3876       if (!gst_adaptive_demux_period_add_track (demux->input_period,
3877               (GstAdaptiveDemuxTrack *) iter->data)) {
3878         GST_ERROR_OBJECT (demux, "Failed to add track elements");
3879         TRACKS_UNLOCK (demux);
3880         return FALSE;
3881       }
3882   }
3883   TRACKS_UNLOCK (demux);
3884   return TRUE;
3885 }
3886
3887 /* Return the current playback rate including any instant rate multiplier */
3888 gdouble
3889 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
3890 {
3891   gdouble rate;
3892   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3893   rate = demux->segment.rate * demux->instant_rate_multiplier;
3894   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3895   return rate;
3896 }