gst: don't use volatile to mean atomic
[platform/upstream/gstreamer.git] / gst-libs / gst / adaptivedemux / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  *
26  * What is an adaptive demuxer?
27  * Adaptive demuxers are special demuxers in the sense that they don't
28  * actually demux data received from upstream but download the data
29  * themselves.
30  *
31  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32  * a set of fragments. The manifest describes the available media and
33  * the sequence of fragments to use. Each fragment contains a small
34  * part of the media (typically only a few seconds). It is possible for
35  * the manifest to have the same media available in different configurations
36  * (bitrates for example) so that the client can select the one that
37  * best suits its scenario (network fluctuation, hardware requirements...).
38  * It is possible to switch from one representation of the media to another
39  * during playback. That's why it is called 'adaptive', because it can be
40  * adapted to the client's needs.
41  *
42  * Architectural overview:
43  * The manifest is received by the demuxer in its sink pad and, upon receiving
44  * EOS, it parses the manifest and exposes the streams available in it. For
45  * each stream a source element will be created and will download the list
46  * of fragments one by one. Once a fragment is finished downloading, the next
47  * URI is set to the source element and it starts fetching it and pushing
48  * through the stream's pad. This implies that each stream is independent from
49  * each other as it runs on a separate thread.
50  *
51  * After downloading each fragment, the download rate of it is calculated and
52  * the demuxer has a chance to switch to a different bitrate if needed. The
53  * switch can be done by simply pushing a new caps before the next fragment
54  * when codecs are the same, or by exposing a new pad group if it needs
55  * a codec change.
56  *
57  * Extra features:
58  * - Not linked streams: Streams that are not-linked have their download threads
59  *                       interrupted to save network bandwidth. When they are
60  *                       relinked a reconfigure event is received and the
61  *                       stream is restarted.
62  *
63  * Subclasses:
64  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65  * about the intrinsics of the subclass formats, so the subclasses are
66  * responsible for maintaining the manifest data structures and stream
67  * information.
68  */
69
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses needs to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77
78 Adaptive demux API can be called from several threads. More, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024    /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
134
135 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
136 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
137     GST_TRACE("Locking from thread %p", g_thread_self()); \
138     g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
139     GST_TRACE("Locked from thread %p", g_thread_self()); \
140  } G_STMT_END
141
142 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
143     GST_TRACE("Unlocking from thread %p", g_thread_self()); \
144     g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
145  } G_STMT_END
146
147 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
148 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
149 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
150
151 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
154
155 enum
156 {
157   PROP_0,
158   PROP_CONNECTION_SPEED,
159   PROP_BITRATE_LIMIT,
160   PROP_LAST
161 };
162
163 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
164 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
165
166 struct _GstAdaptiveDemuxPrivate
167 {
168   GstAdapter *input_adapter;    /* protected by manifest_lock */
169   gint have_manifest;           /* MT safe */
170
171   GList *old_streams;           /* protected by manifest_lock */
172
173   GstTask *updates_task;        /* MT safe */
174   GRecMutex updates_lock;
175   GMutex updates_timed_lock;
176   GCond updates_timed_cond;     /* protected by updates_timed_lock */
177   gboolean stop_updates_task;   /* protected by updates_timed_lock */
178
179   /* used only from updates_task, no need to protect it */
180   gint update_failed_count;
181
182   guint32 segment_seqnum;       /* protected by manifest_lock */
183
184   /* main lock used to protect adaptive demux and all its streams.
185    * It serializes the adaptive demux public API.
186    */
187   GRecMutex manifest_lock;
188
189   /* condition to wait for manifest updates on a live stream.
190    * In order to signal the manifest_cond, the caller needs to hold both
191    * manifest_lock and manifest_update_lock (taken in this order)
192    */
193   GCond manifest_cond;
194   GMutex manifest_update_lock;
195
196   /* Lock and condition for prerolling streams before exposing */
197   GMutex preroll_lock;
198   GCond preroll_cond;
199   gint preroll_pending;
200
201   GMutex api_lock;
202
203   /* Protects demux and stream segment information
204    * Needed because seeks can update segment information
205    * without needing to stop tasks when they just want to
206    * update the segment boundaries */
207   GMutex segment_lock;
208
209   GstClockTime qos_earliest_time;
210 };
211
212 typedef struct _GstAdaptiveDemuxTimer
213 {
214   gint ref_count;
215   GCond *cond;
216   GMutex *mutex;
217   GstClockID clock_id;
218   gboolean fired;
219 } GstAdaptiveDemuxTimer;
220
221 static GstBinClass *parent_class = NULL;
222 static gint private_offset = 0;
223
224 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
225 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
226     GstAdaptiveDemuxClass * klass);
227 static void gst_adaptive_demux_finalize (GObject * object);
228 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
229     element, GstStateChange transition);
230
231 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
232
233 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
234     GstEvent * event);
235 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
236     GstObject * parent, GstBuffer * buffer);
237 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
238     GstQuery * query);
239 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
240     GstEvent * event);
241
242 static gboolean
243 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
244
245 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
246 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
247     stream);
248 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
249 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
250     gboolean first_and_live);
251 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
252 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
253 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
254     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
255     GstClockTime ts, GstClockTime * final_ts);
256 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
257     demux, GstAdaptiveDemuxStream * stream);
258 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
259     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
260 static GstFlowReturn
261 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
262     GstAdaptiveDemuxStream * stream);
263 static gint64
264 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
265     GstAdaptiveDemuxStream * stream);
266 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
267     demux);
268 static GstFlowReturn
269 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
270 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
271 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
272
273 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
274 static GstFlowReturn
275 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
276     GstEvent * event);
277
278 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
279     demux);
280 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
281     demux);
282
283 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
284     gboolean start_preroll_streams);
285 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
286     gboolean stop_updates);
287 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
288     demux);
289 static void
290 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
291     stream, GstFlowReturn ret, GError * err);
292 static GstFlowReturn
293 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
294     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
295 static GstFlowReturn
296 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
297     GstAdaptiveDemuxStream * stream);
298 static GstFlowReturn
299 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
300     GstAdaptiveDemuxStream * stream, GstClockTime duration);
301 static gboolean
302 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
303     GstClockTime end_time);
304 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
305     GstClockTime time, GstClockID id, gpointer user_data);
306 static gboolean
307 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
308     * demux);
309
310 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
311  * method to get to the padtemplates */
312 GType
313 gst_adaptive_demux_get_type (void)
314 {
315   static gsize type = 0;
316
317   if (g_once_init_enter (&type)) {
318     GType _type;
319     static const GTypeInfo info = {
320       sizeof (GstAdaptiveDemuxClass),
321       NULL,
322       NULL,
323       (GClassInitFunc) gst_adaptive_demux_class_init,
324       NULL,
325       NULL,
326       sizeof (GstAdaptiveDemux),
327       0,
328       (GInstanceInitFunc) gst_adaptive_demux_init,
329     };
330
331     _type = g_type_register_static (GST_TYPE_BIN,
332         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
333
334     private_offset =
335         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
336
337     g_once_init_leave (&type, _type);
338   }
339   return type;
340 }
341
342 static inline GstAdaptiveDemuxPrivate *
343 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
344 {
345   return (G_STRUCT_MEMBER_P (self, private_offset));
346 }
347
348 static void
349 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
350     const GValue * value, GParamSpec * pspec)
351 {
352   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
353
354   GST_API_LOCK (demux);
355   GST_MANIFEST_LOCK (demux);
356
357   switch (prop_id) {
358     case PROP_CONNECTION_SPEED:
359       demux->connection_speed = g_value_get_uint (value) * 1000;
360       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
361           demux->connection_speed);
362       break;
363     case PROP_BITRATE_LIMIT:
364       demux->bitrate_limit = g_value_get_float (value);
365       break;
366     default:
367       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
368       break;
369   }
370
371   GST_MANIFEST_UNLOCK (demux);
372   GST_API_UNLOCK (demux);
373 }
374
375 static void
376 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
377     GValue * value, GParamSpec * pspec)
378 {
379   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
380
381   GST_MANIFEST_LOCK (demux);
382
383   switch (prop_id) {
384     case PROP_CONNECTION_SPEED:
385       g_value_set_uint (value, demux->connection_speed / 1000);
386       break;
387     case PROP_BITRATE_LIMIT:
388       g_value_set_float (value, demux->bitrate_limit);
389       break;
390     default:
391       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
392       break;
393   }
394
395   GST_MANIFEST_UNLOCK (demux);
396 }
397
398 static void
399 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
400 {
401   GObjectClass *gobject_class;
402   GstElementClass *gstelement_class;
403   GstBinClass *gstbin_class;
404
405   gobject_class = G_OBJECT_CLASS (klass);
406   gstelement_class = GST_ELEMENT_CLASS (klass);
407   gstbin_class = GST_BIN_CLASS (klass);
408
409   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
410       "Base Adaptive Demux");
411
412   parent_class = g_type_class_peek_parent (klass);
413
414   if (private_offset != 0)
415     g_type_class_adjust_private_offset (klass, &private_offset);
416
417   gobject_class->set_property = gst_adaptive_demux_set_property;
418   gobject_class->get_property = gst_adaptive_demux_get_property;
419   gobject_class->finalize = gst_adaptive_demux_finalize;
420
421   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
422       g_param_spec_uint ("connection-speed", "Connection Speed",
423           "Network connection speed in kbps (0 = calculate from downloaded"
424           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
425           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426
427   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
428   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
429       g_param_spec_float ("bitrate-limit",
430           "Bitrate limit in %",
431           "Limit of the available bitrate to use when switching to alternates.",
432           0, 1, DEFAULT_BITRATE_LIMIT,
433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434
435   gstelement_class->change_state = gst_adaptive_demux_change_state;
436
437   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
438
439   klass->data_received = gst_adaptive_demux_stream_data_received_default;
440   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
441   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
442   klass->requires_periodical_playlist_update =
443       gst_adaptive_demux_requires_periodical_playlist_update_default;
444
445 }
446
447 static void
448 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
449     GstAdaptiveDemuxClass * klass)
450 {
451   GstPadTemplate *pad_template;
452   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
453   GObjectClass *gobject_class;
454
455   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
456
457   demux->priv = gst_adaptive_demux_get_instance_private (demux);
458   demux->priv->input_adapter = gst_adapter_new ();
459   demux->downloader = gst_uri_downloader_new ();
460   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
461   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
462   demux->priv->segment_seqnum = gst_util_seqnum_next ();
463   demux->have_group_id = FALSE;
464   demux->group_id = G_MAXUINT;
465
466   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
467
468   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
469       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
470
471   demux->realtime_clock = gst_system_clock_obtain ();
472   g_assert (demux->realtime_clock != NULL);
473   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
474   if (g_object_class_find_property (gobject_class, "clock-type")) {
475     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
476   } else {
477     GST_WARNING_OBJECT (demux,
478         "System clock does not have clock-type property");
479   }
480   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
481     demux->clock_offset = 0;
482   } else {
483     GDateTime *utc_now;
484     GstClockTime rtc_now;
485
486     utc_now = g_date_time_new_now_utc ();
487     rtc_now = gst_clock_get_time (demux->realtime_clock);
488     demux->clock_offset =
489         g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
490         g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
491     g_date_time_unref (utc_now);
492   }
493   g_rec_mutex_init (&demux->priv->updates_lock);
494   demux->priv->updates_task =
495       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
496       demux, NULL);
497   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
498
499   g_mutex_init (&demux->priv->updates_timed_lock);
500   g_cond_init (&demux->priv->updates_timed_cond);
501
502   g_cond_init (&demux->priv->manifest_cond);
503   g_mutex_init (&demux->priv->manifest_update_lock);
504
505   g_rec_mutex_init (&demux->priv->manifest_lock);
506   g_mutex_init (&demux->priv->api_lock);
507   g_mutex_init (&demux->priv->segment_lock);
508
509   g_cond_init (&demux->priv->preroll_cond);
510   g_mutex_init (&demux->priv->preroll_lock);
511
512   pad_template =
513       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
514   g_return_if_fail (pad_template != NULL);
515
516   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
517   gst_pad_set_event_function (demux->sinkpad,
518       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
519   gst_pad_set_chain_function (demux->sinkpad,
520       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
521
522   /* Properties */
523   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
524   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
525
526   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
527 }
528
529 static void
530 gst_adaptive_demux_finalize (GObject * object)
531 {
532   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
533   GstAdaptiveDemuxPrivate *priv = demux->priv;
534
535   GST_DEBUG_OBJECT (object, "finalize");
536
537   g_object_unref (priv->input_adapter);
538   g_object_unref (demux->downloader);
539
540   g_mutex_clear (&priv->updates_timed_lock);
541   g_cond_clear (&priv->updates_timed_cond);
542   g_mutex_clear (&demux->priv->manifest_update_lock);
543   g_cond_clear (&demux->priv->manifest_cond);
544   g_object_unref (priv->updates_task);
545   g_rec_mutex_clear (&priv->updates_lock);
546   g_rec_mutex_clear (&demux->priv->manifest_lock);
547   g_mutex_clear (&demux->priv->api_lock);
548   g_mutex_clear (&demux->priv->segment_lock);
549   if (demux->realtime_clock) {
550     gst_object_unref (demux->realtime_clock);
551     demux->realtime_clock = NULL;
552   }
553
554   g_cond_clear (&demux->priv->preroll_cond);
555   g_mutex_clear (&demux->priv->preroll_lock);
556
557   G_OBJECT_CLASS (parent_class)->finalize (object);
558 }
559
560 static GstStateChangeReturn
561 gst_adaptive_demux_change_state (GstElement * element,
562     GstStateChange transition)
563 {
564   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
565   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
566
567   switch (transition) {
568     case GST_STATE_CHANGE_PAUSED_TO_READY:
569       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
570         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
571       gst_uri_downloader_cancel (demux->downloader);
572
573       GST_API_LOCK (demux);
574       GST_MANIFEST_LOCK (demux);
575       gst_adaptive_demux_reset (demux);
576       GST_MANIFEST_UNLOCK (demux);
577       GST_API_UNLOCK (demux);
578       break;
579     case GST_STATE_CHANGE_READY_TO_PAUSED:
580       GST_API_LOCK (demux);
581       GST_MANIFEST_LOCK (demux);
582       gst_adaptive_demux_reset (demux);
583       /* Clear "cancelled" flag in uridownloader since subclass might want to
584        * use uridownloader to fetch another manifest */
585       gst_uri_downloader_reset (demux->downloader);
586       if (g_atomic_int_get (&demux->priv->have_manifest))
587         gst_adaptive_demux_start_manifest_update_task (demux);
588       GST_MANIFEST_UNLOCK (demux);
589       GST_API_UNLOCK (demux);
590       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
591         GST_DEBUG_OBJECT (demux, "demuxer has started running");
592       break;
593     default:
594       break;
595   }
596
597   /* this must be run without MANIFEST_LOCK taken.
598    * For PLAYING to PLAYING state changes, it will want to take a lock in
599    * src element and that lock is held while the streaming thread is running.
600    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
601    */
602   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
603
604   return result;
605 }
606
607 static gboolean
608 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
609     GstEvent * event)
610 {
611   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
612   gboolean ret;
613
614   switch (event->type) {
615     case GST_EVENT_FLUSH_STOP:{
616       GST_API_LOCK (demux);
617       GST_MANIFEST_LOCK (demux);
618
619       gst_adaptive_demux_reset (demux);
620
621       ret = gst_pad_event_default (pad, parent, event);
622
623       GST_MANIFEST_UNLOCK (demux);
624       GST_API_UNLOCK (demux);
625
626       return ret;
627     }
628     case GST_EVENT_EOS:{
629       GstAdaptiveDemuxClass *demux_class;
630       GstQuery *query;
631       gboolean query_res;
632       gboolean ret = TRUE;
633       gsize available;
634       GstBuffer *manifest_buffer;
635
636       GST_API_LOCK (demux);
637       GST_MANIFEST_LOCK (demux);
638
639       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
640
641       available = gst_adapter_available (demux->priv->input_adapter);
642
643       if (available == 0) {
644         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
645         ret = gst_pad_event_default (pad, parent, event);
646
647         GST_MANIFEST_UNLOCK (demux);
648         GST_API_UNLOCK (demux);
649
650         return ret;
651       }
652
653       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
654
655       /* Need to get the URI to use it as a base to generate the fragment's
656        * uris */
657       query = gst_query_new_uri ();
658       query_res = gst_pad_peer_query (pad, query);
659       if (query_res) {
660         gchar *uri, *redirect_uri;
661         gboolean permanent;
662
663         gst_query_parse_uri (query, &uri);
664         gst_query_parse_uri_redirection (query, &redirect_uri);
665         gst_query_parse_uri_redirection_permanent (query, &permanent);
666
667         if (permanent && redirect_uri) {
668           demux->manifest_uri = redirect_uri;
669           demux->manifest_base_uri = NULL;
670           g_free (uri);
671         } else {
672           demux->manifest_uri = uri;
673           demux->manifest_base_uri = redirect_uri;
674         }
675
676         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
677             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
678       } else {
679         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
680       }
681       gst_query_unref (query);
682
683       /* Let the subclass parse the manifest */
684       manifest_buffer =
685           gst_adapter_take_buffer (demux->priv->input_adapter, available);
686       if (!demux_class->process_manifest (demux, manifest_buffer)) {
687         /* In most cases, this will happen if we set a wrong url in the
688          * source element and we have received the 404 HTML response instead of
689          * the manifest */
690         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
691             (NULL));
692         ret = FALSE;
693       } else {
694         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
695       }
696       gst_buffer_unref (manifest_buffer);
697
698       gst_element_post_message (GST_ELEMENT_CAST (demux),
699           gst_message_new_element (GST_OBJECT_CAST (demux),
700               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
701                   "manifest-uri", G_TYPE_STRING,
702                   demux->manifest_uri, "uri", G_TYPE_STRING,
703                   demux->manifest_uri,
704                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
705                   GST_CLOCK_TIME_NONE,
706                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
707                   gst_util_get_timestamp (), NULL)));
708
709       if (ret) {
710         /* Send duration message */
711         if (!gst_adaptive_demux_is_live (demux)) {
712           GstClockTime duration = demux_class->get_duration (demux);
713
714           if (duration != GST_CLOCK_TIME_NONE) {
715             GST_DEBUG_OBJECT (demux,
716                 "Sending duration message : %" GST_TIME_FORMAT,
717                 GST_TIME_ARGS (duration));
718             gst_element_post_message (GST_ELEMENT (demux),
719                 gst_message_new_duration_changed (GST_OBJECT (demux)));
720           } else {
721             GST_DEBUG_OBJECT (demux,
722                 "media duration unknown, can not send the duration message");
723           }
724         }
725
726         if (demux->next_streams) {
727           gst_adaptive_demux_prepare_streams (demux,
728               gst_adaptive_demux_is_live (demux));
729           gst_adaptive_demux_start_tasks (demux, TRUE);
730           gst_adaptive_demux_start_manifest_update_task (demux);
731         } else {
732           /* no streams */
733           GST_WARNING_OBJECT (demux, "No streams created from manifest");
734           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
735               (_("This file contains no playable streams.")),
736               ("No known stream formats found at the Manifest"));
737           ret = FALSE;
738         }
739
740       }
741       GST_MANIFEST_UNLOCK (demux);
742       GST_API_UNLOCK (demux);
743
744       gst_event_unref (event);
745       return ret;
746     }
747     case GST_EVENT_SEGMENT:
748       /* Swallow newsegments, we'll push our own */
749       gst_event_unref (event);
750       return TRUE;
751     default:
752       break;
753   }
754
755   return gst_pad_event_default (pad, parent, event);
756 }
757
758 static GstFlowReturn
759 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
760     GstBuffer * buffer)
761 {
762   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
763
764   GST_MANIFEST_LOCK (demux);
765
766   gst_adapter_push (demux->priv->input_adapter, buffer);
767
768   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
769       (gint) gst_adapter_available (demux->priv->input_adapter));
770
771   GST_MANIFEST_UNLOCK (demux);
772   return GST_FLOW_OK;
773 }
774
775 /* must be called with manifest_lock taken */
776 static void
777 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
778 {
779   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
780   GList *iter;
781   GList *old_streams;
782   GstEvent *eos;
783
784   /* take ownership of old_streams before releasing the manifest_lock in
785    * gst_adaptive_demux_stop_tasks
786    */
787   old_streams = demux->priv->old_streams;
788   demux->priv->old_streams = NULL;
789
790   gst_adaptive_demux_stop_tasks (demux, TRUE);
791
792   if (klass->reset)
793     klass->reset (demux);
794
795   eos = gst_event_new_eos ();
796   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
797     GstAdaptiveDemuxStream *stream = iter->data;
798     if (stream->pad) {
799       gst_pad_push_event (stream->pad, gst_event_ref (eos));
800       gst_pad_set_active (stream->pad, FALSE);
801
802       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
803     }
804     gst_adaptive_demux_stream_free (stream);
805   }
806   gst_event_unref (eos);
807   g_list_free (demux->streams);
808   demux->streams = NULL;
809   if (demux->prepared_streams) {
810     g_list_free_full (demux->prepared_streams,
811         (GDestroyNotify) gst_adaptive_demux_stream_free);
812     demux->prepared_streams = NULL;
813   }
814   if (demux->next_streams) {
815     g_list_free_full (demux->next_streams,
816         (GDestroyNotify) gst_adaptive_demux_stream_free);
817     demux->next_streams = NULL;
818   }
819
820   if (old_streams) {
821     g_list_free_full (old_streams,
822         (GDestroyNotify) gst_adaptive_demux_stream_free);
823   }
824
825   if (demux->priv->old_streams) {
826     g_list_free_full (demux->priv->old_streams,
827         (GDestroyNotify) gst_adaptive_demux_stream_free);
828     demux->priv->old_streams = NULL;
829   }
830
831   g_free (demux->manifest_uri);
832   g_free (demux->manifest_base_uri);
833   demux->manifest_uri = NULL;
834   demux->manifest_base_uri = NULL;
835
836   gst_adapter_clear (demux->priv->input_adapter);
837   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
838
839   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
840
841   demux->have_group_id = FALSE;
842   demux->group_id = G_MAXUINT;
843   demux->priv->segment_seqnum = gst_util_seqnum_next ();
844 }
845
846 static void
847 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
848 {
849   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
850
851   switch (GST_MESSAGE_TYPE (msg)) {
852     case GST_MESSAGE_ERROR:{
853       GList *iter;
854       GstAdaptiveDemuxStream *stream = NULL;
855       GError *err = NULL;
856       gchar *debug = NULL;
857       gchar *new_error = NULL;
858       const GstStructure *details = NULL;
859
860       GST_MANIFEST_LOCK (demux);
861
862       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
863         GstAdaptiveDemuxStream *cur = iter->data;
864         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
865                 GST_OBJECT_CAST (cur->src))) {
866           stream = cur;
867           break;
868         }
869       }
870       if (stream == NULL) {
871         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
872           GstAdaptiveDemuxStream *cur = iter->data;
873           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
874                   GST_OBJECT_CAST (cur->src))) {
875             stream = cur;
876             break;
877           }
878         }
879         if (stream == NULL) {
880           GST_WARNING_OBJECT (demux,
881               "Failed to locate stream for errored element");
882           break;
883         }
884       }
885
886       gst_message_parse_error (msg, &err, &debug);
887
888       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
889           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
890           err->message, debug);
891
892       if (debug)
893         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
894       if (new_error) {
895         g_free (err->message);
896         err->message = new_error;
897       }
898
899       gst_message_parse_error_details (msg, &details);
900       if (details) {
901         gst_structure_get_uint (details, "http-status-code",
902             &stream->last_status_code);
903       }
904
905       /* error, but ask to retry */
906       gst_adaptive_demux_stream_fragment_download_finish (stream,
907           GST_FLOW_CUSTOM_ERROR, err);
908
909       g_error_free (err);
910       g_free (debug);
911
912       GST_MANIFEST_UNLOCK (demux);
913
914       gst_message_unref (msg);
915       msg = NULL;
916     }
917       break;
918     default:
919       break;
920   }
921
922   if (msg)
923     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
924 }
925
926 void
927 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
928     gsize struct_size)
929 {
930   GST_API_LOCK (demux);
931   GST_MANIFEST_LOCK (demux);
932   demux->stream_struct_size = struct_size;
933   GST_MANIFEST_UNLOCK (demux);
934   GST_API_UNLOCK (demux);
935 }
936
937 /* must be called with manifest_lock taken */
938 static gboolean
939 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
940     GstAdaptiveDemuxStream * stream)
941 {
942   GstPad *pad = stream->pad;
943   gchar *name = gst_pad_get_name (pad);
944   GstEvent *event;
945   gchar *stream_id;
946
947   gst_pad_set_active (pad, TRUE);
948   stream->need_header = TRUE;
949
950   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
951
952   event =
953       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
954       GST_EVENT_STREAM_START, 0);
955   if (event) {
956     if (gst_event_parse_group_id (event, &demux->group_id))
957       demux->have_group_id = TRUE;
958     else
959       demux->have_group_id = FALSE;
960     gst_event_unref (event);
961   } else if (!demux->have_group_id) {
962     demux->have_group_id = TRUE;
963     demux->group_id = gst_util_group_id_next ();
964   }
965   event = gst_event_new_stream_start (stream_id);
966   if (demux->have_group_id)
967     gst_event_set_group_id (event, demux->group_id);
968
969   gst_pad_push_event (pad, event);
970   g_free (stream_id);
971   g_free (name);
972
973   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
974
975   stream->discont = TRUE;
976
977   return TRUE;
978 }
979
980 static gboolean
981 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
982     GstAdaptiveDemuxStream * stream)
983 {
984   gboolean ret;
985   GstPad *pad = stream->pad;
986   GstCaps *caps;
987
988   if (stream->pending_caps) {
989     gst_pad_set_caps (pad, stream->pending_caps);
990     caps = stream->pending_caps;
991     stream->pending_caps = NULL;
992   } else {
993     caps = gst_pad_get_current_caps (pad);
994   }
995
996   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
997       GST_DEBUG_PAD_NAME (pad), caps);
998   if (caps)
999     gst_caps_unref (caps);
1000
1001   gst_object_ref (pad);
1002
1003   /* Don't hold the manifest lock while exposing a pad */
1004   GST_MANIFEST_UNLOCK (demux);
1005   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1006   GST_MANIFEST_LOCK (demux);
1007
1008   return ret;
1009 }
1010
1011 /* must be called with manifest_lock taken */
1012 static GstClockTime
1013 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1014     GstAdaptiveDemuxStream * stream)
1015 {
1016   GstAdaptiveDemuxClass *klass;
1017
1018   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1019
1020   if (klass->get_presentation_offset == NULL)
1021     return 0;
1022
1023   return klass->get_presentation_offset (demux, stream);
1024 }
1025
1026 /* must be called with manifest_lock taken */
1027 static GstClockTime
1028 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1029 {
1030   GstAdaptiveDemuxClass *klass;
1031
1032   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1033
1034   if (klass->get_period_start_time == NULL)
1035     return 0;
1036
1037   return klass->get_period_start_time (demux);
1038 }
1039
1040 /* must be called with manifest_lock taken */
1041 static gboolean
1042 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1043     gboolean first_and_live)
1044 {
1045   GList *iter;
1046   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1047
1048   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1049   if (demux->prepared_streams != NULL) {
1050     /* Old streams that were never exposed, due to a seek or so */
1051     GST_FIXME_OBJECT (demux,
1052         "Preparing new streams without cleaning up old ones!");
1053     return FALSE;
1054   }
1055
1056   demux->prepared_streams = demux->next_streams;
1057   demux->next_streams = NULL;
1058
1059   if (!gst_adaptive_demux_is_running (demux)) {
1060     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1061     return TRUE;
1062   }
1063
1064   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1065     GstAdaptiveDemuxStream *stream = iter->data;
1066
1067     stream->do_block = TRUE;
1068
1069     if (!gst_adaptive_demux_prepare_stream (demux,
1070             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1071       /* TODO act on error */
1072       GST_FIXME_OBJECT (stream->pad,
1073           "Do something on failure to expose stream");
1074     }
1075
1076     if (first_and_live) {
1077       /* TODO we only need the first timestamp, maybe create a simple function to
1078        * get the current PTS of a fragment ? */
1079       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1080       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1081
1082       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1083         min_pts = MIN (min_pts, stream->fragment.timestamp);
1084       } else {
1085         min_pts = stream->fragment.timestamp;
1086       }
1087     }
1088   }
1089
1090   period_start = gst_adaptive_demux_get_period_start_time (demux);
1091
1092   /* For live streams, the subclass is supposed to seek to the current
1093    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1094    * We now also have to seek our demuxer segment to reflect this.
1095    *
1096    * FIXME: This needs some refactoring at some point.
1097    */
1098   if (first_and_live) {
1099     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1100         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1101         GST_SEEK_TYPE_NONE, -1, NULL);
1102   }
1103
1104   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1105     GstAdaptiveDemuxStream *stream = iter->data;
1106     GstClockTime offset;
1107
1108     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1109     stream->segment = demux->segment;
1110
1111     /* The demuxer segment is just built from seek events, but for each stream
1112      * we have to adjust segments according to the current period and the
1113      * stream specific presentation time offset.
1114      *
1115      * For each period, buffer timestamps start again from 0. Additionally the
1116      * buffer timestamps are shifted by the stream specific presentation time
1117      * offset, so the first buffer timestamp of a period is 0 + presentation
1118      * time offset. If the stream contains timestamps itself, this is also
1119      * supposed to be the presentation time stored inside the stream.
1120      *
1121      * The stream time over periods is supposed to be continuous, that is the
1122      * buffer timestamp 0 + presentation time offset should map to the start
1123      * time of the current period.
1124      *
1125      *
1126      * The adjustment of the stream segments as such works the following.
1127      *
1128      * If the demuxer segment start is bigger than the period start, this
1129      * means that we have to drop some media at the beginning of the current
1130      * period, e.g. because a seek into the middle of the period has
1131      * happened. The amount of media to drop is the difference between the
1132      * period start and the demuxer segment start, and as each period starts
1133      * again from 0, this difference is going to be the actual stream's
1134      * segment start. As all timestamps of the stream are shifted by the
1135      * presentation time offset, we will also have to move the segment start
1136      * by that offset.
1137      *
1138      * Likewise, the demuxer segment stop value is adjusted in the same
1139      * fashion.
1140      *
1141      * Now the running time and stream time at the stream's segment start has
1142      * to be the one that is stored inside the demuxer's segment, which means
1143      * that segment.base and segment.time have to be copied over (done just
1144      * above)
1145      *
1146      *
1147      * If the demuxer segment start is smaller than the period start time,
1148      * this means that the whole period is inside the segment. As each period
1149      * starts timestamps from 0, and additionally timestamps are shifted by
1150      * the presentation time offset, the stream's first timestamp (and as such
1151      * the stream's segment start) has to be the presentation time offset.
1152      * The stream time at the segment start is supposed to be the stream time
1153      * of the period start according to the demuxer segment, so the stream
1154      * segment's time would be set to that. The same goes for the stream
1155      * segment's base, which is supposed to be the running time of the period
1156      * start according to the demuxer's segment.
1157      *
1158      * The same logic applies for negative rates with the segment stop and
1159      * the period stop time (which gets clamped).
1160      *
1161      *
1162      * For the first case where not the complete period is inside the segment,
1163      * the segment time and base as calculated by the second case would be
1164      * equivalent.
1165      */
1166     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1167         &demux->segment);
1168     GST_DEBUG_OBJECT (demux,
1169         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1170         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1171     /* note for readers:
1172      * Since stream->segment is initially a copy of demux->segment,
1173      * only the values that need updating are modified below. */
1174     if (first_and_live) {
1175       /* If first and live, demuxer did seek to the current position already */
1176       stream->segment.start = demux->segment.start - period_start + offset;
1177       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1178         stream->segment.stop = demux->segment.stop - period_start + offset;
1179       /* FIXME : Do we need to handle negative rates for this ? */
1180       stream->segment.position = stream->segment.start;
1181     } else if (demux->segment.start > period_start) {
1182       /* seek within a period */
1183       stream->segment.start = demux->segment.start - period_start + offset;
1184       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1185         stream->segment.stop = demux->segment.stop - period_start + offset;
1186       if (stream->segment.rate >= 0)
1187         stream->segment.position = offset;
1188       else
1189         stream->segment.position = stream->segment.stop;
1190     } else {
1191       stream->segment.start = offset;
1192       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1193         stream->segment.stop = demux->segment.stop - period_start + offset;
1194       if (stream->segment.rate >= 0)
1195         stream->segment.position = offset;
1196       else
1197         stream->segment.position = stream->segment.stop;
1198       stream->segment.time =
1199           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1200           period_start);
1201       stream->segment.base =
1202           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1203           period_start);
1204     }
1205
1206     stream->pending_segment = gst_event_new_segment (&stream->segment);
1207     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1208
1209     GST_DEBUG_OBJECT (demux,
1210         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1211         &stream->segment, stream);
1212   }
1213   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1214
1215   return TRUE;
1216 }
1217
1218 static gboolean
1219 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1220 {
1221   GList *iter;
1222   GList *old_streams;
1223
1224   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1225
1226   old_streams = demux->streams;
1227   demux->streams = demux->prepared_streams;
1228   demux->prepared_streams = NULL;
1229
1230   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1231     GstAdaptiveDemuxStream *stream = iter->data;
1232
1233     if (!gst_adaptive_demux_expose_stream (demux,
1234             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1235       /* TODO act on error */
1236     }
1237   }
1238   demux->priv->preroll_pending = 0;
1239
1240   GST_MANIFEST_UNLOCK (demux);
1241   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1242   GST_MANIFEST_LOCK (demux);
1243
1244   if (old_streams) {
1245     GstEvent *eos = gst_event_new_eos ();
1246
1247     /* before we put streams in the demux->priv->old_streams list,
1248      * we ask the download task to stop. In this way, it will no longer be
1249      * allowed to change the demux object.
1250      */
1251     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1252       GstAdaptiveDemuxStream *stream = iter->data;
1253       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1254
1255       GST_MANIFEST_UNLOCK (demux);
1256
1257       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1258       gst_pad_push_event (pad, gst_event_ref (eos));
1259       gst_pad_set_active (pad, FALSE);
1260
1261       GST_LOG_OBJECT (pad, "Removing stream");
1262       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1263       GST_MANIFEST_LOCK (demux);
1264
1265       gst_object_unref (GST_OBJECT (pad));
1266
1267       /* ask the download task to stop.
1268        * We will not join it now, because our thread can be one of these tasks.
1269        * We will do the joining later, from another stream download task or
1270        * from gst_adaptive_demux_stop_tasks.
1271        * We also cannot change the state of the stream->src element, because
1272        * that will wait on the streaming thread (which could be this thread)
1273        * to stop first.
1274        * Because we sent an EOS to the downstream element, the stream->src
1275        * element should detect this in its streaming task and stop.
1276        * Even if it doesn't do that, we will change its state later in
1277        * gst_adaptive_demux_stop_tasks.
1278        */
1279       GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
1280           "Marking stream as cancelled");
1281       gst_task_stop (stream->download_task);
1282       g_mutex_lock (&stream->fragment_download_lock);
1283       stream->cancelled = TRUE;
1284       stream->replaced = TRUE;
1285       g_cond_signal (&stream->fragment_download_cond);
1286       g_mutex_unlock (&stream->fragment_download_lock);
1287     }
1288     gst_event_unref (eos);
1289
1290     /* The list should be freed from another thread as we can't properly
1291      * cleanup a GstTask from itself */
1292     demux->priv->old_streams =
1293         g_list_concat (demux->priv->old_streams, old_streams);
1294   }
1295
1296   /* Unblock after removing oldstreams */
1297   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1298     GstAdaptiveDemuxStream *stream = iter->data;
1299     stream->do_block = FALSE;
1300   }
1301
1302   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1303
1304   return TRUE;
1305 }
1306
1307 /* must be called with manifest_lock taken */
1308 GstAdaptiveDemuxStream *
1309 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1310 {
1311   GstAdaptiveDemuxStream *stream;
1312
1313   stream = g_malloc0 (demux->stream_struct_size);
1314
1315   /* Downloading task */
1316   g_rec_mutex_init (&stream->download_lock);
1317   stream->download_task =
1318       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1319       stream, NULL);
1320   gst_task_set_lock (stream->download_task, &stream->download_lock);
1321
1322   stream->pad = pad;
1323   stream->demux = demux;
1324   stream->fragment_bitrates =
1325       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1326   gst_pad_set_element_private (pad, stream);
1327   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1328
1329   g_mutex_lock (&demux->priv->preroll_lock);
1330   stream->do_block = TRUE;
1331   demux->priv->preroll_pending++;
1332   g_mutex_unlock (&demux->priv->preroll_lock);
1333
1334   gst_pad_set_query_function (pad,
1335       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1336   gst_pad_set_event_function (pad,
1337       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1338
1339   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1340   g_cond_init (&stream->fragment_download_cond);
1341   g_mutex_init (&stream->fragment_download_lock);
1342
1343   demux->next_streams = g_list_append (demux->next_streams, stream);
1344
1345   return stream;
1346 }
1347
1348 GstAdaptiveDemuxStream *
1349 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1350 {
1351   GList *iter;
1352
1353   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1354     GstAdaptiveDemuxStream *stream = iter->data;
1355     if (stream->pad == pad) {
1356       return stream;
1357     }
1358   }
1359
1360   return NULL;
1361 }
1362
1363 /* must be called with manifest_lock taken.
1364  * It will temporarily drop the manifest_lock in order to join the task.
1365  * It will join only the old_streams (the demux->streams are joined by
1366  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1367  * called)
1368  */
1369 static void
1370 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1371 {
1372   GstAdaptiveDemux *demux = stream->demux;
1373   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1374
1375   if (klass->stream_free)
1376     klass->stream_free (stream);
1377
1378   g_clear_error (&stream->last_error);
1379   if (stream->download_task) {
1380     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1381       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1382           GST_DEBUG_PAD_NAME (stream->pad));
1383
1384       gst_task_stop (stream->download_task);
1385
1386       g_mutex_lock (&stream->fragment_download_lock);
1387       stream->cancelled = TRUE;
1388       g_cond_signal (&stream->fragment_download_cond);
1389       g_mutex_unlock (&stream->fragment_download_lock);
1390     }
1391     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1392
1393     /* temporarily drop the manifest lock to join the task */
1394     GST_MANIFEST_UNLOCK (demux);
1395
1396     gst_task_join (stream->download_task);
1397
1398     GST_MANIFEST_LOCK (demux);
1399
1400     GST_LOG_OBJECT (demux, "Finished");
1401     gst_object_unref (stream->download_task);
1402     g_rec_mutex_clear (&stream->download_lock);
1403     stream->download_task = NULL;
1404   }
1405
1406   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1407
1408   if (stream->pending_segment) {
1409     gst_event_unref (stream->pending_segment);
1410     stream->pending_segment = NULL;
1411   }
1412
1413   if (stream->pending_events) {
1414     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1415     stream->pending_events = NULL;
1416   }
1417
1418   if (stream->internal_pad) {
1419     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1420   }
1421
1422   if (stream->src_srcpad) {
1423     gst_object_unref (stream->src_srcpad);
1424     stream->src_srcpad = NULL;
1425   }
1426
1427   if (stream->src) {
1428     GstElement *src = stream->src;
1429
1430     stream->src = NULL;
1431
1432     GST_MANIFEST_UNLOCK (demux);
1433     gst_element_set_locked_state (src, TRUE);
1434     gst_element_set_state (src, GST_STATE_NULL);
1435     gst_bin_remove (GST_BIN_CAST (demux), src);
1436     GST_MANIFEST_LOCK (demux);
1437   }
1438
1439   g_cond_clear (&stream->fragment_download_cond);
1440   g_mutex_clear (&stream->fragment_download_lock);
1441   g_free (stream->fragment_bitrates);
1442
1443   if (stream->pad) {
1444     gst_object_unref (stream->pad);
1445     stream->pad = NULL;
1446   }
1447   if (stream->pending_caps)
1448     gst_caps_unref (stream->pending_caps);
1449
1450   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1451
1452   g_free (stream);
1453 }
1454
1455 /* must be called with manifest_lock taken */
1456 static gboolean
1457 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1458     gint64 * range_start, gint64 * range_stop)
1459 {
1460   GstAdaptiveDemuxClass *klass;
1461
1462   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1463
1464   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1465
1466   return klass->get_live_seek_range (demux, range_start, range_stop);
1467 }
1468
1469 /* must be called with manifest_lock taken */
1470 static gboolean
1471 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1472     GstAdaptiveDemuxStream * stream)
1473 {
1474   gint64 range_start, range_stop;
1475   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1476     GST_LOG_OBJECT (stream->pad,
1477         "stream position %" GST_TIME_FORMAT "  live seek range %"
1478         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1479         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1480         GST_STIME_ARGS (range_stop));
1481     return (stream->segment.position >= range_start
1482         && stream->segment.position <= range_stop);
1483   }
1484
1485   return FALSE;
1486 }
1487
1488 /* must be called with manifest_lock taken */
1489 static gboolean
1490 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1491 {
1492   GstAdaptiveDemuxClass *klass;
1493
1494   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1495   if (gst_adaptive_demux_is_live (demux)) {
1496     return klass->get_live_seek_range != NULL;
1497   }
1498
1499   return klass->seek != NULL;
1500 }
1501
1502 static void
1503 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1504     GList * streams, gint64 period_start, GstSeekType start_type,
1505     GstSeekType stop_type)
1506 {
1507   GList *iter;
1508   for (iter = streams; iter; iter = g_list_next (iter)) {
1509     GstAdaptiveDemuxStream *stream = iter->data;
1510     GstEvent *seg_evt;
1511     GstClockTime offset;
1512
1513     /* See comments in gst_adaptive_demux_get_period_start_time() for
1514      * an explanation of the segment modifications */
1515     stream->segment = demux->segment;
1516     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1517     stream->segment.start += offset - period_start;
1518     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1519       stream->segment.stop += offset - period_start;
1520     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1521       stream->segment.position = stream->segment.start;
1522     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1523       stream->segment.position = stream->segment.stop;
1524     seg_evt = gst_event_new_segment (&stream->segment);
1525     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1526     gst_event_replace (&stream->pending_segment, seg_evt);
1527     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1528         stream->pending_segment);
1529     gst_event_unref (seg_evt);
1530     /* Make sure the first buffer after a seek has the discont flag */
1531     stream->discont = TRUE;
1532   }
1533   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1534 }
1535
1536 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1537                               GST_SEEK_FLAG_SNAP_AFTER |          \
1538                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1539                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1540                               GST_SEEK_FLAG_KEY_UNIT))
1541 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1542                               GST_SEEK_FLAG_SNAP_AFTER | \
1543                               GST_SEEK_FLAG_SNAP_NEAREST))
1544
1545 static gboolean
1546 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1547     GstEvent * event)
1548 {
1549   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1550   gdouble rate;
1551   GstFormat format;
1552   GstSeekFlags flags;
1553   GstSeekType start_type, stop_type;
1554   gint64 start, stop;
1555   guint32 seqnum;
1556   gboolean update;
1557   gboolean ret;
1558   GstSegment oldsegment;
1559   GstAdaptiveDemuxStream *stream = NULL;
1560
1561   GST_INFO_OBJECT (demux, "Received seek event");
1562
1563   GST_API_LOCK (demux);
1564   GST_MANIFEST_LOCK (demux);
1565
1566   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1567       &stop_type, &stop);
1568
1569   if (format != GST_FORMAT_TIME) {
1570     GST_MANIFEST_UNLOCK (demux);
1571     GST_API_UNLOCK (demux);
1572     GST_WARNING_OBJECT (demux,
1573         "Adaptive demuxers only support TIME-based seeking");
1574     gst_event_unref (event);
1575     return FALSE;
1576   }
1577
1578   if (flags & GST_SEEK_FLAG_SEGMENT) {
1579     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1580     GST_MANIFEST_UNLOCK (demux);
1581     GST_API_UNLOCK (demux);
1582     gst_event_unref (event);
1583     return FALSE;
1584   }
1585
1586   seqnum = gst_event_get_seqnum (event);
1587
1588   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1589     /* For instant rate seeks, reply directly and update
1590      * our segment so the new rate is reflected in any future
1591      * fragments */
1592     GstEvent *ev;
1593
1594     /* instant rate change only supported if direction does not change. All
1595      * other requirements are already checked before creating the seek event
1596      * but let's double-check here to be sure */
1597     if ((demux->segment.rate > 0 && rate < 0) ||
1598         (demux->segment.rate < 0 && rate > 0) ||
1599         start_type != GST_SEEK_TYPE_NONE ||
1600         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1601       GST_ERROR_OBJECT (demux,
1602           "Instant rate change seeks only supported in the "
1603           "same direction, without flushing and position change");
1604       GST_MANIFEST_UNLOCK (demux);
1605       GST_API_UNLOCK (demux);
1606       return FALSE;
1607     }
1608
1609     ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1610         (GstSegmentFlags) flags);
1611     gst_event_set_seqnum (ev, seqnum);
1612
1613     GST_MANIFEST_UNLOCK (demux);
1614
1615     ret = gst_adaptive_demux_push_src_event (demux, ev);
1616
1617     GST_API_UNLOCK (demux);
1618     gst_event_unref (event);
1619
1620     return ret;
1621   }
1622
1623   if (!gst_adaptive_demux_can_seek (demux)) {
1624     GST_MANIFEST_UNLOCK (demux);
1625     GST_API_UNLOCK (demux);
1626     gst_event_unref (event);
1627     return FALSE;
1628   }
1629
1630   if (gst_adaptive_demux_is_live (demux)) {
1631     gint64 range_start, range_stop;
1632     gboolean changed = FALSE;
1633     gboolean start_valid = TRUE, stop_valid = TRUE;
1634
1635     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1636             &range_stop)) {
1637       GST_MANIFEST_UNLOCK (demux);
1638       GST_API_UNLOCK (demux);
1639       gst_event_unref (event);
1640       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1641       return FALSE;
1642     }
1643
1644     GST_DEBUG_OBJECT (demux,
1645         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1646         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1647
1648     /* Handle relative positioning for live streams (relative to the range_stop) */
1649     if (start_type == GST_SEEK_TYPE_END) {
1650       start = range_stop + start;
1651       start_type = GST_SEEK_TYPE_SET;
1652       changed = TRUE;
1653     }
1654     if (stop_type == GST_SEEK_TYPE_END) {
1655       stop = range_stop + stop;
1656       stop_type = GST_SEEK_TYPE_SET;
1657       changed = TRUE;
1658     }
1659
1660     /* Adjust the requested start/stop position if it falls beyond the live
1661      * seek range.
1662      * The only case where we don't adjust is for the starting point of
1663      * an accurate seek (start if forward and stop if backwards)
1664      */
1665     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1666         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1667       GST_DEBUG_OBJECT (demux,
1668           "seek before live stream start, setting to range start: %"
1669           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1670       start = range_start;
1671       changed = TRUE;
1672     }
1673     /* truncate stop position also if set */
1674     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1675         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1676       GST_DEBUG_OBJECT (demux,
1677           "seek ending after live start, adjusting to: %"
1678           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1679       stop = range_stop;
1680       changed = TRUE;
1681     }
1682
1683     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1684         (start < range_start || start > range_stop)) {
1685       GST_WARNING_OBJECT (demux,
1686           "Seek to invalid position start:%" GST_STIME_FORMAT
1687           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1688           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1689           GST_STIME_ARGS (range_stop));
1690       start_valid = FALSE;
1691     }
1692     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1693         (stop < range_start || stop > range_stop)) {
1694       GST_WARNING_OBJECT (demux,
1695           "Seek to invalid position stop:%" GST_STIME_FORMAT
1696           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1697           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1698           GST_STIME_ARGS (range_stop));
1699       stop_valid = FALSE;
1700     }
1701
1702     /* If the seek position is still outside of the seekable range, refuse the seek */
1703     if (!start_valid || !stop_valid) {
1704       GST_MANIFEST_UNLOCK (demux);
1705       GST_API_UNLOCK (demux);
1706       gst_event_unref (event);
1707       return FALSE;
1708     }
1709
1710     /* Re-create seek event with changed/updated values */
1711     if (changed) {
1712       gst_event_unref (event);
1713       event =
1714           gst_event_new_seek (rate, format, flags,
1715           start_type, start, stop_type, stop);
1716       gst_event_set_seqnum (event, seqnum);
1717     }
1718   }
1719
1720   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1721
1722   /* have a backup in case seek fails */
1723   gst_segment_copy_into (&demux->segment, &oldsegment);
1724
1725   if (flags & GST_SEEK_FLAG_FLUSH) {
1726     GstEvent *fevent;
1727
1728     GST_DEBUG_OBJECT (demux, "sending flush start");
1729     fevent = gst_event_new_flush_start ();
1730     gst_event_set_seqnum (fevent, seqnum);
1731     GST_MANIFEST_UNLOCK (demux);
1732     gst_adaptive_demux_push_src_event (demux, fevent);
1733     GST_MANIFEST_LOCK (demux);
1734
1735     gst_adaptive_demux_stop_tasks (demux, FALSE);
1736   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1737       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1738
1739     gst_adaptive_demux_stop_tasks (demux, FALSE);
1740   }
1741
1742   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1743
1744   /*
1745    * Handle snap seeks as follows:
1746    * 1) do the snap seeking on the stream that received
1747    *    the event
1748    * 2) use the final position on this stream to seek
1749    *    on the other streams to the same position
1750    *
1751    * We can't snap at all streams at the same time as
1752    * they might end in different positions, so just
1753    * use the one that received the event as the 'leading'
1754    * one to do the snap seek.
1755    */
1756   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1757           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1758     GstClockTime ts;
1759     GstSeekFlags stream_seek_flags = flags;
1760
1761     /* snap-seek on the stream that received the event and then
1762      * use the resulting position to seek on all streams */
1763
1764     if (rate >= 0) {
1765       if (start_type != GST_SEEK_TYPE_NONE)
1766         ts = start;
1767       else {
1768         ts = stream->segment.position;
1769         start_type = GST_SEEK_TYPE_SET;
1770       }
1771     } else {
1772       if (stop_type != GST_SEEK_TYPE_NONE)
1773         ts = stop;
1774       else {
1775         stop_type = GST_SEEK_TYPE_SET;
1776         ts = stream->segment.position;
1777       }
1778     }
1779
1780     if (stream) {
1781       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1782     }
1783
1784     /* replace event with a new one without snapping to seek on all streams */
1785     gst_event_unref (event);
1786     if (rate >= 0) {
1787       start = ts;
1788     } else {
1789       stop = ts;
1790     }
1791     event =
1792         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1793         start_type, start, stop_type, stop);
1794     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1795   }
1796   stream = NULL;
1797
1798   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1799       start, stop_type, stop, &update);
1800
1801   if (ret) {
1802     /* FIXME - this seems unatural, do_seek() is updating base when we
1803      * only want the start/stop position to change, maybe do_seek() needs
1804      * some fixing? */
1805     if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1806                 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1807                 && stop_type == GST_SEEK_TYPE_NONE))) {
1808       demux->segment.base = oldsegment.base;
1809     }
1810
1811     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1812
1813     ret = demux_class->seek (demux, event);
1814   }
1815
1816   if (!ret) {
1817     /* Is there anything else we can do if it fails? */
1818     gst_segment_copy_into (&oldsegment, &demux->segment);
1819   } else {
1820     demux->priv->segment_seqnum = seqnum;
1821   }
1822   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1823
1824   if (flags & GST_SEEK_FLAG_FLUSH) {
1825     GstEvent *fevent;
1826
1827     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1828     fevent = gst_event_new_flush_stop (TRUE);
1829     gst_event_set_seqnum (fevent, seqnum);
1830     gst_adaptive_demux_push_src_event (demux, fevent);
1831   }
1832
1833   if (demux->next_streams) {
1834     /* If the seek generated new streams, get them
1835      * to preroll */
1836     gst_adaptive_demux_prepare_streams (demux, FALSE);
1837     gst_adaptive_demux_start_tasks (demux, TRUE);
1838   } else {
1839     GstClockTime period_start =
1840         gst_adaptive_demux_get_period_start_time (demux);
1841
1842     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1843     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1844         period_start, start_type, stop_type);
1845     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1846         period_start, start_type, stop_type);
1847     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1848
1849     /* Restart the demux */
1850     gst_adaptive_demux_start_tasks (demux, FALSE);
1851   }
1852
1853   GST_MANIFEST_UNLOCK (demux);
1854   GST_API_UNLOCK (demux);
1855   gst_event_unref (event);
1856
1857   return ret;
1858 }
1859
1860 static gboolean
1861 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1862     GstEvent * event)
1863 {
1864   GstAdaptiveDemux *demux;
1865
1866   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1867
1868   /* FIXME handle events received on pads that are to be removed */
1869
1870   switch (event->type) {
1871     case GST_EVENT_SEEK:
1872     {
1873       guint32 seqnum = gst_event_get_seqnum (event);
1874       if (seqnum == demux->priv->segment_seqnum) {
1875         GST_LOG_OBJECT (pad,
1876             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1877         gst_event_unref (event);
1878         return TRUE;
1879       }
1880       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1881     }
1882     case GST_EVENT_RECONFIGURE:{
1883       GstAdaptiveDemuxStream *stream;
1884
1885       GST_MANIFEST_LOCK (demux);
1886       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1887
1888       if (stream) {
1889         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1890             stream->last_ret == GST_FLOW_NOT_LINKED) {
1891           stream->last_ret = GST_FLOW_OK;
1892           stream->restart_download = TRUE;
1893           stream->need_header = TRUE;
1894           stream->discont = TRUE;
1895           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1896           gst_task_start (stream->download_task);
1897         }
1898         gst_event_unref (event);
1899         GST_MANIFEST_UNLOCK (demux);
1900         return TRUE;
1901       }
1902       GST_MANIFEST_UNLOCK (demux);
1903     }
1904       break;
1905     case GST_EVENT_LATENCY:{
1906       /* Upstream and our internal source are irrelevant
1907        * for latency, and we should not fail here to
1908        * configure the latency */
1909       gst_event_unref (event);
1910       return TRUE;
1911     }
1912     case GST_EVENT_QOS:{
1913       GstClockTimeDiff diff;
1914       GstClockTime timestamp;
1915       GstClockTime earliest_time;
1916
1917       gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
1918       /* Only take into account lateness if late */
1919       if (diff > 0)
1920         earliest_time = timestamp + 2 * diff;
1921       else
1922         earliest_time = timestamp;
1923
1924       GST_OBJECT_LOCK (demux);
1925       if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
1926           earliest_time > demux->priv->qos_earliest_time) {
1927         demux->priv->qos_earliest_time = earliest_time;
1928         GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
1929             GST_TIME_ARGS (demux->priv->qos_earliest_time));
1930       }
1931       GST_OBJECT_UNLOCK (demux);
1932       break;
1933     }
1934     default:
1935       break;
1936   }
1937
1938   return gst_pad_event_default (pad, parent, event);
1939 }
1940
1941 static gboolean
1942 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1943     GstQuery * query)
1944 {
1945   GstAdaptiveDemux *demux;
1946   GstAdaptiveDemuxClass *demux_class;
1947   gboolean ret = FALSE;
1948
1949   if (query == NULL)
1950     return FALSE;
1951
1952   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1953   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1954
1955   switch (query->type) {
1956     case GST_QUERY_DURATION:{
1957       GstClockTime duration = -1;
1958       GstFormat fmt;
1959
1960       gst_query_parse_duration (query, &fmt, NULL);
1961
1962       if (gst_adaptive_demux_is_live (demux)) {
1963         /* We are able to answer this query: the duration is unknown */
1964         gst_query_set_duration (query, fmt, -1);
1965         ret = TRUE;
1966         break;
1967       }
1968
1969       if (fmt == GST_FORMAT_TIME
1970           && g_atomic_int_get (&demux->priv->have_manifest)) {
1971         duration = demux_class->get_duration (demux);
1972
1973         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1974           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1975           ret = TRUE;
1976         }
1977       }
1978
1979       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1980           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1981       break;
1982     }
1983     case GST_QUERY_LATENCY:{
1984       gst_query_set_latency (query, FALSE, 0, -1);
1985       ret = TRUE;
1986       break;
1987     }
1988     case GST_QUERY_SEEKING:{
1989       GstFormat fmt;
1990       gint64 stop = -1;
1991       gint64 start = 0;
1992
1993       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1994         GST_INFO_OBJECT (demux,
1995             "Don't have manifest yet, can't answer seeking query");
1996         return FALSE;           /* can't answer without manifest */
1997       }
1998
1999       GST_MANIFEST_LOCK (demux);
2000
2001       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2002       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2003       if (fmt == GST_FORMAT_TIME) {
2004         GstClockTime duration;
2005         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2006
2007         ret = TRUE;
2008         if (can_seek) {
2009           if (gst_adaptive_demux_is_live (demux)) {
2010             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2011             if (!ret) {
2012               GST_MANIFEST_UNLOCK (demux);
2013               GST_INFO_OBJECT (demux, "can't answer seeking query");
2014               return FALSE;
2015             }
2016           } else {
2017             duration = demux_class->get_duration (demux);
2018             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2019               stop = duration;
2020           }
2021         }
2022         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2023         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2024             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2025             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2026       }
2027       GST_MANIFEST_UNLOCK (demux);
2028       break;
2029     }
2030     case GST_QUERY_URI:
2031
2032       GST_MANIFEST_LOCK (demux);
2033
2034       /* TODO HLS can answer this differently it seems */
2035       if (demux->manifest_uri) {
2036         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2037          * playlist or the the uri of the last downlowaded fragment? */
2038         gst_query_set_uri (query, demux->manifest_uri);
2039         ret = TRUE;
2040       }
2041
2042       GST_MANIFEST_UNLOCK (demux);
2043       break;
2044     default:
2045       /* Don't forward queries upstream because of the special nature of this
2046        *  "demuxer", which relies on the upstream element only to be fed
2047        *  the Manifest
2048        */
2049       break;
2050   }
2051
2052   return ret;
2053 }
2054
2055 /* must be called with manifest_lock taken */
2056 static void
2057 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2058     gboolean start_preroll_streams)
2059 {
2060   GList *iter;
2061
2062   if (!gst_adaptive_demux_is_running (demux)) {
2063     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2064     return;
2065   }
2066
2067   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2068
2069   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2070
2071   for (; iter; iter = g_list_next (iter)) {
2072     GstAdaptiveDemuxStream *stream = iter->data;
2073
2074     if (!start_preroll_streams) {
2075       g_mutex_lock (&stream->fragment_download_lock);
2076       stream->cancelled = FALSE;
2077       stream->replaced = FALSE;
2078       g_mutex_unlock (&stream->fragment_download_lock);
2079     }
2080
2081     stream->last_ret = GST_FLOW_OK;
2082     gst_task_start (stream->download_task);
2083   }
2084 }
2085
2086 /* must be called with manifest_lock taken */
2087 static void
2088 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2089 {
2090   gst_uri_downloader_cancel (demux->downloader);
2091
2092   gst_task_stop (demux->priv->updates_task);
2093
2094   g_mutex_lock (&demux->priv->updates_timed_lock);
2095   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2096   demux->priv->stop_updates_task = TRUE;
2097   g_cond_signal (&demux->priv->updates_timed_cond);
2098   g_mutex_unlock (&demux->priv->updates_timed_lock);
2099 }
2100
2101 /* must be called with manifest_lock taken */
2102 static void
2103 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2104 {
2105   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2106
2107   if (gst_adaptive_demux_is_live (demux)) {
2108     gst_uri_downloader_reset (demux->downloader);
2109     g_mutex_lock (&demux->priv->updates_timed_lock);
2110     demux->priv->stop_updates_task = FALSE;
2111     g_mutex_unlock (&demux->priv->updates_timed_lock);
2112     /* Task to periodically update the manifest */
2113     if (demux_class->requires_periodical_playlist_update (demux)) {
2114       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2115       gst_task_start (demux->priv->updates_task);
2116     }
2117   }
2118 }
2119
2120 /* must be called with manifest_lock taken
2121  * This function will temporarily release manifest_lock in order to join the
2122  * download threads.
2123  * The api_lock will still protect it against other threads trying to modify
2124  * the demux element.
2125  */
2126 static void
2127 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2128 {
2129   int i;
2130   GList *iter;
2131   GList *list_to_process;
2132
2133   GST_LOG_OBJECT (demux, "Stopping tasks");
2134
2135   if (stop_updates)
2136     gst_adaptive_demux_stop_manifest_update_task (demux);
2137
2138   list_to_process = demux->streams;
2139   for (i = 0; i < 2; ++i) {
2140     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2141       GstAdaptiveDemuxStream *stream = iter->data;
2142
2143       g_mutex_lock (&stream->fragment_download_lock);
2144       stream->cancelled = TRUE;
2145       gst_task_stop (stream->download_task);
2146       g_cond_signal (&stream->fragment_download_cond);
2147       g_mutex_unlock (&stream->fragment_download_lock);
2148     }
2149     list_to_process = demux->prepared_streams;
2150   }
2151
2152   GST_MANIFEST_UNLOCK (demux);
2153   g_mutex_lock (&demux->priv->preroll_lock);
2154   g_cond_broadcast (&demux->priv->preroll_cond);
2155   g_mutex_unlock (&demux->priv->preroll_lock);
2156   GST_MANIFEST_LOCK (demux);
2157
2158   g_mutex_lock (&demux->priv->manifest_update_lock);
2159   g_cond_broadcast (&demux->priv->manifest_cond);
2160   g_mutex_unlock (&demux->priv->manifest_update_lock);
2161
2162   /* need to release manifest_lock before stopping the src element.
2163    * The streams were asked to cancel, so they will not make any writes to demux
2164    * object. Even if we temporarily release manifest_lock, the demux->streams
2165    * cannot change and iter cannot be invalidated.
2166    */
2167   list_to_process = demux->streams;
2168   for (i = 0; i < 2; ++i) {
2169     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2170       GstAdaptiveDemuxStream *stream = iter->data;
2171       GstElement *src = stream->src;
2172
2173       GST_MANIFEST_UNLOCK (demux);
2174
2175       if (src) {
2176         gst_element_set_locked_state (src, TRUE);
2177         gst_element_set_state (src, GST_STATE_READY);
2178       }
2179
2180       /* stream->download_task value never changes, so it is safe to read it
2181        * outside critical section
2182        */
2183       gst_task_join (stream->download_task);
2184
2185       GST_MANIFEST_LOCK (demux);
2186     }
2187     list_to_process = demux->prepared_streams;
2188   }
2189
2190   GST_MANIFEST_UNLOCK (demux);
2191   if (stop_updates)
2192     gst_task_join (demux->priv->updates_task);
2193
2194   GST_MANIFEST_LOCK (demux);
2195
2196   list_to_process = demux->streams;
2197   for (i = 0; i < 2; ++i) {
2198     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2199       GstAdaptiveDemuxStream *stream = iter->data;
2200
2201       stream->download_error_count = 0;
2202       stream->need_header = TRUE;
2203     }
2204     list_to_process = demux->prepared_streams;
2205   }
2206   demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2207 }
2208
2209 /* must be called with manifest_lock taken */
2210 static gboolean
2211 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2212 {
2213   GList *iter;
2214   gboolean ret = TRUE;
2215
2216   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2217     GstAdaptiveDemuxStream *stream = iter->data;
2218     gst_event_ref (event);
2219     ret = ret & gst_pad_push_event (stream->pad, event);
2220   }
2221   gst_event_unref (event);
2222   return ret;
2223 }
2224
2225 /* must be called with manifest_lock taken */
2226 void
2227 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2228     GstCaps * caps)
2229 {
2230   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2231       caps);
2232   gst_caps_replace (&stream->pending_caps, caps);
2233   gst_caps_unref (caps);
2234 }
2235
2236 /* must be called with manifest_lock taken */
2237 void
2238 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2239     GstTagList * tags)
2240 {
2241   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2242       tags);
2243   if (stream->pending_tags) {
2244     gst_tag_list_unref (stream->pending_tags);
2245   }
2246   stream->pending_tags = tags;
2247 }
2248
2249 /* must be called with manifest_lock taken */
2250 void
2251 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2252     GstEvent * event)
2253 {
2254   stream->pending_events = g_list_append (stream->pending_events, event);
2255 }
2256
2257 /* must be called with manifest_lock taken */
2258 static guint64
2259 _update_average_bitrate (GstAdaptiveDemux * demux,
2260     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2261 {
2262   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2263
2264   stream->moving_bitrate -= stream->fragment_bitrates[index];
2265   stream->fragment_bitrates[index] = new_bitrate;
2266   stream->moving_bitrate += new_bitrate;
2267
2268   stream->moving_index += 1;
2269
2270   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2271     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2272   return stream->moving_bitrate / stream->moving_index;
2273 }
2274
2275 /* must be called with manifest_lock taken */
2276 static guint64
2277 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2278     GstAdaptiveDemuxStream * stream)
2279 {
2280   guint64 average_bitrate;
2281   guint64 fragment_bitrate;
2282
2283   if (demux->connection_speed) {
2284     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2285         demux->connection_speed / 1000);
2286     stream->current_download_rate = demux->connection_speed;
2287     return demux->connection_speed;
2288   }
2289
2290   fragment_bitrate = stream->last_bitrate;
2291   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2292       fragment_bitrate);
2293
2294   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2295
2296   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2297       "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2298   GST_INFO_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2299       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2300       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2301
2302   /* Conservative approach, make sure we don't upgrade too fast */
2303   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2304
2305   stream->current_download_rate *= demux->bitrate_limit;
2306   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2307       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2308
2309 #if 0
2310   /* Debugging code, modulate the bitrate every few fragments */
2311   {
2312     static guint ctr = 0;
2313     if (ctr % 3 == 0) {
2314       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2315       stream->current_download_rate /= 2;
2316     }
2317     ctr++;
2318   }
2319 #endif
2320
2321   return stream->current_download_rate;
2322 }
2323
2324 /* must be called with manifest_lock taken */
2325 static GstFlowReturn
2326 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2327 {
2328   gboolean all_notlinked = TRUE;
2329   gboolean all_eos = TRUE;
2330   GList *iter;
2331
2332   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2333     GstAdaptiveDemuxStream *stream = iter->data;
2334
2335     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2336       all_notlinked = FALSE;
2337       if (stream->last_ret != GST_FLOW_EOS)
2338         all_eos = FALSE;
2339     }
2340
2341     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2342         || stream->last_ret == GST_FLOW_FLUSHING) {
2343       return stream->last_ret;
2344     }
2345   }
2346   if (all_notlinked)
2347     return GST_FLOW_NOT_LINKED;
2348   else if (all_eos)
2349     return GST_FLOW_EOS;
2350   return GST_FLOW_OK;
2351 }
2352
2353 /* Called with preroll_lock */
2354 static void
2355 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2356     GstAdaptiveDemuxStream * stream)
2357 {
2358   demux->priv->preroll_pending--;
2359   if (demux->priv->preroll_pending == 0) {
2360     /* That was the last one, time to release all streams
2361      * and expose them */
2362     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2363     gst_adaptive_demux_expose_streams (demux);
2364     g_cond_broadcast (&demux->priv->preroll_cond);
2365   }
2366 }
2367
2368 /* must be called with manifest_lock taken.
2369  * Temporarily releases manifest_lock
2370  */
2371 GstFlowReturn
2372 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2373     GstBuffer * buffer)
2374 {
2375   GstAdaptiveDemux *demux = stream->demux;
2376   GstFlowReturn ret = GST_FLOW_OK;
2377   gboolean discont = FALSE;
2378   /* Pending events */
2379   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2380   GList *pending_events = NULL;
2381
2382   /* FIXME : 
2383    * This is duplicating *exactly* the same thing as what is done at the beginning
2384    * of _src_chain if starting_fragment is TRUE */
2385   if (stream->first_fragment_buffer) {
2386     GstClockTime offset =
2387         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2388     GstClockTime period_start =
2389         gst_adaptive_demux_get_period_start_time (demux);
2390
2391     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2392     if (demux->segment.rate < 0)
2393       /* Set DISCONT flag for every first buffer in reverse playback mode
2394        * as each fragment for its own has to be reversed */
2395       discont = TRUE;
2396
2397     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2398     if (GST_BUFFER_PTS_IS_VALID (buffer))
2399       GST_BUFFER_PTS (buffer) += offset;
2400
2401     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2402       stream->segment.position = GST_BUFFER_PTS (buffer);
2403
2404       /* Convert from position inside the stream's segment to the demuxer's
2405        * segment, they are not necessarily the same */
2406       if (stream->segment.position - offset + period_start >
2407           demux->segment.position)
2408         demux->segment.position =
2409             stream->segment.position - offset + period_start;
2410     }
2411     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2412
2413     GST_LOG_OBJECT (stream->pad,
2414         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2415         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2416   } else {
2417     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2418   }
2419
2420   if (stream->discont) {
2421     discont = TRUE;
2422     stream->discont = FALSE;
2423   }
2424
2425   if (discont) {
2426     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2427     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2428   } else {
2429     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2430   }
2431
2432   stream->first_fragment_buffer = FALSE;
2433
2434   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2435   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2436   if (G_UNLIKELY (stream->pending_caps)) {
2437     pending_caps = gst_event_new_caps (stream->pending_caps);
2438     gst_caps_unref (stream->pending_caps);
2439     stream->pending_caps = NULL;
2440   }
2441
2442   if (stream->do_block) {
2443
2444     g_mutex_lock (&demux->priv->preroll_lock);
2445
2446     /* If we are preroll state, set caps in here */
2447     if (pending_caps) {
2448       gst_pad_push_event (stream->pad, pending_caps);
2449       pending_caps = NULL;
2450     }
2451
2452     gst_adaptive_demux_handle_preroll (demux, stream);
2453     GST_MANIFEST_UNLOCK (demux);
2454
2455     while (stream->do_block && !stream->cancelled) {
2456       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2457       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2458     }
2459     if (stream->cancelled) {
2460       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2461       gst_buffer_unref (buffer);
2462       g_mutex_unlock (&demux->priv->preroll_lock);
2463       return GST_FLOW_FLUSHING;
2464     }
2465
2466     g_mutex_unlock (&demux->priv->preroll_lock);
2467     GST_MANIFEST_LOCK (demux);
2468   }
2469
2470   if (G_UNLIKELY (stream->pending_segment)) {
2471     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2472     pending_segment = stream->pending_segment;
2473     stream->pending_segment = NULL;
2474     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2475   }
2476   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2477     GstTagList *tags = stream->pending_tags;
2478
2479     stream->pending_tags = NULL;
2480     stream->bitrate_changed = 0;
2481
2482     if (stream->fragment.bitrate != 0) {
2483       if (tags)
2484         tags = gst_tag_list_make_writable (tags);
2485       else
2486         tags = gst_tag_list_new_empty ();
2487
2488       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2489           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2490     }
2491     if (tags)
2492       pending_tags = gst_event_new_tag (tags);
2493   }
2494   if (G_UNLIKELY (stream->pending_events)) {
2495     pending_events = stream->pending_events;
2496     stream->pending_events = NULL;
2497   }
2498
2499   GST_MANIFEST_UNLOCK (demux);
2500
2501   /* Do not push events or buffers holding the manifest lock */
2502   if (G_UNLIKELY (pending_caps)) {
2503     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2504         pending_caps);
2505     gst_pad_push_event (stream->pad, pending_caps);
2506   }
2507   if (G_UNLIKELY (pending_segment)) {
2508     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2509         pending_segment);
2510     gst_pad_push_event (stream->pad, pending_segment);
2511   }
2512   if (G_UNLIKELY (pending_tags)) {
2513     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2514         pending_tags);
2515     gst_pad_push_event (stream->pad, pending_tags);
2516   }
2517   while (pending_events != NULL) {
2518     GstEvent *event = pending_events->data;
2519
2520     if (!gst_pad_push_event (stream->pad, event))
2521       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2522
2523     pending_events = g_list_delete_link (pending_events, pending_events);
2524   }
2525
2526   /* Wait for preroll if blocking */
2527   GST_DEBUG_OBJECT (stream->pad,
2528       "About to push buffer of size %" G_GSIZE_FORMAT,
2529       gst_buffer_get_size (buffer));
2530
2531   ret = gst_pad_push (stream->pad, buffer);
2532
2533   GST_MANIFEST_LOCK (demux);
2534
2535   g_mutex_lock (&stream->fragment_download_lock);
2536   if (G_UNLIKELY (stream->cancelled)) {
2537     GST_LOG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
2538         "Stream was cancelled");
2539     ret = stream->last_ret = GST_FLOW_FLUSHING;
2540     g_mutex_unlock (&stream->fragment_download_lock);
2541     return ret;
2542   }
2543   g_mutex_unlock (&stream->fragment_download_lock);
2544
2545   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2546       gst_flow_get_name (ret));
2547
2548   return ret;
2549 }
2550
2551 /* must be called with manifest_lock taken */
2552 static GstFlowReturn
2553 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2554     GstAdaptiveDemuxStream * stream)
2555 {
2556   /* No need to advance, this isn't a real fragment */
2557   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2558     return GST_FLOW_OK;
2559
2560   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2561       stream->fragment.duration);
2562 }
2563
2564 /* must be called with manifest_lock taken.
2565  * Can temporarily release manifest_lock
2566  */
2567 static GstFlowReturn
2568 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2569     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2570 {
2571   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2572 }
2573
2574 static gboolean
2575 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2576     * demux)
2577 {
2578   return TRUE;
2579 }
2580
2581 static GstFlowReturn
2582 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2583 {
2584   GstAdaptiveDemuxStream *stream;
2585   GstAdaptiveDemux *demux;
2586   GstAdaptiveDemuxClass *klass;
2587   GstFlowReturn ret = GST_FLOW_OK;
2588
2589   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2590   stream = gst_pad_get_element_private (pad);
2591   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2592
2593   GST_MANIFEST_LOCK (demux);
2594
2595   /* do not make any changes if the stream is cancelled */
2596   g_mutex_lock (&stream->fragment_download_lock);
2597   if (G_UNLIKELY (stream->cancelled)) {
2598     g_mutex_unlock (&stream->fragment_download_lock);
2599     gst_buffer_unref (buffer);
2600     ret = stream->last_ret = GST_FLOW_FLUSHING;
2601     GST_MANIFEST_UNLOCK (demux);
2602     return ret;
2603   }
2604   g_mutex_unlock (&stream->fragment_download_lock);
2605
2606   /* starting_fragment is set to TRUE at the beginning of
2607    * _stream_download_fragment()
2608    * /!\ If there is a header/index being downloaded, then this will
2609    * be TRUE for the first one ... but FALSE for the remaining ones,
2610    * including the *actual* fragment ! */
2611   if (stream->starting_fragment) {
2612     GstClockTime offset =
2613         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2614     GstClockTime period_start =
2615         gst_adaptive_demux_get_period_start_time (demux);
2616
2617     stream->starting_fragment = FALSE;
2618     if (klass->start_fragment) {
2619       if (!klass->start_fragment (demux, stream)) {
2620         ret = GST_FLOW_ERROR;
2621         goto error;
2622       }
2623     }
2624
2625     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2626     if (GST_BUFFER_PTS_IS_VALID (buffer))
2627       GST_BUFFER_PTS (buffer) += offset;
2628
2629     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2630         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2631
2632     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2633       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2634       stream->segment.position = GST_BUFFER_PTS (buffer);
2635
2636       /* Convert from position inside the stream's segment to the demuxer's
2637        * segment, they are not necessarily the same */
2638       if (stream->segment.position - offset + period_start >
2639           demux->segment.position)
2640         demux->segment.position =
2641             stream->segment.position - offset + period_start;
2642       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2643     }
2644
2645   } else {
2646     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2647   }
2648
2649   /* downloading_first_buffer is set to TRUE in download_uri() just before
2650    * activating the source (i.e. requesting a given URI)
2651    *
2652    * The difference with starting_fragment is that this will be called
2653    * for *all* first buffers (of index, and header, and fragment)
2654    *
2655    * ... to then only do something useful (in this block) for actual
2656    * fragments... */
2657   if (stream->downloading_first_buffer) {
2658     gint64 chunk_size = 0;
2659
2660     stream->downloading_first_buffer = FALSE;
2661
2662     if (!stream->downloading_header && !stream->downloading_index) {
2663       /* If this is the first buffer of a fragment (not the headers or index)
2664        * and we don't have a birate from the sub-class, then see if we
2665        * can work it out from the fragment size and duration */
2666       if (stream->fragment.bitrate == 0 &&
2667           stream->fragment.duration != 0 &&
2668           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2669               &chunk_size) && chunk_size != -1) {
2670         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2671                 8 * GST_SECOND, stream->fragment.duration));
2672         GST_LOG_OBJECT (demux,
2673             "Fragment has size %" G_GINT64_FORMAT " duration %" GST_TIME_FORMAT
2674             " = bitrate %u", chunk_size,
2675             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2676         stream->fragment.bitrate = bitrate;
2677       }
2678       if (stream->fragment.bitrate) {
2679         stream->bitrate_changed = TRUE;
2680       } else {
2681         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2682       }
2683     }
2684   }
2685
2686   stream->download_total_bytes += gst_buffer_get_size (buffer);
2687
2688   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2689       gst_buffer_get_size (buffer));
2690
2691   ret = klass->data_received (demux, stream, buffer);
2692
2693   if (ret == GST_FLOW_FLUSHING) {
2694     /* do not make any changes if the stream is cancelled */
2695     g_mutex_lock (&stream->fragment_download_lock);
2696     if (G_UNLIKELY (stream->cancelled)) {
2697       g_mutex_unlock (&stream->fragment_download_lock);
2698       GST_MANIFEST_UNLOCK (demux);
2699       return ret;
2700     }
2701     g_mutex_unlock (&stream->fragment_download_lock);
2702   }
2703
2704   if (ret != GST_FLOW_OK) {
2705     gboolean finished = FALSE;
2706
2707     if (ret < GST_FLOW_EOS) {
2708       GST_ELEMENT_FLOW_ERROR (demux, ret);
2709
2710       /* TODO push this on all pads */
2711       gst_pad_push_event (stream->pad, gst_event_new_eos ());
2712     } else {
2713       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2714           gst_flow_get_name (ret));
2715     }
2716
2717     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2718       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
2719     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2720       /* Behaves like an EOS event from upstream */
2721       stream->fragment.finished = TRUE;
2722       ret = klass->finish_fragment (demux, stream);
2723       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2724         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
2725       } else if (ret != GST_FLOW_OK) {
2726         goto error;
2727       }
2728       finished = TRUE;
2729     }
2730
2731     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2732     if (finished)
2733       ret = GST_FLOW_EOS;
2734   }
2735
2736 error:
2737
2738   GST_MANIFEST_UNLOCK (demux);
2739
2740   return ret;
2741 }
2742
2743 /* must be called with manifest_lock taken */
2744 static void
2745 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2746     stream, GstFlowReturn ret, GError * err)
2747 {
2748   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2749       gst_flow_get_name (ret), err);
2750
2751   /* if we have an error, only replace last_ret if it was OK before to avoid
2752    * overwriting the first error we got */
2753   if (stream->last_ret == GST_FLOW_OK) {
2754     stream->last_ret = ret;
2755     if (err) {
2756       g_clear_error (&stream->last_error);
2757       stream->last_error = g_error_copy (err);
2758     }
2759   }
2760   g_mutex_lock (&stream->fragment_download_lock);
2761   stream->download_finished = TRUE;
2762   g_cond_signal (&stream->fragment_download_cond);
2763   g_mutex_unlock (&stream->fragment_download_lock);
2764 }
2765
2766 static GstFlowReturn
2767 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2768 {
2769   GstFlowReturn ret = GST_FLOW_OK;
2770   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2771
2772   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2773       || !klass->need_another_chunk (stream)
2774       || stream->fragment.chunk_size == 0) {
2775     stream->fragment.finished = TRUE;
2776
2777     /* Last chance to figure out a fallback nominal bitrate if neither baseclass
2778        nor the HTTP Content-Length implementation worked. */
2779     if (stream->fragment.bitrate == 0 && stream->fragment.duration != 0 &&
2780         stream->fragment_bytes_downloaded != 0 && !stream->downloading_index &&
2781         !stream->downloading_header) {
2782       guint bitrate = MIN (G_MAXUINT,
2783           gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2784               8 * GST_SECOND, stream->fragment.duration));
2785       GST_LOG_OBJECT (stream->pad,
2786           "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2787           " = bitrate %u", stream->fragment_bytes_downloaded,
2788           GST_TIME_ARGS (stream->fragment.duration), bitrate);
2789       stream->fragment.bitrate = bitrate;
2790       stream->bitrate_changed = TRUE;
2791     }
2792     ret = klass->finish_fragment (stream->demux, stream);
2793   }
2794   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2795
2796   return ret;
2797 }
2798
2799 static gboolean
2800 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2801 {
2802   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2803   GstAdaptiveDemux *demux = stream->demux;
2804
2805   switch (GST_EVENT_TYPE (event)) {
2806     case GST_EVENT_EOS:{
2807       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2808       GST_MANIFEST_LOCK (demux);
2809
2810       gst_adaptive_demux_eos_handling (stream);
2811
2812       /* FIXME ?
2813        * _eos_handling() calls  fragment_download_finish() which does the
2814        * same thing as below.
2815        * Could this cause races ? */
2816       g_mutex_lock (&stream->fragment_download_lock);
2817       stream->download_finished = TRUE;
2818       g_cond_signal (&stream->fragment_download_cond);
2819       g_mutex_unlock (&stream->fragment_download_lock);
2820
2821       GST_MANIFEST_UNLOCK (demux);
2822       break;
2823     }
2824     default:
2825       break;
2826   }
2827
2828   gst_event_unref (event);
2829
2830   return TRUE;
2831 }
2832
2833 static gboolean
2834 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2835 {
2836   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2837
2838   switch (GST_QUERY_TYPE (query)) {
2839     case GST_QUERY_ALLOCATION:
2840       return FALSE;
2841       break;
2842     default:
2843       break;
2844   }
2845
2846   return gst_pad_peer_query (stream->pad, query);
2847 }
2848
2849 static GstPadProbeReturn
2850 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2851     GstAdaptiveDemuxStream * stream)
2852 {
2853   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2854
2855   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2856     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2857     if (stream->fragment_bytes_downloaded == 0) {
2858       stream->last_latency =
2859           gst_adaptive_demux_get_monotonic_time (stream->demux) -
2860           (stream->download_start_time * GST_USECOND);
2861       GST_DEBUG_OBJECT (pad,
2862           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2863           GST_TIME_ARGS (stream->last_latency));
2864     }
2865     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2866     GST_LOG_OBJECT (pad,
2867         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2868         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2869   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2870       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2871     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2872     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2873         GST_EVENT_TYPE_NAME (ev), ev);
2874     switch (GST_EVENT_TYPE (ev)) {
2875       case GST_EVENT_SEGMENT:
2876         stream->fragment_bytes_downloaded = 0;
2877         break;
2878       case GST_EVENT_EOS:
2879       {
2880         stream->last_download_time =
2881             gst_adaptive_demux_get_monotonic_time (stream->demux) -
2882             (stream->download_start_time * GST_USECOND);
2883         stream->last_bitrate =
2884             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2885             8 * GST_SECOND, stream->last_download_time);
2886         GST_DEBUG_OBJECT (pad,
2887             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2888             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2889             stream->last_bitrate);
2890         /* Calculate bitrate since URI request */
2891       }
2892         break;
2893       default:
2894         break;
2895     }
2896   }
2897
2898   return ret;
2899 }
2900
2901 /* must be called with manifest_lock taken.
2902  * Can temporarily release manifest_lock
2903  */
2904 static gboolean
2905 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2906     GstAdaptiveDemuxStream * stream)
2907 {
2908   gboolean ret = TRUE;
2909
2910   /* Wait until we're cancelled or there's something for
2911    * us to download in the playlist or the playlist
2912    * became non-live */
2913   while (TRUE) {
2914     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2915
2916     /* get the manifest_update_lock while still holding the manifest_lock.
2917      * This will prevent other threads to signal the condition (they will need
2918      * both manifest_lock and manifest_update_lock in order to signal).
2919      * It cannot deadlock because all threads always get the manifest_lock first
2920      * and manifest_update_lock second.
2921      */
2922     g_mutex_lock (&demux->priv->manifest_update_lock);
2923
2924     GST_MANIFEST_UNLOCK (demux);
2925
2926     g_cond_wait (&demux->priv->manifest_cond,
2927         &demux->priv->manifest_update_lock);
2928     g_mutex_unlock (&demux->priv->manifest_update_lock);
2929
2930     GST_MANIFEST_LOCK (demux);
2931
2932     /* check for cancelled every time we get the manifest_lock */
2933     g_mutex_lock (&stream->fragment_download_lock);
2934     if (G_UNLIKELY (stream->cancelled)) {
2935       ret = FALSE;
2936       stream->last_ret = GST_FLOW_FLUSHING;
2937       g_mutex_unlock (&stream->fragment_download_lock);
2938       break;
2939     }
2940     g_mutex_unlock (&stream->fragment_download_lock);
2941
2942     /* Got a new fragment or not live anymore? */
2943     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2944         GST_FLOW_OK) {
2945       GST_DEBUG_OBJECT (demux, "new fragment available, "
2946           "not waiting for manifest update");
2947       ret = TRUE;
2948       break;
2949     }
2950
2951     if (!gst_adaptive_demux_is_live (demux)) {
2952       GST_DEBUG_OBJECT (demux, "Not live anymore, "
2953           "not waiting for manifest update");
2954       ret = FALSE;
2955       break;
2956     }
2957   }
2958   GST_DEBUG_OBJECT (demux, "Retrying now");
2959   return ret;
2960 }
2961
2962 /* must be called with manifest_lock taken */
2963 static gboolean
2964 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2965     const gchar * uri, const gchar * referer, gboolean refresh,
2966     gboolean allow_cache)
2967 {
2968   GstAdaptiveDemux *demux = stream->demux;
2969
2970   if (!gst_uri_is_valid (uri)) {
2971     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2972     return FALSE;
2973   }
2974
2975   /* Try to re-use existing source element */
2976   if (stream->src != NULL) {
2977     gchar *old_protocol, *new_protocol;
2978     gchar *old_uri;
2979
2980     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2981     old_protocol = gst_uri_get_protocol (old_uri);
2982     new_protocol = gst_uri_get_protocol (uri);
2983
2984     if (!g_str_equal (old_protocol, new_protocol)) {
2985       GstElement *src = stream->src;
2986
2987       stream->src = NULL;
2988       gst_object_unref (stream->src_srcpad);
2989       stream->src_srcpad = NULL;
2990       GST_MANIFEST_UNLOCK (demux);
2991       gst_element_set_locked_state (src, TRUE);
2992       gst_element_set_state (src, GST_STATE_NULL);
2993       gst_bin_remove (GST_BIN_CAST (demux), src);
2994       GST_MANIFEST_LOCK (demux);
2995       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2996     } else {
2997       GError *err = NULL;
2998
2999       GST_DEBUG_OBJECT (demux, "Re-using old source element");
3000       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
3001               &err)) {
3002         GstElement *src = stream->src;
3003
3004         stream->src = NULL;
3005         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
3006             err ? err->message : "Unknown error");
3007         g_clear_error (&err);
3008         gst_object_unref (stream->src_srcpad);
3009         stream->src_srcpad = NULL;
3010         GST_MANIFEST_UNLOCK (demux);
3011         gst_element_set_locked_state (src, TRUE);
3012         gst_element_set_state (src, GST_STATE_NULL);
3013         gst_bin_remove (GST_BIN_CAST (demux), src);
3014         GST_MANIFEST_LOCK (demux);
3015       }
3016     }
3017     g_free (old_uri);
3018     g_free (old_protocol);
3019     g_free (new_protocol);
3020   }
3021
3022   if (stream->src == NULL) {
3023     GstPad *uri_handler_src;
3024     GstPad *queue_sink;
3025     GstPad *queue_src;
3026     GstElement *uri_handler;
3027     GstElement *queue;
3028     GstPadLinkReturn pad_link_ret;
3029     GObjectClass *gobject_class;
3030     gchar *internal_name, *bin_name;
3031
3032     /* Our src consists of a bin containing uri_handler -> queue . The
3033      * purpose of the queue is to allow the uri_handler to download an
3034      * entire fragment without blocking, so we can accurately measure the
3035      * download bitrate. */
3036
3037     queue = gst_element_factory_make ("queue", NULL);
3038     if (queue == NULL)
3039       return FALSE;
3040
3041     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3042     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3043     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3044
3045     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3046     if (uri_handler == NULL) {
3047       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3048           ("Missing plugin to handle URI: '%s'", uri), (NULL));
3049       gst_object_unref (queue);
3050       return FALSE;
3051     }
3052
3053     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3054
3055     if (g_object_class_find_property (gobject_class, "compress"))
3056       g_object_set (uri_handler, "compress", FALSE, NULL);
3057     if (g_object_class_find_property (gobject_class, "keep-alive"))
3058       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3059     if (g_object_class_find_property (gobject_class, "extra-headers")) {
3060       if (referer || refresh || !allow_cache) {
3061         GstStructure *extra_headers = gst_structure_new_empty ("headers");
3062
3063         if (referer)
3064           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3065               NULL);
3066
3067         if (!allow_cache)
3068           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3069               "no-cache", NULL);
3070         else if (refresh)
3071           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3072               "max-age=0", NULL);
3073
3074         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3075
3076         gst_structure_free (extra_headers);
3077       } else {
3078         g_object_set (uri_handler, "extra-headers", NULL, NULL);
3079       }
3080     }
3081
3082     /* Source bin creation */
3083     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3084     stream->src = gst_bin_new (bin_name);
3085     g_free (bin_name);
3086     if (stream->src == NULL) {
3087       gst_object_unref (queue);
3088       gst_object_unref (uri_handler);
3089       return FALSE;
3090     }
3091
3092     gst_bin_add (GST_BIN_CAST (stream->src), queue);
3093     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3094
3095     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3096     queue_sink = gst_element_get_static_pad (queue, "sink");
3097
3098     pad_link_ret =
3099         gst_pad_link_full (uri_handler_src, queue_sink,
3100         GST_PAD_LINK_CHECK_NOTHING);
3101     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3102       GST_WARNING_OBJECT (demux,
3103           "Could not link pads %s:%s to %s:%s for reason %d",
3104           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3105           pad_link_ret);
3106       g_object_unref (queue_sink);
3107       g_object_unref (uri_handler_src);
3108       gst_object_unref (stream->src);
3109       stream->src = NULL;
3110       return FALSE;
3111     }
3112
3113     /* Add a downstream event and data probe */
3114     gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3115         (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3116
3117     g_object_unref (queue_sink);
3118     g_object_unref (uri_handler_src);
3119     queue_src = gst_element_get_static_pad (queue, "src");
3120     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3121     g_object_unref (queue_src);
3122     gst_element_add_pad (stream->src, stream->src_srcpad);
3123
3124     gst_element_set_locked_state (stream->src, TRUE);
3125     gst_bin_add (GST_BIN_CAST (demux), stream->src);
3126     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3127
3128     /* set up our internal floating pad to drop all events from
3129      * the http src we don't care about. On the chain function
3130      * we just push the buffer forward */
3131     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3132     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3133     g_free (internal_name);
3134     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3135         GST_OBJECT_CAST (demux));
3136     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3137     gst_pad_set_element_private (stream->internal_pad, stream);
3138     gst_pad_set_active (stream->internal_pad, TRUE);
3139     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3140     gst_pad_set_event_function (stream->internal_pad, _src_event);
3141     gst_pad_set_query_function (stream->internal_pad, _src_query);
3142
3143     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3144             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3145       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3146       return FALSE;
3147     }
3148
3149     stream->uri_handler = uri_handler;
3150     stream->queue = queue;
3151
3152     stream->last_status_code = 200;     /* default to OK */
3153   }
3154   return TRUE;
3155 }
3156
3157 static GstPadProbeReturn
3158 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3159     gpointer user_data)
3160 {
3161   GstAdaptiveDemuxStream *stream = user_data;
3162
3163   /* The source's src pad is IDLE so now set the state to READY */
3164   g_mutex_lock (&stream->fragment_download_lock);
3165   stream->src_at_ready = TRUE;
3166   g_cond_signal (&stream->fragment_download_cond);
3167   g_mutex_unlock (&stream->fragment_download_lock);
3168
3169   return GST_PAD_PROBE_REMOVE;
3170 }
3171
3172 #ifndef GST_DISABLE_GST_DEBUG
3173 static const char *
3174 uritype (GstAdaptiveDemuxStream * s)
3175 {
3176   if (s->downloading_header)
3177     return "header";
3178   if (s->downloading_index)
3179     return "index";
3180   return "fragment";
3181 }
3182 #endif
3183
3184 /* must be called with manifest_lock taken.
3185  * Can temporarily release manifest_lock
3186  *
3187  * Will return when URI is fully downloaded (or aborted/errored)
3188  */
3189 static GstFlowReturn
3190 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3191     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3192     gint64 end, guint * http_status)
3193 {
3194   GstFlowReturn ret = GST_FLOW_OK;
3195   GST_DEBUG_OBJECT (stream->pad,
3196       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3197       uritype (stream), uri, start, end);
3198
3199   if (http_status)
3200     *http_status = 200;         /* default to ok if no further information */
3201
3202   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3203     ret = stream->last_ret = GST_FLOW_ERROR;
3204     return ret;
3205   }
3206
3207   gst_element_set_locked_state (stream->src, TRUE);
3208
3209   GST_MANIFEST_UNLOCK (demux);
3210   if (gst_element_set_state (stream->src,
3211           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3212     /* If ranges are specified, seek to it */
3213     if (start != 0 || end != -1) {
3214       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3215        * stop position */
3216       if (end != -1)
3217         end += 1;
3218       /* Send the seek event to the uri_handler, as the other pipeline elements
3219        * can't handle it when READY. */
3220       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3221                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3222                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3223
3224         GST_MANIFEST_LOCK (demux);
3225         /* looks like the source can't handle seeks in READY */
3226         g_clear_error (&stream->last_error);
3227         stream->last_error = g_error_new (GST_CORE_ERROR,
3228             GST_CORE_ERROR_NOT_IMPLEMENTED,
3229             "Source element can't handle range requests");
3230         stream->last_ret = GST_FLOW_ERROR;
3231       } else {
3232         GST_MANIFEST_LOCK (demux);
3233       }
3234     } else {
3235       GST_MANIFEST_LOCK (demux);
3236     }
3237
3238     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3239       stream->download_start_time =
3240           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3241
3242       /* src element is in state READY. Before we start it, we reset
3243        * download_finished
3244        */
3245       g_mutex_lock (&stream->fragment_download_lock);
3246       stream->download_finished = FALSE;
3247       stream->downloading_first_buffer = TRUE;
3248       g_mutex_unlock (&stream->fragment_download_lock);
3249
3250       GST_MANIFEST_UNLOCK (demux);
3251
3252       if (!gst_element_sync_state_with_parent (stream->src)) {
3253         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3254         GST_MANIFEST_LOCK (demux);
3255         ret = stream->last_ret = GST_FLOW_ERROR;
3256         return ret;
3257       }
3258
3259       /* wait for the fragment to be completely downloaded */
3260       GST_DEBUG_OBJECT (stream->pad,
3261           "Waiting for %s download to finish: %s", uritype (stream), uri);
3262
3263       g_mutex_lock (&stream->fragment_download_lock);
3264       stream->src_at_ready = FALSE;
3265       if (G_UNLIKELY (stream->cancelled)) {
3266         g_mutex_unlock (&stream->fragment_download_lock);
3267         GST_MANIFEST_LOCK (demux);
3268         ret = stream->last_ret = GST_FLOW_FLUSHING;
3269         return ret;
3270       }
3271       /* download_finished is only set:
3272        * * in ::fragment_download_finish()
3273        * * if EOS is received on the _src pad
3274        * */
3275       while (!stream->cancelled && !stream->download_finished) {
3276         g_cond_wait (&stream->fragment_download_cond,
3277             &stream->fragment_download_lock);
3278       }
3279       g_mutex_unlock (&stream->fragment_download_lock);
3280
3281       GST_DEBUG_OBJECT (stream->pad,
3282           "Finished Waiting for %s download: %s", uritype (stream), uri);
3283
3284       GST_MANIFEST_LOCK (demux);
3285       g_mutex_lock (&stream->fragment_download_lock);
3286       if (G_UNLIKELY (stream->cancelled)) {
3287         ret = stream->last_ret = GST_FLOW_FLUSHING;
3288         g_mutex_unlock (&stream->fragment_download_lock);
3289         return ret;
3290       }
3291       g_mutex_unlock (&stream->fragment_download_lock);
3292
3293       ret = stream->last_ret;
3294
3295       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3296           uritype (stream), uri, stream->last_ret,
3297           gst_flow_get_name (stream->last_ret));
3298       if (stream->last_ret != GST_FLOW_OK && http_status) {
3299         *http_status = stream->last_status_code;
3300       }
3301     }
3302
3303     /* changing src element state might try to join the streaming thread, so
3304      * we must not hold the manifest lock.
3305      */
3306     GST_MANIFEST_UNLOCK (demux);
3307   } else {
3308     GST_MANIFEST_UNLOCK (demux);
3309     if (stream->last_ret == GST_FLOW_OK)
3310       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3311     ret = GST_FLOW_CUSTOM_ERROR;
3312   }
3313
3314   stream->src_at_ready = FALSE;
3315
3316   gst_element_set_locked_state (stream->src, TRUE);
3317   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3318       gst_ad_stream_src_to_ready_cb, stream, NULL);
3319
3320   g_mutex_lock (&stream->fragment_download_lock);
3321   while (!stream->src_at_ready) {
3322     g_cond_wait (&stream->fragment_download_cond,
3323         &stream->fragment_download_lock);
3324   }
3325   g_mutex_unlock (&stream->fragment_download_lock);
3326
3327   gst_element_set_state (stream->src, GST_STATE_READY);
3328
3329   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3330   GST_MANIFEST_LOCK (demux);
3331   g_mutex_lock (&stream->fragment_download_lock);
3332   if (G_UNLIKELY (stream->cancelled)) {
3333     ret = stream->last_ret = GST_FLOW_FLUSHING;
3334     g_mutex_unlock (&stream->fragment_download_lock);
3335     return ret;
3336   }
3337   g_mutex_unlock (&stream->fragment_download_lock);
3338
3339   /* deactivate and reactivate our ghostpad to make it fresh for a new
3340    * stream */
3341   gst_pad_set_active (stream->internal_pad, FALSE);
3342   gst_pad_set_active (stream->internal_pad, TRUE);
3343
3344   return ret;
3345 }
3346
3347 /* must be called with manifest_lock taken.
3348  * Can temporarily release manifest_lock
3349  */
3350 static GstFlowReturn
3351 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3352     stream)
3353 {
3354   GstAdaptiveDemux *demux = stream->demux;
3355   GstFlowReturn ret = GST_FLOW_OK;
3356
3357   if (stream->fragment.header_uri != NULL) {
3358     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3359         G_GINT64_FORMAT, stream->fragment.header_uri,
3360         stream->fragment.header_range_start, stream->fragment.header_range_end);
3361
3362     stream->downloading_header = TRUE;
3363     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3364         stream->fragment.header_uri, stream->fragment.header_range_start,
3365         stream->fragment.header_range_end, NULL);
3366     stream->downloading_header = FALSE;
3367   }
3368
3369   /* check if we have an index */
3370   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3371
3372     if (stream->fragment.index_uri != NULL) {
3373       GST_DEBUG_OBJECT (demux,
3374           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3375           stream->fragment.index_uri,
3376           stream->fragment.index_range_start, stream->fragment.index_range_end);
3377       stream->downloading_index = TRUE;
3378       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3379           stream->fragment.index_uri, stream->fragment.index_range_start,
3380           stream->fragment.index_range_end, NULL);
3381       stream->downloading_index = FALSE;
3382     }
3383   }
3384
3385   return ret;
3386 }
3387
3388 /* must be called with manifest_lock taken.
3389  * Can temporarily release manifest_lock
3390  */
3391 static GstFlowReturn
3392 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3393 {
3394   GstAdaptiveDemux *demux = stream->demux;
3395   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3396   gchar *url = NULL;
3397   GstFlowReturn ret;
3398   gboolean retried_once = FALSE, live;
3399   guint http_status;
3400   guint last_status_code;
3401
3402   /* FIXME :  */
3403   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3404   stream->starting_fragment = TRUE;
3405   stream->last_ret = GST_FLOW_OK;
3406   stream->first_fragment_buffer = TRUE;
3407
3408   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3409       stream->fragment.uri ? "FRAGMENT " : "",
3410       stream->fragment.header_uri ? "HEADER " : "",
3411       stream->fragment.index_uri ? "INDEX" : "");
3412
3413   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3414       stream->fragment.index_uri == NULL)
3415     goto no_url_error;
3416
3417   if (stream->need_header) {
3418     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3419     if (ret != GST_FLOW_OK) {
3420       return ret;
3421     }
3422     stream->need_header = FALSE;
3423   }
3424
3425 again:
3426   ret = GST_FLOW_OK;
3427   url = stream->fragment.uri;
3428   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3429   if (!url)
3430     return ret;
3431
3432   stream->last_ret = GST_FLOW_OK;
3433   http_status = 200;
3434
3435   /* Download the actual fragment, either in fragments or in one go */
3436   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3437       && stream->fragment.chunk_size != 0) {
3438     /* Handle chunk downloading */
3439     gint64 range_start, range_end, chunk_start, chunk_end;
3440     guint64 download_total_bytes;
3441     gint chunk_size = stream->fragment.chunk_size;
3442
3443     range_start = chunk_start = stream->fragment.range_start;
3444     range_end = stream->fragment.range_end;
3445     /* HTTP ranges are inclusive for the end */
3446     if (chunk_size != -1)
3447       chunk_end = range_start + chunk_size - 1;
3448     else
3449       chunk_end = range_end;
3450
3451     if (range_end != -1)
3452       chunk_end = MIN (chunk_end, range_end);
3453
3454     while (!stream->fragment.finished && (chunk_start <= range_end
3455             || range_end == -1)) {
3456       download_total_bytes = stream->download_total_bytes;
3457
3458       ret =
3459           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3460           chunk_start, chunk_end, &http_status);
3461
3462       GST_DEBUG_OBJECT (stream->pad,
3463           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3464           http_status, gst_flow_get_name (stream->last_ret));
3465
3466       /* Don't retry for any chunks except the first. We would have sent
3467        * data downstream already otherwise and it's difficult to recover
3468        * from that in a meaningful way */
3469       if (chunk_start > range_start)
3470         retried_once = TRUE;
3471
3472       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3473        * downloading up to -1. We don't know the full duration.
3474        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3475       if (ret != GST_FLOW_OK && chunk_end == -1) {
3476         break;
3477       } else if (ret != GST_FLOW_OK) {
3478         chunk_end = -1;
3479         stream->last_ret = GST_FLOW_OK;
3480         continue;
3481       }
3482
3483       if (chunk_end == -1)
3484         break;
3485
3486       /* Short read, we're at the end now */
3487       if (stream->download_total_bytes - download_total_bytes <
3488           chunk_end + 1 - chunk_start)
3489         break;
3490
3491       if (!klass->need_another_chunk (stream))
3492         break;
3493
3494       /* HTTP ranges are inclusive for the end */
3495       chunk_start += chunk_size;
3496       chunk_size = stream->fragment.chunk_size;
3497       if (chunk_size != -1)
3498         chunk_end = chunk_start + chunk_size - 1;
3499       else
3500         chunk_end = range_end;
3501
3502       if (range_end != -1)
3503         chunk_end = MIN (chunk_end, range_end);
3504     }
3505   } else {
3506     ret =
3507         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3508         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3509     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3510         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3511   }
3512   if (ret == GST_FLOW_OK)
3513     goto beach;
3514
3515   g_mutex_lock (&stream->fragment_download_lock);
3516   if (G_UNLIKELY (stream->cancelled)) {
3517     g_mutex_unlock (&stream->fragment_download_lock);
3518     return ret;
3519   }
3520   g_mutex_unlock (&stream->fragment_download_lock);
3521
3522   /* TODO check if we are truly stopping */
3523   if (ret != GST_FLOW_CUSTOM_ERROR)
3524     goto beach;
3525
3526   last_status_code = stream->last_status_code;
3527   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3528       last_status_code, stream->download_error_count);
3529
3530   live = gst_adaptive_demux_is_live (demux);
3531   if (!retried_once && ((last_status_code / 100 == 4 && live)
3532           || last_status_code / 100 == 5)) {
3533     /* 4xx/5xx */
3534     /* if current position is before available start, switch to next */
3535     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3536       goto flushing;
3537
3538     if (live) {
3539       gint64 range_start, range_stop;
3540
3541       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3542               &range_stop))
3543         goto flushing;
3544
3545       if (demux->segment.position < range_start) {
3546         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3547         stream->last_ret = GST_FLOW_OK;
3548         ret = gst_adaptive_demux_eos_handling (stream);
3549         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3550             gst_flow_get_name (ret));
3551         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3552         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3553         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3554             gst_flow_get_name (ret));
3555         if (ret == GST_FLOW_OK) {
3556           retried_once = TRUE;
3557           goto again;
3558         }
3559       } else if (demux->segment.position > range_stop) {
3560         /* wait a bit to be in range, we don't have any locks at that point */
3561         gint64 wait_time =
3562             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3563         if (wait_time > 0) {
3564           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3565
3566           GST_DEBUG_OBJECT (stream->pad,
3567               "Download waiting for %" GST_TIME_FORMAT,
3568               GST_TIME_ARGS (wait_time));
3569
3570           GST_MANIFEST_UNLOCK (demux);
3571           g_mutex_lock (&stream->fragment_download_lock);
3572           if (G_UNLIKELY (stream->cancelled)) {
3573             g_mutex_unlock (&stream->fragment_download_lock);
3574             GST_MANIFEST_LOCK (demux);
3575             stream->last_ret = GST_FLOW_FLUSHING;
3576             goto flushing;
3577           }
3578           do {
3579             g_cond_wait_until (&stream->fragment_download_cond,
3580                 &stream->fragment_download_lock, end_time);
3581             if (G_UNLIKELY (stream->cancelled)) {
3582               g_mutex_unlock (&stream->fragment_download_lock);
3583               GST_MANIFEST_LOCK (demux);
3584               stream->last_ret = GST_FLOW_FLUSHING;
3585               goto flushing;
3586             }
3587           } while (!stream->download_finished);
3588           g_mutex_unlock (&stream->fragment_download_lock);
3589
3590           GST_MANIFEST_LOCK (demux);
3591         }
3592       }
3593     }
3594
3595   flushing:
3596     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3597       /* looks like there is no way of knowing when a live stream has ended
3598        * Have to assume we are falling behind and cause a manifest reload */
3599       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3600       return GST_FLOW_EOS;
3601     }
3602   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3603     /* If this is the last fragment, consider failures EOS and not actual
3604      * errors. Due to rounding errors in the durations, the last fragment
3605      * might not actually exist */
3606     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3607     return GST_FLOW_EOS;
3608   } else {
3609     /* retry once (same segment) for 5xx (server errors) */
3610     if (!retried_once) {
3611       retried_once = TRUE;
3612       /* wait a short time in case the server needs a bit to recover, we don't
3613        * care if we get woken up before end time. We can use sleep here since
3614        * we're already blocking and just want to wait some time. */
3615       g_usleep (100000);        /* a tenth of a second */
3616       goto again;
3617     }
3618   }
3619
3620 beach:
3621   return ret;
3622
3623 no_url_error:
3624   {
3625     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3626         (_("Failed to get fragment URL.")),
3627         ("An error happened when getting fragment URL"));
3628     gst_task_stop (stream->download_task);
3629     return GST_FLOW_ERROR;
3630   }
3631 }
3632
3633 /* this function will take the manifest_lock and will keep it until the end.
3634  * It will release it temporarily only when going to sleep.
3635  * Every time it takes the manifest_lock, it will check for cancelled condition
3636  */
3637 static void
3638 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3639 {
3640   GstAdaptiveDemux *demux = stream->demux;
3641   GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3642   GstFlowReturn ret;
3643   gboolean live;
3644
3645   GST_LOG_OBJECT (stream->pad, "download loop start");
3646
3647   GST_MANIFEST_LOCK (demux);
3648
3649   g_mutex_lock (&stream->fragment_download_lock);
3650   if (G_UNLIKELY (stream->cancelled)) {
3651     stream->last_ret = GST_FLOW_FLUSHING;
3652     g_mutex_unlock (&stream->fragment_download_lock);
3653     goto cancelled;
3654   }
3655   g_mutex_unlock (&stream->fragment_download_lock);
3656
3657   /* Check if we're done with our segment */
3658   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3659   if (demux->segment.rate > 0) {
3660     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3661         && stream->segment.position >= stream->segment.stop) {
3662       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3663       ret = GST_FLOW_EOS;
3664       gst_task_stop (stream->download_task);
3665       goto end_of_manifest;
3666     }
3667   } else {
3668     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3669         && stream->segment.position <= stream->segment.start) {
3670       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3671       ret = GST_FLOW_EOS;
3672       gst_task_stop (stream->download_task);
3673       goto end_of_manifest;
3674     }
3675   }
3676   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3677
3678   /* Cleanup old streams if any */
3679   if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3680     GList *old_streams = demux->priv->old_streams;
3681     demux->priv->old_streams = NULL;
3682
3683     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3684     g_list_free_full (old_streams,
3685         (GDestroyNotify) gst_adaptive_demux_stream_free);
3686     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3687
3688     /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3689      * Recheck the cancelled flag.
3690      */
3691     g_mutex_lock (&stream->fragment_download_lock);
3692     if (G_UNLIKELY (stream->cancelled)) {
3693       stream->last_ret = GST_FLOW_FLUSHING;
3694       g_mutex_unlock (&stream->fragment_download_lock);
3695       goto cancelled;
3696     }
3697     g_mutex_unlock (&stream->fragment_download_lock);
3698   }
3699
3700   /* Restarting download, figure out new position
3701    * FIXME : Move this to a separate function ? */
3702   if (G_UNLIKELY (stream->restart_download)) {
3703     GstEvent *seg_event;
3704     GstClockTime cur, ts = 0;
3705     gint64 pos;
3706
3707     GST_DEBUG_OBJECT (stream->pad,
3708         "Activating stream due to reconfigure event");
3709
3710     if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3711       ts = (GstClockTime) pos;
3712       GST_DEBUG_OBJECT (demux, "Downstream position: %"
3713           GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3714     } else {
3715       /* query other pads as some faulty element in the pad's branch might
3716        * reject position queries. This should be better than using the
3717        * demux segment position that can be much ahead */
3718       GList *iter;
3719
3720       for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3721         GstAdaptiveDemuxStream *cur_stream =
3722             (GstAdaptiveDemuxStream *) iter->data;
3723
3724         if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3725                 &pos)) {
3726           ts = (GstClockTime) pos;
3727           GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3728               GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3729           break;
3730         }
3731       }
3732     }
3733
3734     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3735     cur =
3736         gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3737         stream->segment.position);
3738
3739     /* we might have already pushed this data */
3740     ts = MAX (ts, cur);
3741
3742     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3743         "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3744
3745     if (GST_CLOCK_TIME_IS_VALID (ts)) {
3746       GstClockTime offset, period_start;
3747
3748       offset =
3749           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3750       period_start = gst_adaptive_demux_get_period_start_time (demux);
3751
3752       /* TODO check return */
3753       gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3754           0, ts, &ts);
3755
3756       stream->segment.position = ts - period_start + offset;
3757     }
3758
3759     /* The stream's segment is still correct except for
3760      * the position, so let's send a new one with the
3761      * updated position */
3762     seg_event = gst_event_new_segment (&stream->segment);
3763     gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3764     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3765
3766     GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3767         GST_PTR_FORMAT, seg_event);
3768     gst_pad_push_event (stream->pad, seg_event);
3769
3770     stream->discont = TRUE;
3771     stream->restart_download = FALSE;
3772   }
3773
3774   live = gst_adaptive_demux_is_live (demux);
3775
3776   /* Get information about the fragment to download */
3777   GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3778   ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3779   GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3780       ret, gst_flow_get_name (ret));
3781   if (ret == GST_FLOW_OK) {
3782
3783     /* wait for live fragments to be available */
3784     if (live) {
3785       gint64 wait_time =
3786           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3787       if (wait_time > 0) {
3788         GstClockTime end_time =
3789             gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3790
3791         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3792             GST_TIME_ARGS (wait_time));
3793
3794         GST_MANIFEST_UNLOCK (demux);
3795
3796         g_mutex_lock (&stream->fragment_download_lock);
3797         if (G_UNLIKELY (stream->cancelled)) {
3798           g_mutex_unlock (&stream->fragment_download_lock);
3799           GST_MANIFEST_LOCK (demux);
3800           stream->last_ret = GST_FLOW_FLUSHING;
3801           goto cancelled;
3802         }
3803         gst_adaptive_demux_wait_until (demux->realtime_clock,
3804             &stream->fragment_download_cond, &stream->fragment_download_lock,
3805             end_time);
3806         g_mutex_unlock (&stream->fragment_download_lock);
3807
3808         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3809
3810         GST_MANIFEST_LOCK (demux);
3811
3812         g_mutex_lock (&stream->fragment_download_lock);
3813         if (G_UNLIKELY (stream->cancelled)) {
3814           stream->last_ret = GST_FLOW_FLUSHING;
3815           g_mutex_unlock (&stream->fragment_download_lock);
3816           goto cancelled;
3817         }
3818         g_mutex_unlock (&stream->fragment_download_lock);
3819       }
3820     }
3821
3822     stream->last_ret = GST_FLOW_OK;
3823
3824     next_download = gst_adaptive_demux_get_monotonic_time (demux);
3825     ret = gst_adaptive_demux_stream_download_fragment (stream);
3826
3827     if (ret == GST_FLOW_FLUSHING) {
3828       g_mutex_lock (&stream->fragment_download_lock);
3829       if (G_UNLIKELY (stream->cancelled)) {
3830         stream->last_ret = GST_FLOW_FLUSHING;
3831         g_mutex_unlock (&stream->fragment_download_lock);
3832         goto cancelled;
3833       }
3834       g_mutex_unlock (&stream->fragment_download_lock);
3835     }
3836
3837   } else {
3838     stream->last_ret = ret;
3839   }
3840
3841   switch (ret) {
3842     case GST_FLOW_OK:
3843       break;                    /* all is good, let's go */
3844     case GST_FLOW_EOS:
3845       GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3846
3847       /* we push the EOS after releasing the object lock */
3848       if (gst_adaptive_demux_is_live (demux)
3849           && (demux->segment.rate == 1.0
3850               || gst_adaptive_demux_stream_in_live_seek_range (demux,
3851                   stream))) {
3852         GstAdaptiveDemuxClass *demux_class =
3853             GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3854
3855         /* this might be a fragment download error, refresh the manifest, just in case */
3856         if (!demux_class->requires_periodical_playlist_update (demux)) {
3857           ret = gst_adaptive_demux_update_manifest (demux);
3858           break;
3859           /* Wait only if we can ensure current manifest has been expired.
3860            * The meaning "we have next period" *WITH* EOS is that, current
3861            * period has been ended but we can continue to the next period */
3862         } else if (!gst_adaptive_demux_has_next_period (demux) &&
3863             gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3864           goto end;
3865         }
3866         gst_task_stop (stream->download_task);
3867         if (stream->replaced) {
3868           goto end;
3869         }
3870       } else {
3871         gst_task_stop (stream->download_task);
3872       }
3873
3874       if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3875         if (gst_adaptive_demux_has_next_period (demux)) {
3876           GST_DEBUG_OBJECT (stream->pad,
3877               "Next period available, not sending EOS");
3878           gst_adaptive_demux_advance_period (demux);
3879           ret = GST_FLOW_OK;
3880         }
3881       }
3882       break;
3883
3884     case GST_FLOW_NOT_LINKED:
3885     {
3886       GstFlowReturn ret;
3887       gst_task_stop (stream->download_task);
3888
3889       ret = gst_adaptive_demux_combine_flows (demux);
3890       if (ret == GST_FLOW_NOT_LINKED) {
3891         GST_ELEMENT_FLOW_ERROR (demux, ret);
3892       }
3893     }
3894       break;
3895
3896     case GST_FLOW_FLUSHING:{
3897       GList *iter;
3898
3899       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3900         GstAdaptiveDemuxStream *other;
3901
3902         other = iter->data;
3903         gst_task_stop (other->download_task);
3904       }
3905     }
3906       break;
3907
3908     default:
3909       if (ret <= GST_FLOW_ERROR) {
3910         gboolean is_live = gst_adaptive_demux_is_live (demux);
3911         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3912         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3913           goto download_error;
3914         }
3915
3916         g_clear_error (&stream->last_error);
3917
3918         /* First try to update the playlist for non-live playlists
3919          * in case the URIs have changed in the meantime. But only
3920          * try it the first time, after that we're going to wait a
3921          * a bit to not flood the server */
3922         if (stream->download_error_count == 1 && !is_live) {
3923           /* TODO hlsdemux had more options to this function (boolean and err) */
3924
3925           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3926             /* Retry immediately, the playlist actually has changed */
3927             GST_DEBUG_OBJECT (demux, "Updated the playlist");
3928             goto end;
3929           }
3930         }
3931
3932         /* Wait half the fragment duration before retrying */
3933         next_download += stream->fragment.duration / 2;
3934
3935         GST_MANIFEST_UNLOCK (demux);
3936
3937         g_mutex_lock (&stream->fragment_download_lock);
3938         if (G_UNLIKELY (stream->cancelled)) {
3939           g_mutex_unlock (&stream->fragment_download_lock);
3940           GST_MANIFEST_LOCK (demux);
3941           stream->last_ret = GST_FLOW_FLUSHING;
3942           goto cancelled;
3943         }
3944         gst_adaptive_demux_wait_until (demux->realtime_clock,
3945             &stream->fragment_download_cond, &stream->fragment_download_lock,
3946             next_download);
3947         g_mutex_unlock (&stream->fragment_download_lock);
3948
3949         GST_DEBUG_OBJECT (demux, "Retrying now");
3950
3951         GST_MANIFEST_LOCK (demux);
3952
3953         g_mutex_lock (&stream->fragment_download_lock);
3954         if (G_UNLIKELY (stream->cancelled)) {
3955           stream->last_ret = GST_FLOW_FLUSHING;
3956           g_mutex_unlock (&stream->fragment_download_lock);
3957           goto cancelled;
3958         }
3959         g_mutex_unlock (&stream->fragment_download_lock);
3960
3961         /* Refetch the playlist now after we waited */
3962         if (!is_live
3963             && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3964           GST_DEBUG_OBJECT (demux, "Updated the playlist");
3965         }
3966         goto end;
3967       }
3968       break;
3969   }
3970
3971 end_of_manifest:
3972   if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3973     if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3974       if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3975         GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3976         gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3977       } else {
3978         GST_DEBUG_OBJECT (stream->src,
3979             "Stream is EOS, but we're switching fragments. Not sending.");
3980       }
3981     } else {
3982       GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3983       goto download_error;
3984     }
3985   }
3986
3987 end:
3988   GST_MANIFEST_UNLOCK (demux);
3989   GST_LOG_OBJECT (stream->pad, "download loop end");
3990   return;
3991
3992 cancelled:
3993   {
3994     GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3995     goto end;
3996   }
3997 download_error:
3998   {
3999     GstMessage *msg;
4000
4001     if (stream->last_error) {
4002       gchar *debug = g_strdup_printf ("Error on stream %s:%s",
4003           GST_DEBUG_PAD_NAME (stream->pad));
4004       msg =
4005           gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
4006           debug);
4007       GST_ERROR_OBJECT (stream->pad, "Download error: %s",
4008           stream->last_error->message);
4009       g_free (debug);
4010     } else {
4011       GError *err =
4012           g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
4013           _("Couldn't download fragments"));
4014       msg =
4015           gst_message_new_error (GST_OBJECT_CAST (demux), err,
4016           "Fragment downloading has failed consecutive times");
4017       g_error_free (err);
4018       GST_ERROR_OBJECT (stream->pad,
4019           "Download error: Couldn't download fragments, too many failures");
4020     }
4021
4022     gst_task_stop (stream->download_task);
4023     if (stream->src) {
4024       GstElement *src = stream->src;
4025
4026       stream->src = NULL;
4027       GST_MANIFEST_UNLOCK (demux);
4028       gst_element_set_locked_state (src, TRUE);
4029       gst_element_set_state (src, GST_STATE_NULL);
4030       gst_bin_remove (GST_BIN_CAST (demux), src);
4031       GST_MANIFEST_LOCK (demux);
4032     }
4033
4034     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4035
4036     goto end;
4037   }
4038 }
4039
4040 static void
4041 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4042 {
4043   GstClockTime next_update;
4044   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4045
4046   /* Loop for updating of the playlist. This periodically checks if
4047    * the playlist is updated and does so, then signals the streaming
4048    * thread in case it can continue downloading now. */
4049
4050   /* block until the next scheduled update or the signal to quit this thread */
4051   GST_DEBUG_OBJECT (demux, "Started updates task");
4052
4053   GST_MANIFEST_LOCK (demux);
4054
4055   next_update =
4056       gst_adaptive_demux_get_monotonic_time (demux) +
4057       klass->get_manifest_update_interval (demux) * GST_USECOND;
4058
4059   /* Updating playlist only needed for live playlists */
4060   while (gst_adaptive_demux_is_live (demux)) {
4061     GstFlowReturn ret = GST_FLOW_OK;
4062
4063     /* Wait here until we should do the next update or we're cancelled */
4064     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4065
4066     GST_MANIFEST_UNLOCK (demux);
4067
4068     g_mutex_lock (&demux->priv->updates_timed_lock);
4069     if (demux->priv->stop_updates_task) {
4070       g_mutex_unlock (&demux->priv->updates_timed_lock);
4071       goto quit;
4072     }
4073     gst_adaptive_demux_wait_until (demux->realtime_clock,
4074         &demux->priv->updates_timed_cond,
4075         &demux->priv->updates_timed_lock, next_update);
4076     g_mutex_unlock (&demux->priv->updates_timed_lock);
4077
4078     g_mutex_lock (&demux->priv->updates_timed_lock);
4079     if (demux->priv->stop_updates_task) {
4080       g_mutex_unlock (&demux->priv->updates_timed_lock);
4081       goto quit;
4082     }
4083     g_mutex_unlock (&demux->priv->updates_timed_lock);
4084
4085     GST_MANIFEST_LOCK (demux);
4086
4087     GST_DEBUG_OBJECT (demux, "Updating playlist");
4088
4089     ret = gst_adaptive_demux_update_manifest (demux);
4090
4091     if (ret == GST_FLOW_EOS) {
4092     } else if (ret != GST_FLOW_OK) {
4093       /* update_failed_count is used only here, no need to protect it */
4094       demux->priv->update_failed_count++;
4095       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4096         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4097             gst_flow_get_name (ret));
4098         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4099             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4100       } else {
4101         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4102             (_("Internal data stream error.")), ("Could not update playlist"));
4103         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4104         gst_task_stop (demux->priv->updates_task);
4105         GST_MANIFEST_UNLOCK (demux);
4106         goto end;
4107       }
4108     } else {
4109       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4110       demux->priv->update_failed_count = 0;
4111       next_update =
4112           gst_adaptive_demux_get_monotonic_time (demux) +
4113           klass->get_manifest_update_interval (demux) * GST_USECOND;
4114
4115       /* Wake up download tasks */
4116       g_mutex_lock (&demux->priv->manifest_update_lock);
4117       g_cond_broadcast (&demux->priv->manifest_cond);
4118       g_mutex_unlock (&demux->priv->manifest_update_lock);
4119     }
4120   }
4121
4122   GST_MANIFEST_UNLOCK (demux);
4123
4124 quit:
4125   {
4126     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4127   }
4128
4129 end:
4130   {
4131     return;
4132   }
4133 }
4134
4135 /* must be called with manifest_lock taken */
4136 static gboolean
4137 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4138     GstEvent * event)
4139 {
4140   gboolean ret;
4141   GstPad *pad;
4142   GstAdaptiveDemux *demux = stream->demux;
4143
4144   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4145     stream->eos = TRUE;
4146   }
4147
4148   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4149
4150   /* Can't push events holding the manifest lock */
4151   GST_MANIFEST_UNLOCK (demux);
4152
4153   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4154       "Pushing event %" GST_PTR_FORMAT, event);
4155
4156   ret = gst_pad_push_event (pad, event);
4157
4158   gst_object_unref (pad);
4159
4160   GST_MANIFEST_LOCK (demux);
4161
4162   return ret;
4163 }
4164
4165 /* must be called with manifest_lock taken */
4166 static gboolean
4167 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4168 {
4169   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4170
4171   if (klass->is_live)
4172     return klass->is_live (demux);
4173   return FALSE;
4174 }
4175
4176 /* must be called with manifest_lock taken */
4177 static GstFlowReturn
4178 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4179     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4180     GstClockTime ts, GstClockTime * final_ts)
4181 {
4182   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4183
4184   if (klass->stream_seek)
4185     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4186   return GST_FLOW_ERROR;
4187 }
4188
4189 /* must be called with manifest_lock taken */
4190 static gboolean
4191 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4192     GstAdaptiveDemuxStream * stream)
4193 {
4194   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4195   gboolean ret = TRUE;
4196
4197   if (klass->stream_has_next_fragment)
4198     ret = klass->stream_has_next_fragment (stream);
4199
4200   return ret;
4201 }
4202
4203 /* must be called with manifest_lock taken */
4204 /* Called from:
4205  *  the ::finish_fragment() handlers when an *actual* fragment is done
4206  *   */
4207 GstFlowReturn
4208 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4209     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4210 {
4211   GstFlowReturn ret;
4212
4213   if (stream->last_ret == GST_FLOW_OK) {
4214     stream->last_ret =
4215         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4216         duration);
4217   }
4218   ret = stream->last_ret;
4219
4220   return ret;
4221 }
4222
4223 /* must be called with manifest_lock taken */
4224 GstFlowReturn
4225 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4226     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4227 {
4228   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4229   GstFlowReturn ret;
4230
4231   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4232
4233   GST_LOG_OBJECT (stream->pad,
4234       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4235       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4236
4237   stream->download_error_count = 0;
4238   g_clear_error (&stream->last_error);
4239
4240   /* FIXME - url has no indication of byte ranges for subsegments */
4241   /* FIXME : All those time statistics are biased, since they are calculated
4242    * *AFTER* the queue2, which might be blocking. They should ideally be
4243    * calculated *before* queue2 in the uri_handler_probe */
4244   gst_element_post_message (GST_ELEMENT_CAST (demux),
4245       gst_message_new_element (GST_OBJECT_CAST (demux),
4246           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4247               "manifest-uri", G_TYPE_STRING,
4248               demux->manifest_uri, "uri", G_TYPE_STRING,
4249               stream->fragment.uri, "fragment-start-time",
4250               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4251               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4252               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4253               stream->download_total_bytes, "fragment-download-time",
4254               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4255
4256   /* Don't update to the end of the segment if in reverse playback */
4257   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4258   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4259     GstClockTime offset =
4260         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4261     GstClockTime period_start =
4262         gst_adaptive_demux_get_period_start_time (demux);
4263
4264     stream->segment.position += duration;
4265
4266     /* Convert from position inside the stream's segment to the demuxer's
4267      * segment, they are not necessarily the same */
4268     if (stream->segment.position - offset + period_start >
4269         demux->segment.position)
4270       demux->segment.position =
4271           stream->segment.position - offset + period_start;
4272   }
4273   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4274
4275   /* When advancing with a non 1.0 rate on live streams, we need to check
4276    * the live seeking range again to make sure we can still advance to
4277    * that position */
4278   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4279     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4280       ret = GST_FLOW_EOS;
4281     else
4282       ret = klass->stream_advance_fragment (stream);
4283   } else if (gst_adaptive_demux_is_live (demux)
4284       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4285     ret = klass->stream_advance_fragment (stream);
4286   } else {
4287     ret = GST_FLOW_EOS;
4288   }
4289
4290   stream->download_start_time =
4291       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4292
4293   if (ret == GST_FLOW_OK) {
4294     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4295             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4296       stream->need_header = TRUE;
4297       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4298     }
4299
4300     /* the subclass might want to switch pads */
4301     if (G_UNLIKELY (demux->next_streams)) {
4302       GList *iter;
4303       gboolean can_expose = TRUE;
4304
4305       gst_task_stop (stream->download_task);
4306
4307       ret = GST_FLOW_EOS;
4308
4309       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4310         /* Only expose if all streams are now cancelled or finished downloading */
4311         GstAdaptiveDemuxStream *other = iter->data;
4312         if (other != stream) {
4313           g_mutex_lock (&other->fragment_download_lock);
4314           can_expose &= (other->cancelled == TRUE
4315               || other->download_finished == TRUE);
4316           g_mutex_unlock (&other->fragment_download_lock);
4317         }
4318       }
4319
4320       if (can_expose) {
4321         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4322             "to do bitrate switching");
4323         gst_adaptive_demux_prepare_streams (demux, FALSE);
4324         gst_adaptive_demux_start_tasks (demux, TRUE);
4325       } else {
4326         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4327       }
4328     }
4329   }
4330
4331   return ret;
4332 }
4333
4334 /* must be called with manifest_lock taken */
4335 static gboolean
4336 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4337     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4338 {
4339   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4340
4341   if (klass->stream_select_bitrate)
4342     return klass->stream_select_bitrate (stream, bitrate);
4343   return FALSE;
4344 }
4345
4346 /* must be called with manifest_lock taken */
4347 static GstFlowReturn
4348 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4349     GstAdaptiveDemuxStream * stream)
4350 {
4351   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4352   GstFlowReturn ret;
4353
4354   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4355       GST_FLOW_ERROR);
4356
4357   /* Make sure the sub-class will update bitrate, or else
4358    * we will later */
4359   stream->fragment.bitrate = 0;
4360   stream->fragment.finished = FALSE;
4361
4362   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4363       GST_TIME_ARGS (stream->segment.position));
4364
4365   ret = klass->stream_update_fragment_info (stream);
4366
4367   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4368       stream->fragment.uri);
4369   if (ret == GST_FLOW_OK) {
4370     GST_LOG_OBJECT (stream->pad,
4371         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4372         GST_TIME_ARGS (stream->fragment.timestamp),
4373         GST_TIME_ARGS (stream->fragment.duration));
4374     GST_LOG_OBJECT (stream->pad,
4375         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4376         stream->fragment.range_start, stream->fragment.range_end);
4377   }
4378
4379   return ret;
4380 }
4381
4382 /* must be called with manifest_lock taken */
4383 static gint64
4384 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4385     demux, GstAdaptiveDemuxStream * stream)
4386 {
4387   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4388
4389   if (klass->stream_get_fragment_waiting_time)
4390     return klass->stream_get_fragment_waiting_time (stream);
4391   return 0;
4392 }
4393
4394 /* must be called with manifest_lock taken */
4395 static GstFlowReturn
4396 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4397 {
4398   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4399   GstFragment *download;
4400   GstBuffer *buffer;
4401   GstFlowReturn ret;
4402   GError *error = NULL;
4403
4404   download = gst_uri_downloader_fetch_uri (demux->downloader,
4405       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4406   if (download) {
4407     g_free (demux->manifest_uri);
4408     g_free (demux->manifest_base_uri);
4409     if (download->redirect_permanent && download->redirect_uri) {
4410       demux->manifest_uri = g_strdup (download->redirect_uri);
4411       demux->manifest_base_uri = NULL;
4412     } else {
4413       demux->manifest_uri = g_strdup (download->uri);
4414       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4415     }
4416
4417     buffer = gst_fragment_get_buffer (download);
4418     g_object_unref (download);
4419     ret = klass->update_manifest_data (demux, buffer);
4420     gst_buffer_unref (buffer);
4421     /* FIXME: Should the manifest uri vars be reverted to original
4422      * values if updating fails? */
4423   } else {
4424     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4425         error->message);
4426     ret = GST_FLOW_NOT_LINKED;
4427   }
4428   g_clear_error (&error);
4429
4430   return ret;
4431 }
4432
4433 /* must be called with manifest_lock taken */
4434 static GstFlowReturn
4435 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4436 {
4437   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4438   GstFlowReturn ret;
4439
4440   ret = klass->update_manifest (demux);
4441
4442   if (ret == GST_FLOW_OK) {
4443     GstClockTime duration;
4444     /* Send an updated duration message */
4445     duration = klass->get_duration (demux);
4446     if (duration != GST_CLOCK_TIME_NONE) {
4447       GST_DEBUG_OBJECT (demux,
4448           "Sending duration message : %" GST_TIME_FORMAT,
4449           GST_TIME_ARGS (duration));
4450       gst_element_post_message (GST_ELEMENT (demux),
4451           gst_message_new_duration_changed (GST_OBJECT (demux)));
4452     } else {
4453       GST_DEBUG_OBJECT (demux,
4454           "Duration unknown, can not send the duration message");
4455     }
4456
4457     /* If a manifest changes it's liveness or periodic updateness, we need
4458      * to start/stop the manifest update task appropriately */
4459     /* Keep this condition in sync with the one in
4460      * gst_adaptive_demux_start_manifest_update_task()
4461      */
4462     if (gst_adaptive_demux_is_live (demux) &&
4463         klass->requires_periodical_playlist_update (demux)) {
4464       gst_adaptive_demux_start_manifest_update_task (demux);
4465     } else {
4466       gst_adaptive_demux_stop_manifest_update_task (demux);
4467     }
4468   }
4469
4470   return ret;
4471 }
4472
4473 void
4474 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4475 {
4476   g_free (f->uri);
4477   f->uri = NULL;
4478   f->range_start = 0;
4479   f->range_end = -1;
4480
4481   g_free (f->header_uri);
4482   f->header_uri = NULL;
4483   f->header_range_start = 0;
4484   f->header_range_end = -1;
4485
4486   g_free (f->index_uri);
4487   f->index_uri = NULL;
4488   f->index_range_start = 0;
4489   f->index_range_end = -1;
4490
4491   f->finished = FALSE;
4492 }
4493
4494 /* must be called with manifest_lock taken */
4495 static gboolean
4496 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4497 {
4498   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4499   gboolean ret = FALSE;
4500
4501   if (klass->has_next_period)
4502     ret = klass->has_next_period (demux);
4503   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4504   return ret;
4505 }
4506
4507 /* must be called with manifest_lock taken */
4508 static void
4509 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4510 {
4511   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4512
4513   g_return_if_fail (klass->advance_period != NULL);
4514
4515   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4516   klass->advance_period (demux);
4517   gst_adaptive_demux_prepare_streams (demux, FALSE);
4518   gst_adaptive_demux_start_tasks (demux, TRUE);
4519 }
4520
4521 /**
4522  * gst_adaptive_demux_get_monotonic_time:
4523  * Returns: a monotonically increasing time, using the system realtime clock
4524  */
4525 GstClockTime
4526 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4527 {
4528   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4529   return gst_clock_get_time (demux->realtime_clock);
4530 }
4531
4532 /**
4533  * gst_adaptive_demux_get_client_now_utc:
4534  * @demux: #GstAdaptiveDemux
4535  * Returns: the client's estimate of UTC
4536  *
4537  * Used to find the client's estimate of UTC, using the system realtime clock.
4538  */
4539 GDateTime *
4540 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4541 {
4542   GstClockTime rtc_now;
4543   GDateTime *unix_datetime;
4544   GDateTime *result_datetime;
4545   gint64 utc_now_in_us;
4546
4547   rtc_now = gst_clock_get_time (demux->realtime_clock);
4548   utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4549   unix_datetime =
4550       g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
4551   result_datetime =
4552       g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
4553   g_date_time_unref (unix_datetime);
4554   return result_datetime;
4555 }
4556
4557 /**
4558  * gst_adaptive_demux_is_running
4559  * @demux: #GstAdaptiveDemux
4560  * Returns: whether the demuxer is processing data
4561  *
4562  * Returns FALSE if shutdown has started (transitioning down from
4563  * PAUSED), otherwise TRUE.
4564  */
4565 gboolean
4566 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4567 {
4568   return g_atomic_int_get (&demux->running);
4569 }
4570
4571 static GstAdaptiveDemuxTimer *
4572 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4573 {
4574   GstAdaptiveDemuxTimer *timer;
4575
4576   timer = g_slice_new (GstAdaptiveDemuxTimer);
4577   timer->fired = FALSE;
4578   timer->cond = cond;
4579   timer->mutex = mutex;
4580   g_atomic_int_set (&timer->ref_count, 1);
4581   return timer;
4582 }
4583
4584 static GstAdaptiveDemuxTimer *
4585 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4586 {
4587   g_return_val_if_fail (timer != NULL, NULL);
4588   g_atomic_int_inc (&timer->ref_count);
4589   return timer;
4590 }
4591
4592 static void
4593 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4594 {
4595   g_return_if_fail (timer != NULL);
4596   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4597     g_slice_free (GstAdaptiveDemuxTimer, timer);
4598   }
4599 }
4600
4601 /* gst_adaptive_demux_wait_until:
4602  * A replacement for g_cond_wait_until that uses the clock rather
4603  * than system time to control the duration of the sleep. Typically
4604  * clock is actually a #GstSystemClock, in which case this function
4605  * behaves exactly like g_cond_wait_until. Inside unit tests,
4606  * the clock is typically a #GstTestClock, which allows tests to run
4607  * in non-realtime.
4608  * This function must be called with mutex held.
4609  */
4610 static gboolean
4611 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4612     GstClockTime end_time)
4613 {
4614   GstAdaptiveDemuxTimer *timer;
4615   gboolean fired;
4616   GstClockReturn res;
4617
4618   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4619     /* for an invalid time, gst_clock_id_wait_async will try to call
4620      * gst_adaptive_demux_clock_callback from the current thread.
4621      * It still holds the mutex while doing that, so it will deadlock.
4622      * g_cond_wait_until would return immediately with false, so we'll do the same.
4623      */
4624     return FALSE;
4625   }
4626   timer = gst_adaptive_demux_timer_new (cond, mutex);
4627   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4628   res =
4629       gst_clock_id_wait_async (timer->clock_id,
4630       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4631       (GDestroyNotify) gst_adaptive_demux_timer_unref);
4632   /* clock does not support asynchronously wait. Assert and return */
4633   if (res == GST_CLOCK_UNSUPPORTED) {
4634     gst_clock_id_unref (timer->clock_id);
4635     gst_adaptive_demux_timer_unref (timer);
4636     g_return_val_if_reached (TRUE);
4637   }
4638   g_assert (!timer->fired);
4639   /* the gst_adaptive_demux_clock_callback() will signal the
4640    * cond when the clock's single shot timer fires, or the cond will be
4641    * signalled by another thread that wants to cause this wait to finish
4642    * early (e.g. to terminate the waiting thread).
4643    * There is no need for a while loop here, because that logic is
4644    * implemented by the function calling gst_adaptive_demux_wait_until() */
4645   g_cond_wait (cond, mutex);
4646   fired = timer->fired;
4647   if (!fired)
4648     gst_clock_id_unschedule (timer->clock_id);
4649   gst_clock_id_unref (timer->clock_id);
4650   gst_adaptive_demux_timer_unref (timer);
4651   return !fired;
4652 }
4653
4654 static gboolean
4655 gst_adaptive_demux_clock_callback (GstClock * clock,
4656     GstClockTime time, GstClockID id, gpointer user_data)
4657 {
4658   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4659   g_return_val_if_fail (timer != NULL, FALSE);
4660   g_mutex_lock (timer->mutex);
4661   timer->fired = TRUE;
4662   g_cond_signal (timer->cond);
4663   g_mutex_unlock (timer->mutex);
4664   return TRUE;
4665 }
4666
4667 /**
4668  * gst_adaptive_demux_get_qos_earliest_time:
4669  *
4670  * Returns: The QOS earliest time
4671  *
4672  * Since: 1.20
4673  */
4674 GstClockTime
4675 gst_adaptive_demux_get_qos_earliest_time (GstAdaptiveDemux * demux)
4676 {
4677   GstClockTime earliest;
4678
4679   GST_OBJECT_LOCK (demux);
4680   earliest = demux->priv->qos_earliest_time;
4681   GST_OBJECT_UNLOCK (demux);
4682
4683   return earliest;
4684 }