adaptivedemux: Get rid of internal stream adapter and let subclasses handle this...
[platform/upstream/gstreamer.git] / gst-libs / gst / adaptivedemux / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  * @see_also:
26  *
27  * What is an adaptive demuxer?
28  * Adaptive demuxers are special demuxers in the sense that they don't
29  * actually demux data received from upstream but download the data
30  * themselves.
31  *
32  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
33  * a set of fragments. The manifest describes the available media and
34  * the sequence of fragments to use. Each fragments contains a small
35  * part of the media (typically only a few seconds). It is possible for
36  * the manifest to have the same media available in different configurations
37  * (bitrates for example) so that the client can select the one that
38  * best suits its scenario (network fluctuation, hardware requirements...).
39  * It is possible to switch from one representation of the media to another
40  * during playback. That's why it is called 'adaptive', because it can be
41  * adapted to the client's needs.
42  *
43  * Architectural overview:
44  * The manifest is received by the demuxer in its sink pad and, upon receiving
45  * EOS, it parses the manifest and exposes the streams available in it. For
46  * each stream a source element will be created and will download the list
47  * of fragments one by one. Once a fragment is finished downloading, the next
48  * URI is set to the source element and it starts fetching it and pushing
49  * through the stream's pad. This implies that each stream is independent from
50  * each other as it runs on a separate thread.
51  *
52  * After downloading each fragment, the download rate of it is calculated and
53  * the demuxer has a chance to switch to a different bitrate if needed. The
54  * switch can be done by simply pushing a new caps before the next fragment
55  * when codecs are the same, or by exposing a new pad group if it needs
56  * a codec change.
57  *
58  * Extra features:
59  * - Not linked streams: Streams that are not-linked have their download threads
60  *                       interrupted to save network bandwidth. When they are
61  *                       relinked a reconfigure event is received and the
62  *                       stream is restarted.
63  *
64  * Subclasses:
65  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
66  * about the intrinsics of the subclass formats, so the subclasses are
67  * resposible for maintaining the manifest data structures and stream
68  * information.
69  */
70
71 /*
72 MT safety.
73 The following rules were observed while implementing MT safety in adaptive demux:
74 1. If a variable is accessed from multiple threads and at least one thread
75 writes to it, then all the accesses needs to be done from inside a critical section.
76 2. If thread A wants to join thread B then at the moment it calls gst_task_join
77 it must not hold any mutexes that thread B might take.
78
79 Adaptive demux API can be called from several threads. More, adaptive demux
80 starts some threads to monitor the download of fragments. In order to protect
81 accesses to shared variables (demux and streams) all the API functions that
82 can be run in different threads will need to get a mutex (manifest_lock)
83 when they start and release it when they end. Because some of those functions
84 can indirectly call other API functions (eg they can generate events or messages
85 that are processed in the same thread) the manifest_lock must be recursive.
86
87 The manifest_lock will serialize the public API making access to shared
88 variables safe. But some of these functions will try at some moment to join
89 threads created by adaptive demux, or to change the state of src elements
90 (which will block trying to join the src element streaming thread). Because
91 of rule 2, those functions will need to release the manifest_lock during the
92 call of gst_task_join. During this time they can be interrupted by other API calls.
93 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
94 is called and this will join all threads. In order to prevent interruptions
95 during such period, all the API functions will also use a second lock: api_lock.
96 This will be taken at the beginning of the function and released at the end,
97 but this time this lock will not be temporarily released during join.
98 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
99 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
100 to hold it while joining the threads or changing the src element state. The
101 api_lock will serialise all external requests to adaptive demux. In order to
102 avoid deadlocks, if a function needs to acquire both manifest and api locks,
103 the api_lock will be taken first and the manifest_lock second.
104
105 By using the api_lock a thread is protected against other API calls. But when
106 temporarily dropping the manifest_lock, it will be vulnerable to changes from
107 threads that use only the manifest_lock and not the api_lock. These threads run
108 one of the following functions: gst_adaptive_demux_stream_download_loop,
109 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
110 that all operations during an API call are not impacted by other writes, the
111 above mentioned functions must check a cancelled flag every time they reacquire
112 the manifest_lock. If the flag is set, they must exit immediately, without
113 performing any changes on the shared data. In this way, an API call (eg seek
114 request) can set the cancel flag before releasing the manifest_lock and be sure
115 that the demux object and its streams are not changed by anybody else.
116 */
117
118 #ifdef HAVE_CONFIG_H
119 #include "config.h"
120 #endif
121
122 #include "gstadaptivedemux.h"
123 #include "gst/gst-i18n-plugin.h"
124 #include <gst/base/gstadapter.h>
125
126 GST_DEBUG_CATEGORY (adaptivedemux_debug);
127 #define GST_CAT_DEFAULT adaptivedemux_debug
128
129 #define GST_ADAPTIVE_DEMUX_GET_PRIVATE(obj)  \
130     (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_ADAPTIVE_DEMUX, \
131         GstAdaptiveDemuxPrivate))
132
133 #define MAX_DOWNLOAD_ERROR_COUNT 3
134 #define DEFAULT_FAILED_COUNT 3
135 #define DEFAULT_CONNECTION_SPEED 0
136 #define DEFAULT_BITRATE_LIMIT 0.8
137 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024    /* For safety. Large enough to hold a segment. */
138 #define NUM_LOOKBACK_FRAGMENTS 3
139
140 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
141 #define GST_MANIFEST_LOCK(d) g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d));
142 #define GST_MANIFEST_UNLOCK(d) g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d));
143
144 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
145 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
146 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
147
148 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
149 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
150 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
151
152 enum
153 {
154   PROP_0,
155   PROP_CONNECTION_SPEED,
156   PROP_BITRATE_LIMIT,
157   PROP_LAST
158 };
159
160 enum GstAdaptiveDemuxFlowReturn
161 {
162   GST_ADAPTIVE_DEMUX_FLOW_SWITCH = GST_FLOW_CUSTOM_SUCCESS_2 + 1
163 };
164
165 struct _GstAdaptiveDemuxPrivate
166 {
167   GstAdapter *input_adapter;    /* protected by manifest_lock */
168   gboolean have_manifest;       /* protected by manifest_lock */
169
170   GList *old_streams;           /* protected by manifest_lock */
171
172   GstTask *updates_task;        /* MT safe */
173   GRecMutex updates_lock;
174   GMutex updates_timed_lock;
175   GCond updates_timed_cond;     /* protected by updates_timed_lock */
176   gboolean stop_updates_task;   /* protected by updates_timed_lock */
177
178   /* used only from updates_task, no need to protect it */
179   gint update_failed_count;
180
181   guint32 segment_seqnum;       /* protected by manifest_lock */
182
183   /* main lock used to protect adaptive demux and all its streams.
184    * It serializes the adaptive demux public API.
185    */
186   GRecMutex manifest_lock;
187
188   /* condition to wait for manifest updates on a live stream.
189    * In order to signal the manifest_cond, the caller needs to hold both
190    * manifest_lock and manifest_update_lock (taken in this order)
191    */
192   GCond manifest_cond;
193   GMutex manifest_update_lock;
194
195   GMutex api_lock;
196
197   /* Protects demux and stream segment information
198    * Needed because seeks can update segment information
199    * without needing to stop tasks when they just want to
200    * update the segment boundaries */
201   GMutex segment_lock;
202 };
203
204 typedef struct _GstAdaptiveDemuxTimer
205 {
206   volatile gint ref_count;
207   GCond *cond;
208   GMutex *mutex;
209   GstClockID clock_id;
210   gboolean fired;
211 } GstAdaptiveDemuxTimer;
212
213 static GstBinClass *parent_class = NULL;
214 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
215 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
216     GstAdaptiveDemuxClass * klass);
217 static void gst_adaptive_demux_finalize (GObject * object);
218 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
219     element, GstStateChange transition);
220
221 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
222
223 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
224     GstEvent * event);
225 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
226     GstObject * parent, GstBuffer * buffer);
227 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
228     GstQuery * query);
229 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
230     GstEvent * event);
231
232 static gboolean
233 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
234
235 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
236 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
237     stream);
238 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
239 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
240     gboolean first_and_live);
241 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
242 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
243     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
244     GstClockTime ts, GstClockTime * final_ts);
245 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
246     demux, GstAdaptiveDemuxStream * stream);
247 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
248     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
249 static GstFlowReturn
250 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
251     GstAdaptiveDemuxStream * stream);
252 static gint64
253 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
254     GstAdaptiveDemuxStream * stream);
255 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
256     demux);
257 static GstFlowReturn
258 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
259 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
260 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
261
262 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
263 static GstFlowReturn
264 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
265     GstEvent * event);
266
267 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
268 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux);
269 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
270     demux);
271 static void
272 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
273     stream, GstFlowReturn ret, GError * err);
274 static GstFlowReturn
275 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
276     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
277 static GstFlowReturn
278 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
279     GstAdaptiveDemuxStream * stream);
280 static GstFlowReturn
281 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
282     GstAdaptiveDemuxStream * stream, GstClockTime duration);
283 static gboolean
284 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
285     GstClockTime end_time);
286 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
287     GstClockTime time, GstClockID id, gpointer user_data);
288
289 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
290  * method to get to the padtemplates */
291 GType
292 gst_adaptive_demux_get_type (void)
293 {
294   static volatile gsize type = 0;
295
296   if (g_once_init_enter (&type)) {
297     GType _type;
298     static const GTypeInfo info = {
299       sizeof (GstAdaptiveDemuxClass),
300       NULL,
301       NULL,
302       (GClassInitFunc) gst_adaptive_demux_class_init,
303       NULL,
304       NULL,
305       sizeof (GstAdaptiveDemux),
306       0,
307       (GInstanceInitFunc) gst_adaptive_demux_init,
308     };
309
310     _type = g_type_register_static (GST_TYPE_BIN,
311         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
312     g_once_init_leave (&type, _type);
313   }
314   return type;
315 }
316
317 static void
318 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
319     const GValue * value, GParamSpec * pspec)
320 {
321   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
322
323   GST_API_LOCK (demux);
324   GST_MANIFEST_LOCK (demux);
325
326   switch (prop_id) {
327     case PROP_CONNECTION_SPEED:
328       demux->connection_speed = g_value_get_uint (value) * 1000;
329       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
330           demux->connection_speed);
331       break;
332     case PROP_BITRATE_LIMIT:
333       demux->bitrate_limit = g_value_get_float (value);
334       break;
335     default:
336       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
337       break;
338   }
339
340   GST_MANIFEST_UNLOCK (demux);
341   GST_API_UNLOCK (demux);
342 }
343
344 static void
345 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
346     GValue * value, GParamSpec * pspec)
347 {
348   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
349
350   GST_MANIFEST_LOCK (demux);
351
352   switch (prop_id) {
353     case PROP_CONNECTION_SPEED:
354       g_value_set_uint (value, demux->connection_speed / 1000);
355       break;
356     case PROP_BITRATE_LIMIT:
357       g_value_set_float (value, demux->bitrate_limit);
358       break;
359     default:
360       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
361       break;
362   }
363
364   GST_MANIFEST_UNLOCK (demux);
365 }
366
367 static void
368 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
369 {
370   GObjectClass *gobject_class;
371   GstElementClass *gstelement_class;
372   GstBinClass *gstbin_class;
373
374   gobject_class = G_OBJECT_CLASS (klass);
375   gstelement_class = GST_ELEMENT_CLASS (klass);
376   gstbin_class = GST_BIN_CLASS (klass);
377
378   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
379       "Base Adaptive Demux");
380
381   parent_class = g_type_class_peek_parent (klass);
382   g_type_class_add_private (klass, sizeof (GstAdaptiveDemuxPrivate));
383
384   gobject_class->set_property = gst_adaptive_demux_set_property;
385   gobject_class->get_property = gst_adaptive_demux_get_property;
386   gobject_class->finalize = gst_adaptive_demux_finalize;
387
388   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
389       g_param_spec_uint ("connection-speed", "Connection Speed",
390           "Network connection speed in kbps (0 = calculate from downloaded"
391           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393
394   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
395   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
396       g_param_spec_float ("bitrate-limit",
397           "Bitrate limit in %",
398           "Limit of the available bitrate to use when switching to alternates.",
399           0, 1, DEFAULT_BITRATE_LIMIT,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401
402   gstelement_class->change_state = gst_adaptive_demux_change_state;
403
404   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
405
406   klass->data_received = gst_adaptive_demux_stream_data_received_default;
407   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
408   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
409 }
410
411 static void
412 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
413     GstAdaptiveDemuxClass * klass)
414 {
415   GstPadTemplate *pad_template;
416   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
417   GObjectClass *gobject_class;
418
419   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
420
421   demux->priv = GST_ADAPTIVE_DEMUX_GET_PRIVATE (demux);
422   demux->priv->input_adapter = gst_adapter_new ();
423   demux->downloader = gst_uri_downloader_new ();
424   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
425   demux->priv->segment_seqnum = gst_util_seqnum_next ();
426   demux->have_group_id = FALSE;
427   demux->group_id = G_MAXUINT;
428
429   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
430
431   demux->realtime_clock = gst_system_clock_obtain ();
432   g_assert (demux->realtime_clock != NULL);
433   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
434   if (g_object_class_find_property (gobject_class, "clock-type")) {
435     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
436   } else {
437     GST_WARNING_OBJECT (demux,
438         "System clock does not have clock-type property");
439   }
440   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
441     demux->clock_offset = 0;
442   } else {
443     GDateTime *utc_now;
444     GstClockTime rtc_now;
445     GTimeVal gtv;
446
447     utc_now = g_date_time_new_now_utc ();
448     rtc_now = gst_clock_get_time (demux->realtime_clock);
449     g_date_time_to_timeval (utc_now, &gtv);
450     demux->clock_offset =
451         gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
452         GST_TIME_AS_USECONDS (rtc_now);
453     g_date_time_unref (utc_now);
454   }
455   g_rec_mutex_init (&demux->priv->updates_lock);
456   demux->priv->updates_task =
457       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
458       demux, NULL);
459   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
460
461   g_mutex_init (&demux->priv->updates_timed_lock);
462   g_cond_init (&demux->priv->updates_timed_cond);
463
464   g_cond_init (&demux->priv->manifest_cond);
465   g_mutex_init (&demux->priv->manifest_update_lock);
466
467   g_rec_mutex_init (&demux->priv->manifest_lock);
468   g_mutex_init (&demux->priv->api_lock);
469   g_mutex_init (&demux->priv->segment_lock);
470
471   pad_template =
472       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
473   g_return_if_fail (pad_template != NULL);
474
475   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
476   gst_pad_set_event_function (demux->sinkpad,
477       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
478   gst_pad_set_chain_function (demux->sinkpad,
479       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
480
481   /* Properties */
482   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
483   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
484
485   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
486 }
487
488 static void
489 gst_adaptive_demux_finalize (GObject * object)
490 {
491   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
492   GstAdaptiveDemuxPrivate *priv = demux->priv;
493
494   GST_DEBUG_OBJECT (object, "finalize");
495
496   g_object_unref (priv->input_adapter);
497   g_object_unref (demux->downloader);
498
499   g_mutex_clear (&priv->updates_timed_lock);
500   g_cond_clear (&priv->updates_timed_cond);
501   g_mutex_clear (&demux->priv->manifest_update_lock);
502   g_cond_clear (&demux->priv->manifest_cond);
503   g_object_unref (priv->updates_task);
504   g_rec_mutex_clear (&priv->updates_lock);
505   g_rec_mutex_clear (&demux->priv->manifest_lock);
506   g_mutex_clear (&demux->priv->api_lock);
507   g_mutex_clear (&demux->priv->segment_lock);
508   if (demux->realtime_clock) {
509     gst_object_unref (demux->realtime_clock);
510     demux->realtime_clock = NULL;
511   }
512
513   G_OBJECT_CLASS (parent_class)->finalize (object);
514 }
515
516 static GstStateChangeReturn
517 gst_adaptive_demux_change_state (GstElement * element,
518     GstStateChange transition)
519 {
520   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
521   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
522
523   GST_API_LOCK (demux);
524
525   switch (transition) {
526     case GST_STATE_CHANGE_PAUSED_TO_READY:
527       GST_MANIFEST_LOCK (demux);
528       gst_adaptive_demux_reset (demux);
529       GST_MANIFEST_UNLOCK (demux);
530       break;
531     default:
532       break;
533   }
534
535   /* this must be run without MANIFEST_LOCK taken.
536    * For PLAYING to PLAYING state changes, it will want to take a lock in
537    * src element and that lock is held while the streaming thread is running.
538    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
539    */
540   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
541
542   GST_API_UNLOCK (demux);
543   return result;
544 }
545
546 static gboolean
547 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
548     GstEvent * event)
549 {
550   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
551   GstAdaptiveDemuxClass *demux_class;
552   gboolean ret;
553
554   switch (event->type) {
555     case GST_EVENT_FLUSH_STOP:
556       GST_API_LOCK (demux);
557       GST_MANIFEST_LOCK (demux);
558
559       gst_adaptive_demux_reset (demux);
560
561       ret = gst_pad_event_default (pad, parent, event);
562
563       GST_MANIFEST_UNLOCK (demux);
564       GST_API_UNLOCK (demux);
565
566       return ret;
567     case GST_EVENT_EOS:{
568       GstQuery *query;
569       gboolean query_res;
570       gboolean ret = TRUE;
571       gsize available;
572       GstBuffer *manifest_buffer;
573
574       GST_API_LOCK (demux);
575       GST_MANIFEST_LOCK (demux);
576
577       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
578
579       available = gst_adapter_available (demux->priv->input_adapter);
580
581       if (available == 0) {
582         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
583         ret = gst_pad_event_default (pad, parent, event);
584
585         GST_MANIFEST_UNLOCK (demux);
586         GST_API_UNLOCK (demux);
587
588         return ret;
589       }
590
591       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
592
593       /* Need to get the URI to use it as a base to generate the fragment's
594        * uris */
595       query = gst_query_new_uri ();
596       query_res = gst_pad_peer_query (pad, query);
597       if (query_res) {
598         gchar *uri, *redirect_uri;
599         gboolean permanent;
600
601         gst_query_parse_uri (query, &uri);
602         gst_query_parse_uri_redirection (query, &redirect_uri);
603         gst_query_parse_uri_redirection_permanent (query, &permanent);
604
605         if (permanent && redirect_uri) {
606           demux->manifest_uri = redirect_uri;
607           demux->manifest_base_uri = NULL;
608           g_free (uri);
609         } else {
610           demux->manifest_uri = uri;
611           demux->manifest_base_uri = redirect_uri;
612         }
613
614         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
615             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
616       } else {
617         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
618       }
619       gst_query_unref (query);
620
621       /* Let the subclass parse the manifest */
622       manifest_buffer =
623           gst_adapter_take_buffer (demux->priv->input_adapter, available);
624       if (!demux_class->process_manifest (demux, manifest_buffer)) {
625         /* In most cases, this will happen if we set a wrong url in the
626          * source element and we have received the 404 HTML response instead of
627          * the manifest */
628         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
629             (NULL));
630         ret = FALSE;
631       } else {
632         demux->priv->have_manifest = TRUE;
633       }
634       gst_buffer_unref (manifest_buffer);
635
636       gst_element_post_message (GST_ELEMENT_CAST (demux),
637           gst_message_new_element (GST_OBJECT_CAST (demux),
638               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
639                   "manifest-uri", G_TYPE_STRING,
640                   demux->manifest_uri, "uri", G_TYPE_STRING,
641                   demux->manifest_uri,
642                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
643                   GST_CLOCK_TIME_NONE,
644                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
645                   gst_util_get_timestamp (), NULL)));
646
647       if (ret) {
648         /* Send duration message */
649         if (!gst_adaptive_demux_is_live (demux)) {
650           GstClockTime duration = demux_class->get_duration (demux);
651
652           if (duration != GST_CLOCK_TIME_NONE) {
653             GST_DEBUG_OBJECT (demux,
654                 "Sending duration message : %" GST_TIME_FORMAT,
655                 GST_TIME_ARGS (duration));
656             gst_element_post_message (GST_ELEMENT (demux),
657                 gst_message_new_duration_changed (GST_OBJECT (demux)));
658           } else {
659             GST_DEBUG_OBJECT (demux,
660                 "media duration unknown, can not send the duration message");
661           }
662         }
663
664         if (demux->next_streams) {
665           gst_adaptive_demux_expose_streams (demux,
666               gst_adaptive_demux_is_live (demux));
667           gst_adaptive_demux_start_tasks (demux);
668           if (gst_adaptive_demux_is_live (demux)) {
669             /* Task to periodically update the manifest */
670             gst_task_start (demux->priv->updates_task);
671           }
672         } else {
673           /* no streams */
674           GST_WARNING_OBJECT (demux, "No streams created from manifest");
675           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
676               (_("This file contains no playable streams.")),
677               ("No known stream formats found at the Manifest"));
678           ret = FALSE;
679         }
680
681       }
682       GST_MANIFEST_UNLOCK (demux);
683       GST_API_UNLOCK (demux);
684
685       gst_event_unref (event);
686       return ret;
687     }
688     case GST_EVENT_SEGMENT:
689       /* Swallow newsegments, we'll push our own */
690       gst_event_unref (event);
691       return TRUE;
692     default:
693       break;
694   }
695
696   return gst_pad_event_default (pad, parent, event);
697 }
698
699 static GstFlowReturn
700 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
701     GstBuffer * buffer)
702 {
703   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
704
705   GST_MANIFEST_LOCK (demux);
706
707   gst_adapter_push (demux->priv->input_adapter, buffer);
708
709   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
710       (gint) gst_adapter_available (demux->priv->input_adapter));
711
712   GST_MANIFEST_UNLOCK (demux);
713   return GST_FLOW_OK;
714 }
715
716 /* must be called with manifest_lock taken */
717 static void
718 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
719 {
720   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
721   GList *iter;
722   GList *old_streams;
723   GstEvent *eos;
724
725   /* take ownership of old_streams before releasing the manifest_lock in
726    * gst_adaptive_demux_stop_tasks
727    */
728   old_streams = demux->priv->old_streams;
729   demux->priv->old_streams = NULL;
730
731   gst_adaptive_demux_stop_tasks (demux);
732   gst_uri_downloader_reset (demux->downloader);
733
734   if (klass->reset)
735     klass->reset (demux);
736
737   eos = gst_event_new_eos ();
738   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
739     GstAdaptiveDemuxStream *stream = iter->data;
740     if (stream->pad) {
741       gst_pad_push_event (stream->pad, gst_event_ref (eos));
742       gst_pad_set_active (stream->pad, FALSE);
743
744       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
745     }
746     gst_adaptive_demux_stream_free (stream);
747   }
748   gst_event_unref (eos);
749   g_list_free (demux->streams);
750   demux->streams = NULL;
751
752   if (old_streams) {
753     g_list_free_full (old_streams,
754         (GDestroyNotify) gst_adaptive_demux_stream_free);
755   }
756
757   g_free (demux->manifest_uri);
758   g_free (demux->manifest_base_uri);
759   demux->manifest_uri = NULL;
760   demux->manifest_base_uri = NULL;
761
762   gst_adapter_clear (demux->priv->input_adapter);
763   demux->priv->have_manifest = FALSE;
764
765   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
766
767   demux->have_group_id = FALSE;
768   demux->group_id = G_MAXUINT;
769   demux->priv->segment_seqnum = gst_util_seqnum_next ();
770 }
771
772 static void
773 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
774 {
775   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
776
777   switch (GST_MESSAGE_TYPE (msg)) {
778     case GST_MESSAGE_ERROR:{
779       GList *iter;
780       GstAdaptiveDemuxStream *stream;
781       GError *err = NULL;
782       gchar *debug = NULL;
783       gchar *new_error = NULL;
784
785       GST_MANIFEST_LOCK (demux);
786
787       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
788         stream = iter->data;
789         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
790                 GST_OBJECT_CAST (stream->src))) {
791           gst_message_parse_error (msg, &err, &debug);
792
793           GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
794               "Source posted error: %d:%d %s (%s)", err->domain, err->code,
795               err->message, debug);
796
797           if (debug)
798             new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
799           if (new_error) {
800             g_free (err->message);
801             err->message = new_error;
802           }
803
804           /* error, but ask to retry */
805           gst_adaptive_demux_stream_fragment_download_finish (stream,
806               GST_FLOW_CUSTOM_ERROR, err);
807
808           g_error_free (err);
809           g_free (debug);
810           break;
811         }
812       }
813
814       GST_MANIFEST_UNLOCK (demux);
815
816       gst_message_unref (msg);
817       msg = NULL;
818     }
819       break;
820     default:
821       break;
822   }
823
824   if (msg)
825     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
826 }
827
828 void
829 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
830     gsize struct_size)
831 {
832   GST_API_LOCK (demux);
833   GST_MANIFEST_LOCK (demux);
834   demux->stream_struct_size = struct_size;
835   GST_MANIFEST_UNLOCK (demux);
836   GST_API_UNLOCK (demux);
837 }
838
839 /* must be called with manifest_lock taken */
840 static gboolean
841 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
842     GstAdaptiveDemuxStream * stream)
843 {
844   GstPad *pad = stream->pad;
845   gchar *name = gst_pad_get_name (pad);
846   GstEvent *event;
847   gchar *stream_id;
848
849   gst_pad_set_active (pad, TRUE);
850   stream->need_header = TRUE;
851
852   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
853
854   event =
855       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
856       GST_EVENT_STREAM_START, 0);
857   if (event) {
858     if (gst_event_parse_group_id (event, &demux->group_id))
859       demux->have_group_id = TRUE;
860     else
861       demux->have_group_id = FALSE;
862     gst_event_unref (event);
863   } else if (!demux->have_group_id) {
864     demux->have_group_id = TRUE;
865     demux->group_id = gst_util_group_id_next ();
866   }
867   event = gst_event_new_stream_start (stream_id);
868   if (demux->have_group_id)
869     gst_event_set_group_id (event, demux->group_id);
870
871   gst_pad_push_event (pad, event);
872   g_free (stream_id);
873   g_free (name);
874
875   GST_DEBUG_OBJECT (demux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
876       GST_DEBUG_PAD_NAME (pad), stream->pending_caps);
877
878   if (stream->pending_caps) {
879     gst_pad_set_caps (pad, stream->pending_caps);
880     gst_caps_unref (stream->pending_caps);
881     stream->pending_caps = NULL;
882   }
883
884   stream->discont = TRUE;
885
886   gst_object_ref (pad);
887
888   return gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
889 }
890
891 /* must be called with manifest_lock taken */
892 static GstClockTime
893 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
894     GstAdaptiveDemuxStream * stream)
895 {
896   GstAdaptiveDemuxClass *klass;
897
898   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
899
900   if (klass->get_presentation_offset == NULL)
901     return 0;
902
903   return klass->get_presentation_offset (demux, stream);
904 }
905
906 /* must be called with manifest_lock taken */
907 static GstClockTime
908 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
909 {
910   GstAdaptiveDemuxClass *klass;
911
912   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
913
914   if (klass->get_period_start_time == NULL)
915     return 0;
916
917   return klass->get_period_start_time (demux);
918 }
919
920 /* must be called with manifest_lock taken */
921 static gboolean
922 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux,
923     gboolean first_and_live)
924 {
925   GList *iter;
926   GList *old_streams;
927   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
928
929   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
930
931   old_streams = demux->streams;
932   demux->streams = demux->next_streams;
933   demux->next_streams = NULL;
934
935   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
936     GstAdaptiveDemuxStream *stream = iter->data;
937
938     if (!gst_adaptive_demux_expose_stream (demux,
939             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
940       /* TODO act on error */
941     }
942
943     if (first_and_live) {
944       /* TODO we only need the first timestamp, maybe create a simple function */
945       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
946
947       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
948         min_pts = MIN (min_pts, stream->fragment.timestamp);
949       } else {
950         min_pts = stream->fragment.timestamp;
951       }
952     }
953   }
954
955   /* For live streams, the subclass is supposed to seek to the current
956    * fragment and then tell us its timestamp in stream->fragment.timestamp.
957    * We now also have to seek our demuxer segment to reflect this.
958    *
959    * FIXME: This needs some refactoring at some point.
960    */
961   if (first_and_live) {
962     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
963         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts, GST_SEEK_TYPE_NONE, -1,
964         NULL);
965   }
966
967   period_start = gst_adaptive_demux_get_period_start_time (demux);
968
969   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
970     GstAdaptiveDemuxStream *stream = iter->data;
971     GstClockTime offset;
972
973     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
974     stream->segment = demux->segment;
975
976     /* The demuxer segment is just built from seek events, but for each stream
977      * we have to adjust segments according to the current period and the
978      * stream specific presentation time offset.
979      *
980      * For each period, buffer timestamps start again from 0. Additionally the
981      * buffer timestamps are shifted by the stream specific presentation time
982      * offset, so the first buffer timestamp of a period is 0 + presentation
983      * time offset. If the stream contains timestamps itself, this is also
984      * supposed to be the presentation time stored inside the stream.
985      *
986      * The stream time over periods is supposed to be continuous, that is the
987      * buffer timestamp 0 + presentation time offset should map to the start
988      * time of the current period.
989      *
990      *
991      * The adjustment of the stream segments as such works the following.
992      *
993      * If the demuxer segment start is bigger than the period start, this
994      * means that we have to drop some media at the beginning of the current
995      * period, e.g. because a seek into the middle of the period has
996      * happened. The amount of media to drop is the difference between the
997      * period start and the demuxer segment start, and as each period starts
998      * again from 0, this difference is going to be the actual stream's
999      * segment start. As all timestamps of the stream are shifted by the
1000      * presentation time offset, we will also have to move the segment start
1001      * by that offset.
1002      *
1003      * Now the running time and stream time at the stream's segment start has to
1004      * be the one that is stored inside the demuxer's segment, which means
1005      * that segment.base and segment.time have to be copied over.
1006      *
1007      *
1008      * If the demuxer segment start is smaller than the period start time,
1009      * this means that the whole period is inside the segment. As each period
1010      * starts timestamps from 0, and additionally timestamps are shifted by
1011      * the presentation time offset, the stream's first timestamp (and as such
1012      * the stream's segment start) has to be the presentation time offset.
1013      * The stream time at the segment start is supposed to be the stream time
1014      * of the period start according to the demuxer segment, so the stream
1015      * segment's time would be set to that. The same goes for the stream
1016      * segment's base, which is supposed to be the running time of the period
1017      * start according to the demuxer's segment.
1018      *
1019      *
1020      * For the first case where not the complete period is inside the segment,
1021      * the segment time and base as calculated by the second case would be
1022      * equivalent.
1023      */
1024
1025     if (demux->segment.start > period_start) {
1026       stream->segment.start = demux->segment.start - period_start + offset;
1027       stream->segment.position = offset;
1028       stream->segment.time = demux->segment.time;
1029       stream->segment.base = demux->segment.base;
1030     } else {
1031       stream->segment.start = offset;
1032       stream->segment.position = offset;
1033       stream->segment.time =
1034           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1035           period_start);
1036       stream->segment.base =
1037           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1038           period_start);
1039     }
1040
1041     stream->pending_segment = gst_event_new_segment (&stream->segment);
1042     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1043   }
1044
1045   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1046
1047   if (old_streams) {
1048     GstEvent *eos = gst_event_new_eos ();
1049
1050     /* before we put streams in the demux->priv->old_streams list,
1051      * we ask the download task to stop. In this way, it will no longer be
1052      * allowed to change the demux object.
1053      */
1054     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1055       GstAdaptiveDemuxStream *stream = iter->data;
1056
1057       GST_LOG_OBJECT (stream->pad, "Removing stream");
1058       gst_pad_push_event (stream->pad, gst_event_ref (eos));
1059       gst_pad_set_active (stream->pad, FALSE);
1060       gst_element_remove_pad (GST_ELEMENT (demux), stream->pad);
1061
1062       /* ask the download task to stop.
1063        * We will not join it now, because our thread can be one of these tasks.
1064        * We will do the joining later, from another stream download task or
1065        * from gst_adaptive_demux_stop_tasks.
1066        * We also cannot change the state of the stream->src element, because
1067        * that will wait on the streaming thread (which could be this thread)
1068        * to stop first.
1069        * Because we sent an EOS to the downstream element, the stream->src
1070        * element should detect this in its streaming task and stop.
1071        * Even if it doesn't do that, we will change its state later in
1072        * gst_adaptive_demux_stop_tasks.
1073        */
1074       gst_task_stop (stream->download_task);
1075       g_mutex_lock (&stream->fragment_download_lock);
1076       stream->cancelled = TRUE;
1077       g_cond_signal (&stream->fragment_download_cond);
1078       g_mutex_unlock (&stream->fragment_download_lock);
1079     }
1080     gst_event_unref (eos);
1081
1082     /* The list should be freed from another thread as we can't properly
1083      * cleanup a GstTask from itself */
1084     demux->priv->old_streams =
1085         g_list_concat (demux->priv->old_streams, old_streams);
1086   }
1087
1088   return TRUE;
1089 }
1090
1091 /* must be called with manifest_lock taken */
1092 GstAdaptiveDemuxStream *
1093 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1094 {
1095   GstAdaptiveDemuxStream *stream;
1096
1097   stream = g_malloc0 (demux->stream_struct_size);
1098
1099   /* Downloading task */
1100   g_rec_mutex_init (&stream->download_lock);
1101   stream->download_task =
1102       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1103       stream, NULL);
1104   gst_task_set_lock (stream->download_task, &stream->download_lock);
1105
1106   stream->pad = pad;
1107   stream->demux = demux;
1108   stream->fragment_bitrates =
1109       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1110   gst_pad_set_element_private (pad, stream);
1111
1112   gst_pad_set_query_function (pad,
1113       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1114   gst_pad_set_event_function (pad,
1115       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1116
1117   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1118   g_cond_init (&stream->fragment_download_cond);
1119   g_mutex_init (&stream->fragment_download_lock);
1120
1121   demux->next_streams = g_list_append (demux->next_streams, stream);
1122
1123   return stream;
1124 }
1125
1126 GstAdaptiveDemuxStream *
1127 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1128 {
1129   GList *iter;
1130
1131   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1132     GstAdaptiveDemuxStream *stream = iter->data;
1133     if (stream->pad == pad) {
1134       return stream;
1135     }
1136   }
1137   return NULL;
1138 }
1139
1140 /* must be called with manifest_lock taken.
1141  * It will temporarily drop the manifest_lock in order to join the task.
1142  * It will join only the old_streams (the demux->streams are joined by
1143  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1144  * called)
1145  */
1146 static void
1147 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1148 {
1149   GstAdaptiveDemux *demux = stream->demux;
1150   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1151
1152   if (klass->stream_free)
1153     klass->stream_free (stream);
1154
1155   g_clear_error (&stream->last_error);
1156   if (stream->download_task) {
1157     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1158       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1159           GST_DEBUG_PAD_NAME (stream->pad));
1160
1161       gst_task_stop (stream->download_task);
1162
1163       g_mutex_lock (&stream->fragment_download_lock);
1164       stream->cancelled = TRUE;
1165       g_cond_signal (&stream->fragment_download_cond);
1166       g_mutex_unlock (&stream->fragment_download_lock);
1167     }
1168     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1169
1170     /* temporarily drop the manifest lock to join the task */
1171     GST_MANIFEST_UNLOCK (demux);
1172
1173     gst_task_join (stream->download_task);
1174
1175     GST_MANIFEST_LOCK (demux);
1176
1177     GST_LOG_OBJECT (demux, "Finished");
1178     gst_object_unref (stream->download_task);
1179     g_rec_mutex_clear (&stream->download_lock);
1180     stream->download_task = NULL;
1181   }
1182
1183   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1184
1185   if (stream->pending_segment) {
1186     gst_event_unref (stream->pending_segment);
1187     stream->pending_segment = NULL;
1188   }
1189
1190   if (stream->pending_events) {
1191     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1192     stream->pending_events = NULL;
1193   }
1194
1195   if (stream->internal_pad) {
1196     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1197   }
1198
1199   if (stream->src_srcpad) {
1200     gst_object_unref (stream->src_srcpad);
1201     stream->src_srcpad = NULL;
1202   }
1203
1204   if (stream->src) {
1205     gst_element_set_state (stream->src, GST_STATE_NULL);
1206     gst_bin_remove (GST_BIN_CAST (demux), stream->src);
1207     stream->src = NULL;
1208   }
1209
1210   g_cond_clear (&stream->fragment_download_cond);
1211   g_mutex_clear (&stream->fragment_download_lock);
1212   g_free (stream->fragment_bitrates);
1213
1214   if (stream->pad) {
1215     gst_object_unref (stream->pad);
1216     stream->pad = NULL;
1217   }
1218   if (stream->pending_caps)
1219     gst_caps_unref (stream->pending_caps);
1220
1221   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1222
1223   g_free (stream);
1224 }
1225
1226 /* must be called with manifest_lock taken */
1227 static gboolean
1228 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1229     gint64 * range_start, gint64 * range_stop)
1230 {
1231   GstAdaptiveDemuxClass *klass;
1232
1233   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1234
1235   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1236
1237   return klass->get_live_seek_range (demux, range_start, range_stop);
1238 }
1239
1240 /* must be called with manifest_lock taken */
1241 static gboolean
1242 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1243 {
1244   GstAdaptiveDemuxClass *klass;
1245
1246   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1247   if (gst_adaptive_demux_is_live (demux)) {
1248     return klass->get_live_seek_range != NULL;
1249   }
1250
1251   return klass->seek != NULL;
1252 }
1253
1254 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
1255                               GST_SEEK_FLAG_SNAP_AFTER | \
1256                               GST_SEEK_FLAG_SNAP_NEAREST))
1257 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1258                               GST_SEEK_FLAG_SNAP_AFTER | \
1259                               GST_SEEK_FLAG_SNAP_NEAREST))
1260
1261 static gboolean
1262 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1263     GstEvent * event)
1264 {
1265   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1266   gdouble rate;
1267   GstFormat format;
1268   GstSeekFlags flags;
1269   GstSeekType start_type, stop_type;
1270   gint64 start, stop;
1271   guint32 seqnum;
1272   gboolean update;
1273   gboolean ret;
1274   GstSegment oldsegment;
1275   GstAdaptiveDemuxStream *stream = NULL;
1276
1277   GST_INFO_OBJECT (demux, "Received seek event");
1278
1279   GST_API_LOCK (demux);
1280   GST_MANIFEST_LOCK (demux);
1281
1282   if (!gst_adaptive_demux_can_seek (demux)) {
1283     GST_MANIFEST_UNLOCK (demux);
1284     GST_API_UNLOCK (demux);
1285     gst_event_unref (event);
1286     return FALSE;
1287   }
1288
1289   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1290       &stop_type, &stop);
1291
1292   if (format != GST_FORMAT_TIME) {
1293     GST_MANIFEST_UNLOCK (demux);
1294     GST_API_UNLOCK (demux);
1295     gst_event_unref (event);
1296     return FALSE;
1297   }
1298
1299   if (gst_adaptive_demux_is_live (demux)) {
1300     gint64 range_start, range_stop;
1301     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1302             &range_stop)) {
1303       GST_MANIFEST_UNLOCK (demux);
1304       GST_API_UNLOCK (demux);
1305       gst_event_unref (event);
1306       return FALSE;
1307     }
1308     if (start < range_start || start >= range_stop) {
1309       GST_MANIFEST_UNLOCK (demux);
1310       GST_API_UNLOCK (demux);
1311       GST_WARNING_OBJECT (demux, "Seek to invalid position");
1312       gst_event_unref (event);
1313       return FALSE;
1314     }
1315   }
1316
1317   seqnum = gst_event_get_seqnum (event);
1318
1319   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1320
1321   /* have a backup in case seek fails */
1322   gst_segment_copy_into (&demux->segment, &oldsegment);
1323
1324   if (flags & GST_SEEK_FLAG_FLUSH) {
1325     GstEvent *fevent;
1326
1327     GST_DEBUG_OBJECT (demux, "sending flush start");
1328     fevent = gst_event_new_flush_start ();
1329     gst_event_set_seqnum (fevent, seqnum);
1330     gst_adaptive_demux_push_src_event (demux, fevent);
1331
1332     gst_adaptive_demux_stop_tasks (demux);
1333   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1334       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1335
1336     gst_adaptive_demux_stop_tasks (demux);
1337   }
1338
1339   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1340
1341   /*
1342    * Handle snap seeks as follows:
1343    * 1) do the snap seeking on the stream that received
1344    *    the event
1345    * 2) use the final position on this stream to seek
1346    *    on the other streams to the same position
1347    *
1348    * We can't snap at all streams at the same time as
1349    * they might end in different positions, so just
1350    * use the one that received the event as the 'leading'
1351    * one to do the snap seek.
1352    */
1353   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1354           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1355     GstClockTime ts;
1356     GstSeekFlags stream_seek_flags = flags;
1357
1358     /* snap-seek on the stream that received the event and then
1359      * use the resulting position to seek on all streams */
1360
1361     if (rate >= 0) {
1362       if (start_type != GST_SEEK_TYPE_NONE)
1363         ts = start;
1364       else {
1365         ts = stream->segment.position;
1366         start_type = GST_SEEK_TYPE_SET;
1367       }
1368     } else {
1369       if (stop_type != GST_SEEK_TYPE_NONE)
1370         ts = stop;
1371       else {
1372         stop_type = GST_SEEK_TYPE_SET;
1373         ts = stream->segment.position;
1374       }
1375     }
1376
1377     demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1378
1379     /* replace event with a new one without snaping to seek on all streams */
1380     gst_event_unref (event);
1381     if (rate >= 0) {
1382       start = ts;
1383     } else {
1384       stop = ts;
1385     }
1386     event =
1387         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1388         start_type, start, stop_type, stop);
1389     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1390   }
1391   stream = NULL;
1392
1393   gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1394       start, stop_type, stop, &update);
1395
1396   /* FIXME - this seems unatural, do_seek() is updating base when we
1397    * only want the start/stop position to change, maybe do_seek() needs
1398    * some fixing? */
1399   if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1400               && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1401               && stop_type == GST_SEEK_TYPE_NONE))) {
1402     demux->segment.base = oldsegment.base;
1403   }
1404
1405   GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1406
1407   ret = demux_class->seek (demux, event);
1408
1409   if (!ret) {
1410     /* Is there anything else we can do if it fails? */
1411     gst_segment_copy_into (&oldsegment, &demux->segment);
1412   } else {
1413     demux->priv->segment_seqnum = seqnum;
1414   }
1415   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1416
1417   if (flags & GST_SEEK_FLAG_FLUSH) {
1418     GstEvent *fevent;
1419
1420     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1421     fevent = gst_event_new_flush_stop (TRUE);
1422     gst_event_set_seqnum (fevent, seqnum);
1423     gst_adaptive_demux_push_src_event (demux, fevent);
1424   }
1425
1426   if (demux->next_streams) {
1427     gst_adaptive_demux_expose_streams (demux, FALSE);
1428   } else {
1429     GList *iter;
1430     GstClockTime period_start =
1431         gst_adaptive_demux_get_period_start_time (demux);
1432
1433     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1434     for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1435       GstAdaptiveDemuxStream *stream = iter->data;
1436       GstEvent *seg_evt;
1437       GstClockTime offset;
1438
1439       /* See comments in gst_adaptive_demux_get_period_start_time() for
1440        * an explanation of the segment modifications */
1441       stream->segment = demux->segment;
1442       offset =
1443           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1444       stream->segment.start += offset - period_start;
1445       if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1446         stream->segment.position = stream->segment.start;
1447       else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1448         stream->segment.position = stream->segment.stop;
1449       seg_evt = gst_event_new_segment (&stream->segment);
1450       gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1451       gst_event_replace (&stream->pending_segment, seg_evt);
1452       gst_event_unref (seg_evt);
1453       /* Make sure the first buffer after a seek has the discont flag */
1454       stream->discont = TRUE;
1455     }
1456
1457     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1458   }
1459
1460   /* Restart the demux */
1461   gst_adaptive_demux_start_tasks (demux);
1462   GST_MANIFEST_UNLOCK (demux);
1463   GST_API_UNLOCK (demux);
1464   gst_event_unref (event);
1465
1466   return ret;
1467 }
1468
1469 static gboolean
1470 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1471     GstEvent * event)
1472 {
1473   GstAdaptiveDemux *demux;
1474
1475   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1476
1477   /* FIXME handle events received on pads that are to be removed */
1478
1479   switch (event->type) {
1480     case GST_EVENT_SEEK:
1481     {
1482       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1483     }
1484     case GST_EVENT_RECONFIGURE:{
1485       GstAdaptiveDemuxStream *stream;
1486
1487       GST_MANIFEST_LOCK (demux);
1488       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1489
1490       if (stream) {
1491         if (stream->last_ret == GST_FLOW_NOT_LINKED) {
1492           stream->last_ret = GST_FLOW_OK;
1493           stream->restart_download = TRUE;
1494           stream->need_header = TRUE;
1495           stream->discont = TRUE;
1496           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1497           gst_task_start (stream->download_task);
1498         }
1499         gst_event_unref (event);
1500         GST_MANIFEST_UNLOCK (demux);
1501         return TRUE;
1502       }
1503       GST_MANIFEST_UNLOCK (demux);
1504     }
1505       break;
1506     case GST_EVENT_LATENCY:{
1507       /* Upstream and our internal source are irrelevant
1508        * for latency, and we should not fail here to
1509        * configure the latency */
1510       gst_event_unref (event);
1511       return TRUE;
1512     }
1513     default:
1514       break;
1515   }
1516
1517   return gst_pad_event_default (pad, parent, event);
1518 }
1519
1520 static gboolean
1521 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1522     GstQuery * query)
1523 {
1524   GstAdaptiveDemux *demux;
1525   GstAdaptiveDemuxClass *demux_class;
1526   gboolean ret = FALSE;
1527
1528   if (query == NULL)
1529     return FALSE;
1530
1531   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1532   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1533
1534   switch (query->type) {
1535     case GST_QUERY_DURATION:{
1536       GstClockTime duration = -1;
1537       GstFormat fmt;
1538
1539       gst_query_parse_duration (query, &fmt, NULL);
1540
1541       GST_MANIFEST_LOCK (demux);
1542
1543       if (fmt == GST_FORMAT_TIME && demux->priv->have_manifest
1544           && !gst_adaptive_demux_is_live (demux)) {
1545         duration = demux_class->get_duration (demux);
1546
1547         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1548           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1549           ret = TRUE;
1550         }
1551       }
1552
1553       GST_MANIFEST_UNLOCK (demux);
1554
1555       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1556           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1557       break;
1558     }
1559     case GST_QUERY_LATENCY:{
1560       gst_query_set_latency (query, FALSE, 0, -1);
1561       ret = TRUE;
1562       break;
1563     }
1564     case GST_QUERY_SEEKING:{
1565       GstFormat fmt;
1566       gint64 stop = -1;
1567       gint64 start = 0;
1568
1569       GST_MANIFEST_LOCK (demux);
1570
1571       if (!demux->priv->have_manifest) {
1572         GST_MANIFEST_UNLOCK (demux);
1573         GST_INFO_OBJECT (demux,
1574             "Don't have manifest yet, can't answer seeking query");
1575         return FALSE;           /* can't answer without manifest */
1576       }
1577
1578       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1579       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1580       if (fmt == GST_FORMAT_TIME) {
1581         GstClockTime duration;
1582         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1583
1584         ret = TRUE;
1585         if (can_seek) {
1586           if (gst_adaptive_demux_is_live (demux)) {
1587             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1588             if (!ret) {
1589               GST_MANIFEST_UNLOCK (demux);
1590               GST_INFO_OBJECT (demux, "can't answer seeking query");
1591               return FALSE;
1592             }
1593           } else {
1594             duration = demux_class->get_duration (demux);
1595             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1596               stop = duration;
1597           }
1598         }
1599         gst_query_set_seeking (query, fmt, can_seek, start, stop);
1600         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1601             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1602             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1603       }
1604       GST_MANIFEST_UNLOCK (demux);
1605       break;
1606     }
1607     case GST_QUERY_URI:
1608
1609       GST_MANIFEST_LOCK (demux);
1610
1611       /* TODO HLS can answer this differently it seems */
1612       if (demux->manifest_uri) {
1613         /* FIXME: (hls) Do we answer with the variant playlist, with the current
1614          * playlist or the the uri of the last downlowaded fragment? */
1615         gst_query_set_uri (query, demux->manifest_uri);
1616         ret = TRUE;
1617       }
1618
1619       GST_MANIFEST_UNLOCK (demux);
1620       break;
1621     default:
1622       /* Don't forward queries upstream because of the special nature of this
1623        *  "demuxer", which relies on the upstream element only to be fed
1624        *  the Manifest
1625        */
1626       break;
1627   }
1628
1629   return ret;
1630 }
1631
1632 /* must be called with manifest_lock taken */
1633 static void
1634 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
1635 {
1636   GList *iter;
1637
1638   GST_INFO_OBJECT (demux, "Starting streams' tasks");
1639   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1640     GstAdaptiveDemuxStream *stream = iter->data;
1641
1642     g_mutex_lock (&stream->fragment_download_lock);
1643     stream->cancelled = FALSE;
1644     g_mutex_unlock (&stream->fragment_download_lock);
1645
1646     stream->last_ret = GST_FLOW_OK;
1647     gst_task_start (stream->download_task);
1648   }
1649 }
1650
1651 /* must be called with manifest_lock taken
1652  * This function will temporarily release manifest_lock in order to join the
1653  * download threads.
1654  * The api_lock will still protect it against other threads trying to modify
1655  * the demux element.
1656  */
1657 static void
1658 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux)
1659 {
1660   GList *iter;
1661
1662   gst_task_stop (demux->priv->updates_task);
1663
1664   g_mutex_lock (&demux->priv->updates_timed_lock);
1665   demux->priv->stop_updates_task = TRUE;
1666   g_cond_signal (&demux->priv->updates_timed_cond);
1667   g_mutex_unlock (&demux->priv->updates_timed_lock);
1668
1669   gst_uri_downloader_cancel (demux->downloader);
1670
1671   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1672     GstAdaptiveDemuxStream *stream = iter->data;
1673
1674     g_mutex_lock (&stream->fragment_download_lock);
1675     stream->cancelled = TRUE;
1676     gst_task_stop (stream->download_task);
1677     g_cond_signal (&stream->fragment_download_cond);
1678     g_mutex_unlock (&stream->fragment_download_lock);
1679   }
1680
1681   g_mutex_lock (&demux->priv->manifest_update_lock);
1682   g_cond_broadcast (&demux->priv->manifest_cond);
1683   g_mutex_unlock (&demux->priv->manifest_update_lock);
1684
1685   /* need to release manifest_lock before stopping the src element.
1686    * The streams were asked to cancel, so they will not make any writes to demux
1687    * object. Even if we temporarily release manifest_lock, the demux->streams
1688    * cannot change and iter cannot be invalidated.
1689    */
1690   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1691     GstAdaptiveDemuxStream *stream = iter->data;
1692     GstElement *src = stream->src;
1693
1694     GST_MANIFEST_UNLOCK (demux);
1695
1696     if (src) {
1697       gst_element_set_state (src, GST_STATE_READY);
1698     }
1699
1700     /* stream->download_task value never changes, so it is safe to read it
1701      * outside critical section
1702      */
1703     gst_task_join (stream->download_task);
1704
1705     GST_MANIFEST_LOCK (demux);
1706   }
1707
1708   GST_MANIFEST_UNLOCK (demux);
1709
1710   /* demux->priv->updates_task value never changes, so it is safe to read it
1711    * outside critical section
1712    */
1713   gst_task_join (demux->priv->updates_task);
1714
1715   GST_MANIFEST_LOCK (demux);
1716
1717   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1718     GstAdaptiveDemuxStream *stream = iter->data;
1719
1720     stream->download_error_count = 0;
1721     stream->need_header = TRUE;
1722   }
1723 }
1724
1725 /* must be called with manifest_lock taken */
1726 static gboolean
1727 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
1728 {
1729   GList *iter;
1730   gboolean ret = TRUE;
1731
1732   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1733     GstAdaptiveDemuxStream *stream = iter->data;
1734     gst_event_ref (event);
1735     ret = ret & gst_pad_push_event (stream->pad, event);
1736   }
1737   gst_event_unref (event);
1738   return ret;
1739 }
1740
1741 /* must be called with manifest_lock taken */
1742 void
1743 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
1744     GstCaps * caps)
1745 {
1746   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
1747       caps);
1748   gst_caps_replace (&stream->pending_caps, caps);
1749   gst_caps_unref (caps);
1750 }
1751
1752 /* must be called with manifest_lock taken */
1753 void
1754 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
1755     GstTagList * tags)
1756 {
1757   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
1758       tags);
1759   if (stream->pending_tags) {
1760     gst_tag_list_unref (stream->pending_tags);
1761   }
1762   stream->pending_tags = tags;
1763 }
1764
1765 /* must be called with manifest_lock taken */
1766 void
1767 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
1768     GstEvent * event)
1769 {
1770   stream->pending_events = g_list_append (stream->pending_events, event);
1771 }
1772
1773 /* must be called with manifest_lock taken */
1774 static guint64
1775 _update_average_bitrate (GstAdaptiveDemux * demux,
1776     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
1777 {
1778   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
1779
1780   stream->moving_bitrate -= stream->fragment_bitrates[index];
1781   stream->fragment_bitrates[index] = new_bitrate;
1782   stream->moving_bitrate += new_bitrate;
1783
1784   stream->moving_index += 1;
1785
1786   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
1787     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
1788   return stream->moving_bitrate / stream->moving_index;
1789 }
1790
1791 /* must be called with manifest_lock taken */
1792 static guint64
1793 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
1794     GstAdaptiveDemuxStream * stream)
1795 {
1796   guint64 average_bitrate;
1797   guint64 fragment_bitrate;
1798
1799   if (demux->connection_speed) {
1800     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
1801         demux->connection_speed / 1000);
1802     return demux->connection_speed;
1803   }
1804
1805   g_object_get (stream->queue, "avg-in-rate", &fragment_bitrate, NULL);
1806   fragment_bitrate *= 8;
1807   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
1808       fragment_bitrate);
1809
1810   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
1811
1812   GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
1813       fragment_bitrate);
1814   GST_INFO_OBJECT (stream,
1815       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
1816       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
1817
1818   /* Conservative approach, make sure we don't upgrade too fast */
1819   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
1820
1821   stream->current_download_rate *= demux->bitrate_limit;
1822   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
1823       G_GUINT64_FORMAT, demux->bitrate_limit,
1824       stream->current_download_rate * 8);
1825   return stream->current_download_rate;
1826 }
1827
1828 /* must be called with manifest_lock taken */
1829 static GstFlowReturn
1830 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
1831 {
1832   gboolean all_notlinked = TRUE;
1833   gboolean all_eos = TRUE;
1834   GList *iter;
1835
1836   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1837     GstAdaptiveDemuxStream *stream = iter->data;
1838
1839     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
1840       all_notlinked = FALSE;
1841       if (stream->last_ret != GST_FLOW_EOS)
1842         all_eos = FALSE;
1843     }
1844
1845     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
1846         || stream->last_ret == GST_FLOW_FLUSHING) {
1847       return stream->last_ret;
1848     }
1849   }
1850   if (all_notlinked)
1851     return GST_FLOW_NOT_LINKED;
1852   else if (all_eos)
1853     return GST_FLOW_EOS;
1854   return GST_FLOW_OK;
1855 }
1856
1857 /* must be called with manifest_lock taken.
1858  * Temporarily releases manifest_lock
1859  */
1860 GstFlowReturn
1861 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
1862     GstBuffer * buffer)
1863 {
1864   GstAdaptiveDemux *demux = stream->demux;
1865   GstFlowReturn ret = GST_FLOW_OK;
1866   gboolean discont = FALSE;
1867   /* Pending events */
1868   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
1869   GList *pending_events = NULL;
1870
1871   if (stream->first_fragment_buffer) {
1872     GstClockTime offset =
1873         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1874     GstClockTime period_start =
1875         gst_adaptive_demux_get_period_start_time (demux);
1876
1877     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1878     if (demux->segment.rate < 0)
1879       /* Set DISCONT flag for every first buffer in reverse playback mode
1880        * as each fragment for its own has to be reversed */
1881       discont = TRUE;
1882
1883     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
1884     if (GST_BUFFER_PTS_IS_VALID (buffer))
1885       GST_BUFFER_PTS (buffer) += offset;
1886
1887     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
1888       stream->segment.position = GST_BUFFER_PTS (buffer);
1889
1890       /* Convert from position inside the stream's segment to the demuxer's
1891        * segment, they are not necessarily the same */
1892       if (stream->segment.position - offset + period_start >
1893           demux->segment.position)
1894         demux->segment.position =
1895             stream->segment.position - offset + period_start;
1896     }
1897     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1898
1899     GST_LOG_OBJECT (stream->pad,
1900         "Going to push buffer with PTS %" GST_TIME_FORMAT,
1901         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
1902   } else {
1903     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
1904   }
1905
1906   if (stream->discont) {
1907     discont = TRUE;
1908     stream->discont = FALSE;
1909   }
1910
1911   if (discont) {
1912     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
1913     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1914   } else {
1915     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
1916   }
1917
1918   stream->first_fragment_buffer = FALSE;
1919
1920   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
1921   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
1922   if (G_UNLIKELY (stream->pending_caps)) {
1923     pending_caps = gst_event_new_caps (stream->pending_caps);
1924     gst_caps_unref (stream->pending_caps);
1925     stream->pending_caps = NULL;
1926   }
1927   if (G_UNLIKELY (stream->pending_segment)) {
1928     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1929     pending_segment = stream->pending_segment;
1930     stream->pending_segment = NULL;
1931     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1932   }
1933   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
1934     GstTagList *tags = stream->pending_tags;
1935
1936     stream->pending_tags = NULL;
1937     stream->bitrate_changed = 0;
1938
1939     if (stream->fragment.bitrate != 0) {
1940       if (tags)
1941         tags = gst_tag_list_make_writable (tags);
1942       else
1943         tags = gst_tag_list_new_empty ();
1944
1945       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
1946           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
1947     }
1948     pending_tags = gst_event_new_tag (tags);
1949   }
1950   if (G_UNLIKELY (stream->pending_events)) {
1951     pending_events = stream->pending_events;
1952     stream->pending_events = NULL;
1953   }
1954
1955   GST_MANIFEST_UNLOCK (demux);
1956
1957   /* Do not push events or buffers holding the manifest lock */
1958   if (G_UNLIKELY (pending_caps)) {
1959     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
1960         pending_caps);
1961     gst_pad_push_event (stream->pad, pending_caps);
1962   }
1963   if (G_UNLIKELY (pending_segment)) {
1964     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
1965         pending_segment);
1966     gst_pad_push_event (stream->pad, pending_segment);
1967   }
1968   if (G_UNLIKELY (pending_tags)) {
1969     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
1970         pending_tags);
1971     gst_pad_push_event (stream->pad, pending_tags);
1972   }
1973   while (pending_events != NULL) {
1974     GstEvent *event = pending_events->data;
1975
1976     if (!gst_pad_push_event (stream->pad, event))
1977       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
1978
1979     pending_events = g_list_delete_link (pending_events, pending_events);
1980   }
1981
1982   ret = gst_pad_push (stream->pad, buffer);
1983
1984   GST_MANIFEST_LOCK (demux);
1985
1986   g_mutex_lock (&stream->fragment_download_lock);
1987   if (G_UNLIKELY (stream->cancelled)) {
1988     ret = stream->last_ret = GST_FLOW_FLUSHING;
1989     g_mutex_unlock (&stream->fragment_download_lock);
1990     return ret;
1991   }
1992   g_mutex_unlock (&stream->fragment_download_lock);
1993
1994   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
1995       gst_flow_get_name (ret));
1996
1997   return ret;
1998 }
1999
2000 /* must be called with manifest_lock taken */
2001 static GstFlowReturn
2002 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2003     GstAdaptiveDemuxStream * stream)
2004 {
2005   /* No need to advance, this isn't a real fragment */
2006   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2007     return GST_FLOW_OK;
2008
2009   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2010       stream->fragment.duration);
2011 }
2012
2013 /* must be called with manifest_lock taken.
2014  * Can temporarily release manifest_lock
2015  */
2016 static GstFlowReturn
2017 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2018     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2019 {
2020   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2021 }
2022
2023 static GstFlowReturn
2024 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2025 {
2026   GstAdaptiveDemuxStream *stream;
2027   GstAdaptiveDemux *demux;
2028   GstAdaptiveDemuxClass *klass;
2029   GstFlowReturn ret = GST_FLOW_OK;
2030
2031   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2032   stream = gst_pad_get_element_private (pad);
2033   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2034
2035   GST_MANIFEST_LOCK (demux);
2036
2037   /* do not make any changes if the stream is cancelled */
2038   g_mutex_lock (&stream->fragment_download_lock);
2039   if (G_UNLIKELY (stream->cancelled)) {
2040     g_mutex_unlock (&stream->fragment_download_lock);
2041     gst_buffer_unref (buffer);
2042     ret = stream->last_ret = GST_FLOW_FLUSHING;
2043     GST_MANIFEST_UNLOCK (demux);
2044     return ret;
2045   }
2046   g_mutex_unlock (&stream->fragment_download_lock);
2047
2048   if (stream->starting_fragment) {
2049     GstClockTime offset =
2050         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2051     GstClockTime period_start =
2052         gst_adaptive_demux_get_period_start_time (demux);
2053
2054     stream->starting_fragment = FALSE;
2055     if (klass->start_fragment) {
2056       if (!klass->start_fragment (demux, stream)) {
2057         ret = GST_FLOW_ERROR;
2058         goto error;
2059       }
2060     }
2061
2062     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2063     if (GST_BUFFER_PTS_IS_VALID (buffer))
2064       GST_BUFFER_PTS (buffer) += offset;
2065
2066     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2067         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2068
2069     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2070       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2071       stream->segment.position = GST_BUFFER_PTS (buffer);
2072
2073       /* Convert from position inside the stream's segment to the demuxer's
2074        * segment, they are not necessarily the same */
2075       if (stream->segment.position - offset + period_start >
2076           demux->segment.position)
2077         demux->segment.position =
2078             stream->segment.position - offset + period_start;
2079       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2080     }
2081
2082   } else {
2083     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2084   }
2085   if (stream->downloading_first_buffer) {
2086     gint64 chunk_size = 0;
2087
2088     stream->downloading_first_buffer = FALSE;
2089
2090     if (!stream->downloading_header && !stream->downloading_index) {
2091       /* If this is the first buffer of a fragment (not the headers or index)
2092        * and we don't have a birate from the sub-class, then see if we
2093        * can work it out from the fragment size and duration */
2094       if (stream->fragment.bitrate == 0 &&
2095           stream->fragment.duration != 0 &&
2096           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2097               &chunk_size)) {
2098         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2099                 8 * GST_SECOND, stream->fragment.duration));
2100         GST_LOG_OBJECT (demux,
2101             "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2102             " = bitrate %u", chunk_size,
2103             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2104         stream->fragment.bitrate = bitrate;
2105       }
2106       if (stream->fragment.bitrate) {
2107         stream->bitrate_changed = TRUE;
2108       } else {
2109         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2110       }
2111     }
2112   }
2113
2114   stream->download_total_time +=
2115       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux)) -
2116       stream->download_chunk_start_time;
2117   stream->download_total_bytes += gst_buffer_get_size (buffer);
2118
2119   GST_DEBUG_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2120       gst_buffer_get_size (buffer));
2121
2122   ret = klass->data_received (demux, stream, buffer);
2123
2124   if (ret == GST_FLOW_FLUSHING) {
2125     /* do not make any changes if the stream is cancelled */
2126     g_mutex_lock (&stream->fragment_download_lock);
2127     if (G_UNLIKELY (stream->cancelled)) {
2128       g_mutex_unlock (&stream->fragment_download_lock);
2129       GST_MANIFEST_UNLOCK (demux);
2130       return ret;
2131     }
2132     g_mutex_unlock (&stream->fragment_download_lock);
2133   }
2134
2135   stream->download_chunk_start_time =
2136       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
2137
2138   if (ret != GST_FLOW_OK) {
2139     if (ret < GST_FLOW_EOS) {
2140       GST_ELEMENT_ERROR (demux, STREAM, FAILED, (NULL),
2141           ("stream stopped, reason %s", gst_flow_get_name (ret)));
2142
2143       /* TODO push this on all pads */
2144       gst_pad_push_event (stream->pad, gst_event_new_eos ());
2145     } else {
2146       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2147           gst_flow_get_name (ret));
2148     }
2149
2150     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2151     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH)
2152       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
2153   }
2154
2155 error:
2156
2157   GST_MANIFEST_UNLOCK (demux);
2158
2159   return ret;
2160 }
2161
2162 /* must be called with manifest_lock taken */
2163 static void
2164 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2165     stream, GstFlowReturn ret, GError * err)
2166 {
2167   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2168       gst_flow_get_name (ret), err);
2169
2170   /* if we have an error, only replace last_ret if it was OK before to avoid
2171    * overwriting the first error we got */
2172   if (stream->last_ret == GST_FLOW_OK) {
2173     stream->last_ret = ret;
2174     if (err) {
2175       g_clear_error (&stream->last_error);
2176       stream->last_error = g_error_copy (err);
2177     }
2178   }
2179   g_mutex_lock (&stream->fragment_download_lock);
2180   stream->download_finished = TRUE;
2181   g_cond_signal (&stream->fragment_download_cond);
2182   g_mutex_unlock (&stream->fragment_download_lock);
2183 }
2184
2185 static gboolean
2186 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2187 {
2188   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2189   GstAdaptiveDemux *demux = stream->demux;
2190
2191   switch (GST_EVENT_TYPE (event)) {
2192     case GST_EVENT_EOS:{
2193       GstAdaptiveDemuxClass *klass;
2194       GstFlowReturn ret;
2195
2196       GST_MANIFEST_LOCK (demux);
2197
2198       klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2199       ret = klass->finish_fragment (demux, stream);
2200       gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2201
2202       GST_MANIFEST_UNLOCK (demux);
2203       break;
2204     }
2205     default:
2206       break;
2207   }
2208
2209   gst_event_unref (event);
2210
2211   return TRUE;
2212 }
2213
2214 static gboolean
2215 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2216 {
2217   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2218
2219   switch (GST_QUERY_TYPE (query)) {
2220     case GST_QUERY_ALLOCATION:
2221       return FALSE;
2222       break;
2223     default:
2224       break;
2225   }
2226
2227   return gst_pad_peer_query (stream->pad, query);
2228 }
2229
2230 /* must be called with manifest_lock taken.
2231  * Can temporarily release manifest_lock
2232  */
2233 static gboolean
2234 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2235     GstAdaptiveDemuxStream * stream)
2236 {
2237   gboolean ret = TRUE;
2238
2239   /* Wait until we're cancelled or there's something for
2240    * us to download in the playlist or the playlist
2241    * became non-live */
2242   while (TRUE) {
2243     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2244
2245     /* get the manifest_update_lock while still holding the manifest_lock.
2246      * This will prevent other threads to signal the condition (they will need
2247      * both manifest_lock and manifest_update_lock in order to signal).
2248      * It cannot deadlock because all threads always get the manifest_lock first
2249      * and manifest_update_lock second.
2250      */
2251     g_mutex_lock (&demux->priv->manifest_update_lock);
2252
2253     GST_MANIFEST_UNLOCK (demux);
2254
2255     g_cond_wait (&demux->priv->manifest_cond,
2256         &demux->priv->manifest_update_lock);
2257     g_mutex_unlock (&demux->priv->manifest_update_lock);
2258
2259     GST_MANIFEST_LOCK (demux);
2260
2261     /* check for cancelled every time we get the manifest_lock */
2262     g_mutex_lock (&stream->fragment_download_lock);
2263     if (G_UNLIKELY (stream->cancelled)) {
2264       ret = FALSE;
2265       stream->last_ret = GST_FLOW_FLUSHING;
2266       g_mutex_unlock (&stream->fragment_download_lock);
2267       break;
2268     }
2269     g_mutex_unlock (&stream->fragment_download_lock);
2270
2271     /* Got a new fragment or not live anymore? */
2272     if (gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
2273       GST_DEBUG_OBJECT (demux, "new fragment available, "
2274           "not waiting for manifest update");
2275       ret = TRUE;
2276       break;
2277     }
2278
2279     if (!gst_adaptive_demux_is_live (demux)) {
2280       GST_DEBUG_OBJECT (demux, "Not live anymore, "
2281           "not waiting for manifest update");
2282       ret = FALSE;
2283       break;
2284     }
2285   }
2286   GST_DEBUG_OBJECT (demux, "Retrying now");
2287   return ret;
2288 }
2289
2290 /* must be called with manifest_lock taken */
2291 static gboolean
2292 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2293     const gchar * uri, const gchar * referer, gboolean refresh,
2294     gboolean allow_cache)
2295 {
2296   GstAdaptiveDemux *demux = stream->demux;
2297
2298   if (!gst_uri_is_valid (uri)) {
2299     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2300     return FALSE;
2301   }
2302
2303   if (stream->src != NULL) {
2304     gchar *old_protocol, *new_protocol;
2305     gchar *old_uri;
2306
2307     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2308     old_protocol = gst_uri_get_protocol (old_uri);
2309     new_protocol = gst_uri_get_protocol (uri);
2310
2311     if (!g_str_equal (old_protocol, new_protocol)) {
2312       gst_object_unref (stream->src_srcpad);
2313       gst_element_set_state (stream->src, GST_STATE_NULL);
2314       gst_bin_remove (GST_BIN_CAST (demux), stream->src);
2315       stream->src = NULL;
2316       stream->src_srcpad = NULL;
2317       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2318     } else {
2319       GError *err = NULL;
2320
2321       GST_DEBUG_OBJECT (demux, "Re-using old source element");
2322       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2323               &err)) {
2324         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2325             err->message);
2326         g_clear_error (&err);
2327         gst_object_unref (stream->src_srcpad);
2328         gst_element_set_state (stream->src, GST_STATE_NULL);
2329         gst_bin_remove (GST_BIN_CAST (demux), stream->src);
2330         stream->src = NULL;
2331         stream->src_srcpad = NULL;
2332       }
2333     }
2334     g_free (old_uri);
2335     g_free (old_protocol);
2336     g_free (new_protocol);
2337   }
2338
2339   if (stream->src == NULL) {
2340     GstPad *uri_handler_src;
2341     GstPad *queue_sink;
2342     GstPad *queue_src;
2343     GstElement *uri_handler;
2344     GstElement *queue;
2345     GstPadLinkReturn pad_link_ret;
2346     GObjectClass *gobject_class;
2347     gchar *internal_name, *bin_name;
2348
2349     /* Our src consists of a bin containing uri_handler -> queue2 . The
2350      * purpose of the queue2 is to allow the uri_handler to download an
2351      * entire fragment without blocking, so we can accurately measure the
2352      * download bitrate. */
2353
2354     queue = gst_element_factory_make ("queue2", NULL);
2355     if (queue == NULL)
2356       return FALSE;
2357
2358     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
2359     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
2360     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
2361
2362     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
2363     if (uri_handler == NULL) {
2364       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
2365           ("Missing plugin to handle URI: '%s'", uri), (NULL));
2366       gst_object_unref (queue);
2367       return FALSE;
2368     }
2369
2370     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
2371
2372     if (g_object_class_find_property (gobject_class, "compress"))
2373       g_object_set (uri_handler, "compress", FALSE, NULL);
2374     if (g_object_class_find_property (gobject_class, "keep-alive"))
2375       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
2376     if (g_object_class_find_property (gobject_class, "extra-headers")) {
2377       if (referer || refresh || !allow_cache) {
2378         GstStructure *extra_headers = gst_structure_new_empty ("headers");
2379
2380         if (referer)
2381           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
2382               NULL);
2383
2384         if (!allow_cache)
2385           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
2386               "no-cache", NULL);
2387         else if (refresh)
2388           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
2389               "max-age=0", NULL);
2390
2391         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
2392
2393         gst_structure_free (extra_headers);
2394       } else {
2395         g_object_set (uri_handler, "extra-headers", NULL, NULL);
2396       }
2397     }
2398
2399     /* Source bin creation */
2400     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
2401     stream->src = gst_bin_new (bin_name);
2402     g_free (bin_name);
2403     if (stream->src == NULL) {
2404       gst_object_unref (queue);
2405       gst_object_unref (uri_handler);
2406       return FALSE;
2407     }
2408
2409     gst_bin_add (GST_BIN_CAST (stream->src), queue);
2410     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
2411
2412     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
2413     queue_sink = gst_element_get_static_pad (queue, "sink");
2414
2415     pad_link_ret =
2416         gst_pad_link_full (uri_handler_src, queue_sink,
2417         GST_PAD_LINK_CHECK_NOTHING);
2418     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
2419       GST_WARNING_OBJECT (demux,
2420           "Could not link pads %s:%s to %s:%s for reason %d",
2421           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
2422           pad_link_ret);
2423       g_object_unref (queue_sink);
2424       g_object_unref (uri_handler_src);
2425       gst_object_unref (stream->src);
2426       stream->src = NULL;
2427       return FALSE;
2428     }
2429
2430     g_object_unref (queue_sink);
2431     g_object_unref (uri_handler_src);
2432     queue_src = gst_element_get_static_pad (queue, "src");
2433     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
2434     g_object_unref (queue_src);
2435     gst_element_add_pad (stream->src, stream->src_srcpad);
2436
2437     gst_element_set_locked_state (stream->src, TRUE);
2438     gst_bin_add (GST_BIN_CAST (demux), stream->src);
2439     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
2440
2441     /* set up our internal floating pad to drop all events from
2442      * the http src we don't care about. On the chain function
2443      * we just push the buffer forward */
2444     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
2445     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
2446     g_free (internal_name);
2447     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
2448         GST_OBJECT_CAST (demux));
2449     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
2450     gst_pad_set_element_private (stream->internal_pad, stream);
2451     gst_pad_set_active (stream->internal_pad, TRUE);
2452     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
2453     gst_pad_set_event_function (stream->internal_pad, _src_event);
2454     gst_pad_set_query_function (stream->internal_pad, _src_query);
2455
2456     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
2457             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2458       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
2459       return FALSE;
2460     }
2461
2462     stream->uri_handler = uri_handler;
2463     stream->queue = queue;
2464   }
2465   return TRUE;
2466 }
2467
2468 /* must be called with manifest_lock taken.
2469  * Can temporarily release manifest_lock
2470  */
2471 static GstFlowReturn
2472 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
2473     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
2474     gint64 end)
2475 {
2476   GstFlowReturn ret = GST_FLOW_OK;
2477   GST_DEBUG_OBJECT (stream->pad, "Downloading uri: %s, range:%" G_GINT64_FORMAT
2478       " - %" G_GINT64_FORMAT, uri, start, end);
2479
2480   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
2481     ret = stream->last_ret = GST_FLOW_ERROR;
2482     return ret;
2483   }
2484
2485   if (gst_element_set_state (stream->src,
2486           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
2487     if (start != 0 || end != -1) {
2488       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
2489        * stop position */
2490       if (end != -1)
2491         end += 1;
2492       /* Send the seek event to the uri_handler, as the other pipeline elements
2493        * can't handle it when READY. */
2494       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
2495                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
2496                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
2497
2498         /* looks like the source can't handle seeks in READY */
2499         g_clear_error (&stream->last_error);
2500         stream->last_error = g_error_new (GST_CORE_ERROR,
2501             GST_CORE_ERROR_NOT_IMPLEMENTED,
2502             "Source element can't handle range requests");
2503         stream->last_ret = GST_FLOW_ERROR;
2504       }
2505     }
2506
2507     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
2508       stream->download_start_time =
2509           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
2510       stream->download_chunk_start_time = stream->download_start_time;
2511
2512       /* src element is in state READY. Before we start it, we reset
2513        * download_finished
2514        */
2515       g_mutex_lock (&stream->fragment_download_lock);
2516       stream->download_finished = FALSE;
2517       stream->downloading_first_buffer = TRUE;
2518       g_mutex_unlock (&stream->fragment_download_lock);
2519
2520       GST_MANIFEST_UNLOCK (demux);
2521
2522       if (!gst_element_sync_state_with_parent (stream->src)) {
2523         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
2524         GST_MANIFEST_LOCK (demux);
2525         ret = stream->last_ret = GST_FLOW_ERROR;
2526         return ret;
2527       }
2528
2529       /* wait for the fragment to be completely downloaded */
2530       GST_DEBUG_OBJECT (stream->pad,
2531           "Waiting for fragment download to finish: %s", uri);
2532
2533       g_mutex_lock (&stream->fragment_download_lock);
2534       if (G_UNLIKELY (stream->cancelled)) {
2535         g_mutex_unlock (&stream->fragment_download_lock);
2536         GST_MANIFEST_LOCK (demux);
2537         ret = stream->last_ret = GST_FLOW_FLUSHING;
2538         return ret;
2539       }
2540       while (!stream->cancelled && !stream->download_finished) {
2541         g_cond_wait (&stream->fragment_download_cond,
2542             &stream->fragment_download_lock);
2543       }
2544       g_mutex_unlock (&stream->fragment_download_lock);
2545
2546       GST_MANIFEST_LOCK (demux);
2547       g_mutex_lock (&stream->fragment_download_lock);
2548       if (G_UNLIKELY (stream->cancelled)) {
2549         ret = stream->last_ret = GST_FLOW_FLUSHING;
2550         g_mutex_unlock (&stream->fragment_download_lock);
2551         return ret;
2552       }
2553       g_mutex_unlock (&stream->fragment_download_lock);
2554
2555       ret = stream->last_ret;
2556
2557       GST_DEBUG_OBJECT (stream->pad, "Fragment download finished: %s %d %s",
2558           uri, stream->last_ret, gst_flow_get_name (stream->last_ret));
2559     }
2560   } else {
2561     if (stream->last_ret == GST_FLOW_OK)
2562       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
2563     ret = GST_FLOW_CUSTOM_ERROR;
2564   }
2565
2566   /* changing src element state might try to join the streaming thread, so
2567    * we must not hold the manifest lock.
2568    */
2569   GST_MANIFEST_UNLOCK (demux);
2570
2571   gst_element_set_state (stream->src, GST_STATE_READY);
2572
2573   GST_MANIFEST_LOCK (demux);
2574   g_mutex_lock (&stream->fragment_download_lock);
2575   if (G_UNLIKELY (stream->cancelled)) {
2576     ret = stream->last_ret = GST_FLOW_FLUSHING;
2577     g_mutex_unlock (&stream->fragment_download_lock);
2578     return ret;
2579   }
2580   g_mutex_unlock (&stream->fragment_download_lock);
2581
2582   /* deactivate and reactivate our ghostpad to make it fresh for a new
2583    * stream */
2584   gst_pad_set_active (stream->internal_pad, FALSE);
2585   gst_pad_set_active (stream->internal_pad, TRUE);
2586
2587   return ret;
2588 }
2589
2590 /* must be called with manifest_lock taken.
2591  * Can temporarily release manifest_lock
2592  */
2593 static GstFlowReturn
2594 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
2595     stream)
2596 {
2597   GstAdaptiveDemux *demux = stream->demux;
2598   GstFlowReturn ret = GST_FLOW_OK;
2599
2600   if (stream->fragment.header_uri != NULL) {
2601     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
2602         G_GINT64_FORMAT, stream->fragment.header_uri,
2603         stream->fragment.header_range_start, stream->fragment.header_range_end);
2604
2605     stream->downloading_header = TRUE;
2606     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
2607         stream->fragment.header_uri, stream->fragment.header_range_start,
2608         stream->fragment.header_range_end);
2609     stream->downloading_header = FALSE;
2610   }
2611
2612   /* check if we have an index */
2613   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
2614
2615     if (stream->fragment.index_uri != NULL) {
2616       GST_DEBUG_OBJECT (demux,
2617           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
2618           stream->fragment.index_uri,
2619           stream->fragment.index_range_start, stream->fragment.index_range_end);
2620       stream->downloading_index = TRUE;
2621       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
2622           stream->fragment.index_uri, stream->fragment.index_range_start,
2623           stream->fragment.index_range_end);
2624       stream->downloading_index = FALSE;
2625     }
2626   }
2627
2628   return ret;
2629 }
2630
2631 /* must be called with manifest_lock taken.
2632  * Can temporarily release manifest_lock
2633  */
2634 static GstFlowReturn
2635 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
2636 {
2637   GstAdaptiveDemux *demux = stream->demux;
2638   gchar *url = NULL;
2639   GstFlowReturn ret = GST_FLOW_OK;
2640
2641   stream->starting_fragment = TRUE;
2642   stream->last_ret = GST_FLOW_OK;
2643   stream->first_fragment_buffer = TRUE;
2644
2645   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
2646       stream->fragment.index_uri == NULL)
2647     goto no_url_error;
2648
2649   if (stream->need_header) {
2650     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
2651     if (ret != GST_FLOW_OK) {
2652       return ret;
2653     }
2654     stream->need_header = FALSE;
2655   }
2656
2657   url = stream->fragment.uri;
2658   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
2659   if (url) {
2660     ret =
2661         gst_adaptive_demux_stream_download_uri (demux, stream, url,
2662         stream->fragment.range_start, stream->fragment.range_end);
2663     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d %s",
2664         stream->last_ret, gst_flow_get_name (stream->last_ret));
2665     if (ret != GST_FLOW_OK) {
2666       g_mutex_lock (&stream->fragment_download_lock);
2667       if (G_UNLIKELY (stream->cancelled)) {
2668         g_mutex_unlock (&stream->fragment_download_lock);
2669         return ret;
2670       }
2671       g_mutex_unlock (&stream->fragment_download_lock);
2672
2673       /* TODO check if we are truly stopping */
2674       if (ret == GST_FLOW_CUSTOM_ERROR && gst_adaptive_demux_is_live (demux)) {
2675         if (++stream->download_error_count <= MAX_DOWNLOAD_ERROR_COUNT) {
2676           /* looks like there is no way of knowing when a live stream has ended
2677            * Have to assume we are falling behind and cause a manifest reload */
2678           GST_DEBUG_OBJECT (stream->pad,
2679               "Converting error of live stream to EOS");
2680           return GST_FLOW_EOS;
2681         }
2682       } else if (ret == GST_FLOW_CUSTOM_ERROR
2683           && !gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
2684         /* If this is the last fragment, consider failures EOS and not actual
2685          * errors. Due to rounding errors in the durations, the last fragment
2686          * might not actually exist */
2687         GST_DEBUG_OBJECT (stream->pad,
2688             "Converting error for last fragment to EOS");
2689         return GST_FLOW_EOS;
2690       }
2691     }
2692   }
2693
2694   return ret;
2695
2696 no_url_error:
2697   {
2698     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
2699         (_("Failed to get fragment URL.")),
2700         ("An error happened when getting fragment URL"));
2701     gst_task_stop (stream->download_task);
2702     return GST_FLOW_ERROR;
2703   }
2704 }
2705
2706 /* this function will take the manifest_lock and will keep it until the end.
2707  * It will release it temporarily only when going to sleep.
2708  * Every time it takes the manifest_lock, it will check for cancelled condition
2709  */
2710 static void
2711 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
2712 {
2713   GstAdaptiveDemux *demux = stream->demux;
2714   GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
2715   GstFlowReturn ret;
2716   gboolean live;
2717
2718   GST_LOG_OBJECT (stream->pad, "download loop start");
2719
2720   GST_MANIFEST_LOCK (demux);
2721
2722   g_mutex_lock (&stream->fragment_download_lock);
2723   if (G_UNLIKELY (stream->cancelled)) {
2724     stream->last_ret = GST_FLOW_FLUSHING;
2725     g_mutex_unlock (&stream->fragment_download_lock);
2726     goto cancelled;
2727   }
2728   g_mutex_unlock (&stream->fragment_download_lock);
2729
2730   /* Check if we're done with our segment */
2731   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2732   if (demux->segment.rate > 0) {
2733     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
2734         && stream->segment.position >= stream->segment.stop) {
2735       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2736       ret = GST_FLOW_EOS;
2737       gst_task_stop (stream->download_task);
2738       goto end_of_manifest;
2739     }
2740   } else {
2741     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
2742         && stream->segment.position <= stream->segment.start) {
2743       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2744       ret = GST_FLOW_EOS;
2745       gst_task_stop (stream->download_task);
2746       goto end_of_manifest;
2747     }
2748   }
2749   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2750
2751   /* Cleanup old streams if any */
2752   if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
2753     GList *old_streams = demux->priv->old_streams;
2754     demux->priv->old_streams = NULL;
2755
2756     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
2757     g_list_free_full (old_streams,
2758         (GDestroyNotify) gst_adaptive_demux_stream_free);
2759     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
2760
2761     /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
2762      * Recheck the cancelled flag.
2763      */
2764     g_mutex_lock (&stream->fragment_download_lock);
2765     if (G_UNLIKELY (stream->cancelled)) {
2766       stream->last_ret = GST_FLOW_FLUSHING;
2767       g_mutex_unlock (&stream->fragment_download_lock);
2768       goto cancelled;
2769     }
2770     g_mutex_unlock (&stream->fragment_download_lock);
2771   }
2772
2773   if (G_UNLIKELY (stream->restart_download)) {
2774     GstEvent *seg_event;
2775     GstClockTime cur, ts = 0;
2776     gint64 pos;
2777
2778     GST_DEBUG_OBJECT (stream->pad,
2779         "Activating stream due to reconfigure event");
2780
2781     if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
2782       ts = (GstClockTime) pos;
2783       GST_DEBUG_OBJECT (demux, "Downstream position: %"
2784           GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2785     } else {
2786       /* query other pads as some faulty element in the pad's branch might
2787        * reject position queries. This should be better than using the
2788        * demux segment position that can be much ahead */
2789       GList *iter;
2790
2791       for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
2792         GstAdaptiveDemuxStream *cur_stream =
2793             (GstAdaptiveDemuxStream *) iter->data;
2794
2795         if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
2796                 &pos)) {
2797           ts = (GstClockTime) pos;
2798           GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
2799               GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2800           break;
2801         }
2802       }
2803     }
2804
2805     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2806     cur =
2807         gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
2808         stream->segment.position);
2809
2810     /* we might have already pushed this data */
2811     ts = MAX (ts, cur);
2812
2813     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
2814         "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2815
2816     if (GST_CLOCK_TIME_IS_VALID (ts)) {
2817       GstClockTime offset, period_start;
2818
2819       offset =
2820           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2821       period_start = gst_adaptive_demux_get_period_start_time (demux);
2822
2823       /* TODO check return */
2824       gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
2825           0, ts, &ts);
2826
2827       stream->segment.position = ts - period_start + offset;
2828     }
2829
2830     /* The stream's segment is still correct except for
2831      * the position, so let's send a new one with the
2832      * updated position */
2833     seg_event = gst_event_new_segment (&stream->segment);
2834     gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
2835     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2836
2837     GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
2838         GST_PTR_FORMAT, seg_event);
2839     gst_pad_push_event (stream->pad, seg_event);
2840
2841     stream->discont = TRUE;
2842     stream->restart_download = FALSE;
2843   }
2844
2845   live = gst_adaptive_demux_is_live (demux);
2846
2847   ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
2848   GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
2849       ret, gst_flow_get_name (ret));
2850   if (ret == GST_FLOW_OK) {
2851
2852     /* wait for live fragments to be available */
2853     if (live) {
2854       gint64 wait_time =
2855           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
2856       if (wait_time > 0) {
2857         GstClockTime end_time =
2858             gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
2859
2860         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
2861             GST_TIME_ARGS (wait_time));
2862
2863         GST_MANIFEST_UNLOCK (demux);
2864
2865         g_mutex_lock (&stream->fragment_download_lock);
2866         if (G_UNLIKELY (stream->cancelled)) {
2867           g_mutex_unlock (&stream->fragment_download_lock);
2868           GST_MANIFEST_LOCK (demux);
2869           stream->last_ret = GST_FLOW_FLUSHING;
2870           goto cancelled;
2871         }
2872         gst_adaptive_demux_wait_until (demux->realtime_clock,
2873             &stream->fragment_download_cond, &stream->fragment_download_lock,
2874             end_time);
2875         g_mutex_unlock (&stream->fragment_download_lock);
2876
2877         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
2878
2879         GST_MANIFEST_LOCK (demux);
2880
2881         g_mutex_lock (&stream->fragment_download_lock);
2882         if (G_UNLIKELY (stream->cancelled)) {
2883           stream->last_ret = GST_FLOW_FLUSHING;
2884           g_mutex_unlock (&stream->fragment_download_lock);
2885           goto cancelled;
2886         }
2887         g_mutex_unlock (&stream->fragment_download_lock);
2888       }
2889     }
2890
2891     stream->last_ret = GST_FLOW_OK;
2892
2893     next_download = gst_adaptive_demux_get_monotonic_time (demux);
2894     ret = gst_adaptive_demux_stream_download_fragment (stream);
2895
2896     if (ret == GST_FLOW_FLUSHING) {
2897       g_mutex_lock (&stream->fragment_download_lock);
2898       if (G_UNLIKELY (stream->cancelled)) {
2899         stream->last_ret = GST_FLOW_FLUSHING;
2900         g_mutex_unlock (&stream->fragment_download_lock);
2901         goto cancelled;
2902       }
2903       g_mutex_unlock (&stream->fragment_download_lock);
2904     }
2905
2906   } else {
2907     stream->last_ret = ret;
2908   }
2909
2910   switch (ret) {
2911     case GST_FLOW_OK:
2912       break;                    /* all is good, let's go */
2913     case GST_FLOW_EOS:
2914       GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
2915       /* we push the EOS after releasing the object lock */
2916       if (gst_adaptive_demux_is_live (demux)) {
2917         if (gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
2918           goto end;
2919         }
2920         gst_task_stop (stream->download_task);
2921       } else {
2922         gst_task_stop (stream->download_task);
2923         if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
2924           if (gst_adaptive_demux_has_next_period (demux)) {
2925             gst_adaptive_demux_advance_period (demux);
2926             ret = GST_FLOW_OK;
2927           }
2928         }
2929       }
2930       break;
2931
2932     case GST_FLOW_NOT_LINKED:
2933       gst_task_stop (stream->download_task);
2934       if (gst_adaptive_demux_combine_flows (demux)
2935           == GST_FLOW_NOT_LINKED) {
2936         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
2937             (_("Internal data stream error.")), ("stream stopped, reason %s",
2938                 gst_flow_get_name (GST_FLOW_NOT_LINKED)));
2939       }
2940       break;
2941
2942     case GST_FLOW_FLUSHING:{
2943       GList *iter;
2944
2945       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2946         GstAdaptiveDemuxStream *other;
2947
2948         other = iter->data;
2949         gst_task_stop (other->download_task);
2950       }
2951     }
2952       break;
2953
2954     default:
2955       if (ret <= GST_FLOW_ERROR) {
2956         gboolean is_live = gst_adaptive_demux_is_live (demux);
2957         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
2958         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
2959           goto download_error;
2960         }
2961
2962         g_clear_error (&stream->last_error);
2963
2964         /* First try to update the playlist for non-live playlists
2965          * in case the URIs have changed in the meantime. But only
2966          * try it the first time, after that we're going to wait a
2967          * a bit to not flood the server */
2968         if (stream->download_error_count == 1 && !is_live) {
2969           /* TODO hlsdemux had more options to this function (boolean and err) */
2970
2971           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
2972             /* Retry immediately, the playlist actually has changed */
2973             GST_DEBUG_OBJECT (demux, "Updated the playlist");
2974             goto end;
2975           }
2976         }
2977
2978         /* Wait half the fragment duration before retrying */
2979         next_download += stream->fragment.duration / 2;
2980
2981         GST_MANIFEST_UNLOCK (demux);
2982
2983         g_mutex_lock (&stream->fragment_download_lock);
2984         if (G_UNLIKELY (stream->cancelled)) {
2985           g_mutex_unlock (&stream->fragment_download_lock);
2986           GST_MANIFEST_LOCK (demux);
2987           stream->last_ret = GST_FLOW_FLUSHING;
2988           goto cancelled;
2989         }
2990         gst_adaptive_demux_wait_until (demux->realtime_clock,
2991             &stream->fragment_download_cond, &stream->fragment_download_lock,
2992             next_download);
2993         g_mutex_unlock (&stream->fragment_download_lock);
2994
2995         GST_DEBUG_OBJECT (demux, "Retrying now");
2996
2997         GST_MANIFEST_LOCK (demux);
2998
2999         g_mutex_lock (&stream->fragment_download_lock);
3000         if (G_UNLIKELY (stream->cancelled)) {
3001           stream->last_ret = GST_FLOW_FLUSHING;
3002           g_mutex_unlock (&stream->fragment_download_lock);
3003           goto cancelled;
3004         }
3005         g_mutex_unlock (&stream->fragment_download_lock);
3006
3007         /* Refetch the playlist now after we waited */
3008         if (!is_live
3009             && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3010           GST_DEBUG_OBJECT (demux, "Updated the playlist");
3011         }
3012         goto end;
3013       }
3014       break;
3015   }
3016
3017 end_of_manifest:
3018   if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3019     gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3020   }
3021
3022 end:
3023   GST_MANIFEST_UNLOCK (demux);
3024   GST_LOG_OBJECT (stream->pad, "download loop end");
3025   return;
3026
3027 cancelled:
3028   {
3029     GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3030     goto end;
3031   }
3032 download_error:
3033   {
3034     GstMessage *msg;
3035
3036     if (stream->last_error) {
3037       gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3038           GST_DEBUG_PAD_NAME (stream->pad));
3039       msg =
3040           gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3041           debug);
3042       GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3043           stream->last_error->message);
3044       g_free (debug);
3045     } else {
3046       GError *err =
3047           g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3048           _("Couldn't download fragments"));
3049       msg =
3050           gst_message_new_error (GST_OBJECT_CAST (demux), err,
3051           "Fragment downloading has failed consecutive times");
3052       g_error_free (err);
3053       GST_ERROR_OBJECT (stream->pad,
3054           "Download error: Couldn't download fragments, too many failures");
3055     }
3056
3057     gst_task_stop (stream->download_task);
3058     if (stream->src) {
3059       gst_element_set_state (stream->src, GST_STATE_NULL);
3060       gst_bin_remove (GST_BIN_CAST (demux), stream->src);
3061       stream->src = NULL;
3062     }
3063
3064     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3065
3066     goto end;
3067   }
3068 }
3069
3070 static void
3071 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3072 {
3073   GstClockTime next_update;
3074   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3075
3076   /* Loop for updating of the playlist. This periodically checks if
3077    * the playlist is updated and does so, then signals the streaming
3078    * thread in case it can continue downloading now. */
3079
3080   /* block until the next scheduled update or the signal to quit this thread */
3081   GST_DEBUG_OBJECT (demux, "Started updates task");
3082
3083   GST_MANIFEST_LOCK (demux);
3084
3085   next_update =
3086       gst_adaptive_demux_get_monotonic_time (demux) +
3087       klass->get_manifest_update_interval (demux) * GST_USECOND;
3088
3089   /* Updating playlist only needed for live playlists */
3090   while (gst_adaptive_demux_is_live (demux)) {
3091     GstFlowReturn ret = GST_FLOW_OK;
3092
3093     /* Wait here until we should do the next update or we're cancelled */
3094     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
3095
3096     GST_MANIFEST_UNLOCK (demux);
3097
3098     g_mutex_lock (&demux->priv->updates_timed_lock);
3099     if (demux->priv->stop_updates_task) {
3100       g_mutex_unlock (&demux->priv->updates_timed_lock);
3101       goto quit;
3102     }
3103     gst_adaptive_demux_wait_until (demux->realtime_clock,
3104         &demux->priv->updates_timed_cond,
3105         &demux->priv->updates_timed_lock, next_update);
3106     g_mutex_unlock (&demux->priv->updates_timed_lock);
3107
3108     GST_MANIFEST_LOCK (demux);
3109
3110     g_mutex_lock (&demux->priv->updates_timed_lock);
3111     if (demux->priv->stop_updates_task) {
3112       g_mutex_unlock (&demux->priv->updates_timed_lock);
3113       GST_MANIFEST_UNLOCK (demux);
3114       goto quit;
3115     }
3116     g_mutex_unlock (&demux->priv->updates_timed_lock);
3117
3118     GST_DEBUG_OBJECT (demux, "Updating playlist");
3119
3120     ret = gst_adaptive_demux_update_manifest (demux);
3121
3122     if (ret == GST_FLOW_EOS) {
3123     } else if (ret != GST_FLOW_OK) {
3124       /* update_failed_count is used only here, no need to protect it */
3125       demux->priv->update_failed_count++;
3126       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3127         GST_WARNING_OBJECT (demux, "Could not update the playlist");
3128         next_update = gst_adaptive_demux_get_monotonic_time (demux)
3129             + klass->get_manifest_update_interval (demux) * GST_USECOND;
3130       } else {
3131         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3132             (_("Internal data stream error.")), ("Could not update playlist"));
3133         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
3134         gst_task_stop (demux->priv->updates_task);
3135         GST_MANIFEST_UNLOCK (demux);
3136         goto end;
3137       }
3138     } else {
3139       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3140       next_update =
3141           gst_adaptive_demux_get_monotonic_time (demux) +
3142           klass->get_manifest_update_interval (demux) * GST_USECOND;
3143
3144       /* Wake up download tasks */
3145       g_mutex_lock (&demux->priv->manifest_update_lock);
3146       g_cond_broadcast (&demux->priv->manifest_cond);
3147       g_mutex_unlock (&demux->priv->manifest_update_lock);
3148     }
3149   }
3150
3151   GST_MANIFEST_UNLOCK (demux);
3152
3153 quit:
3154   {
3155     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
3156   }
3157
3158 end:
3159   {
3160     return;
3161   }
3162 }
3163
3164 /* must be called with manifest_lock taken */
3165 static gboolean
3166 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
3167     GstEvent * event)
3168 {
3169   gboolean ret;
3170
3171   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3172     stream->eos = TRUE;
3173   }
3174   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
3175       "Pushing event %" GST_PTR_FORMAT, event);
3176   ret = gst_pad_push_event (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream), event);
3177   return ret;
3178 }
3179
3180 /* must be called with manifest_lock taken */
3181 static gboolean
3182 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3183 {
3184   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3185
3186   if (klass->is_live)
3187     return klass->is_live (demux);
3188   return FALSE;
3189 }
3190
3191 /* must be called with manifest_lock taken */
3192 static GstFlowReturn
3193 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
3194     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
3195     GstClockTime ts, GstClockTime * final_ts)
3196 {
3197   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3198
3199   if (klass->stream_seek)
3200     return klass->stream_seek (stream, forward, flags, ts, final_ts);
3201   return GST_FLOW_ERROR;
3202 }
3203
3204 /* must be called with manifest_lock taken */
3205 static gboolean
3206 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
3207     GstAdaptiveDemuxStream * stream)
3208 {
3209   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3210   gboolean ret = TRUE;
3211
3212   if (klass->stream_has_next_fragment)
3213     ret = klass->stream_has_next_fragment (stream);
3214
3215   return ret;
3216 }
3217
3218 /* must be called with manifest_lock taken */
3219 GstFlowReturn
3220 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
3221     GstAdaptiveDemuxStream * stream, GstClockTime duration)
3222 {
3223   GstFlowReturn ret;
3224
3225   if (stream->last_ret == GST_FLOW_OK) {
3226     stream->last_ret =
3227         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
3228         duration);
3229   }
3230   ret = stream->last_ret;
3231
3232   return ret;
3233 }
3234
3235 /* must be called with manifest_lock taken */
3236 GstFlowReturn
3237 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3238     GstAdaptiveDemuxStream * stream, GstClockTime duration)
3239 {
3240   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3241   GstFlowReturn ret;
3242
3243   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3244
3245   stream->download_error_count = 0;
3246   g_clear_error (&stream->last_error);
3247
3248   /* FIXME - url has no indication of byte ranges for subsegments */
3249   gst_element_post_message (GST_ELEMENT_CAST (demux),
3250       gst_message_new_element (GST_OBJECT_CAST (demux),
3251           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3252               "manifest-uri", G_TYPE_STRING,
3253               demux->manifest_uri, "uri", G_TYPE_STRING,
3254               stream->fragment.uri, "fragment-start-time",
3255               GST_TYPE_CLOCK_TIME, stream->download_start_time,
3256               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3257               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3258               stream->download_total_bytes, "fragment-download-time",
3259               GST_TYPE_CLOCK_TIME,
3260               stream->download_total_time * GST_USECOND, NULL)));
3261
3262   /* Don't update to the end of the segment if in reverse playback */
3263   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3264   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3265     GstClockTime offset =
3266         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3267     GstClockTime period_start =
3268         gst_adaptive_demux_get_period_start_time (demux);
3269
3270     stream->segment.position += duration;
3271
3272     /* Convert from position inside the stream's segment to the demuxer's
3273      * segment, they are not necessarily the same */
3274     if (stream->segment.position - offset + period_start >
3275         demux->segment.position)
3276       demux->segment.position =
3277           stream->segment.position - offset + period_start;
3278   }
3279   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3280
3281   if (gst_adaptive_demux_is_live (demux)
3282       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3283     ret = klass->stream_advance_fragment (stream);
3284   } else {
3285     ret = GST_FLOW_EOS;
3286   }
3287
3288   stream->download_start_time = stream->download_chunk_start_time =
3289       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3290
3291   if (ret == GST_FLOW_OK) {
3292     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
3293             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
3294       stream->need_header = TRUE;
3295       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3296     }
3297
3298     /* the subclass might want to switch pads */
3299     if (G_UNLIKELY (demux->next_streams)) {
3300       gst_task_stop (stream->download_task);
3301       /* TODO only allow switching streams if other downloads are not ongoing */
3302       GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
3303           "to do bitrate switching");
3304       gst_adaptive_demux_expose_streams (demux, FALSE);
3305       gst_adaptive_demux_start_tasks (demux);
3306       ret = GST_FLOW_EOS;
3307     }
3308   }
3309
3310   return ret;
3311 }
3312
3313 /* must be called with manifest_lock taken */
3314 static gboolean
3315 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
3316     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
3317 {
3318   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3319
3320   /* FIXME: Currently several issues have be found when letting bitrate adaptation
3321    * happen using trick modes (such as 'All streams finished without buffers') and
3322    * the adaptive algorithm does not properly behave. */
3323   if (demux->segment.rate != 1.0)
3324     return FALSE;
3325
3326   if (klass->stream_select_bitrate)
3327     return klass->stream_select_bitrate (stream, bitrate);
3328   return FALSE;
3329 }
3330
3331 /* must be called with manifest_lock taken */
3332 static GstFlowReturn
3333 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
3334     GstAdaptiveDemuxStream * stream)
3335 {
3336   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3337
3338   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3339       GST_FLOW_ERROR);
3340
3341   /* Make sure the sub-class will update bitrate, or else
3342    * we will later */
3343   stream->fragment.bitrate = 0;
3344
3345   return klass->stream_update_fragment_info (stream);
3346 }
3347
3348 /* must be called with manifest_lock taken */
3349 static gint64
3350 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3351     demux, GstAdaptiveDemuxStream * stream)
3352 {
3353   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3354
3355   if (klass->stream_get_fragment_waiting_time)
3356     return klass->stream_get_fragment_waiting_time (stream);
3357   return 0;
3358 }
3359
3360 /* must be called with manifest_lock taken */
3361 static GstFlowReturn
3362 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
3363 {
3364   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3365   GstFragment *download;
3366   GstBuffer *buffer;
3367   GstFlowReturn ret;
3368
3369   download = gst_uri_downloader_fetch_uri (demux->downloader,
3370       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, NULL);
3371   if (download) {
3372     g_free (demux->manifest_uri);
3373     g_free (demux->manifest_base_uri);
3374     if (download->redirect_permanent && download->redirect_uri) {
3375       demux->manifest_uri = g_strdup (download->redirect_uri);
3376       demux->manifest_base_uri = NULL;
3377     } else {
3378       demux->manifest_uri = g_strdup (download->uri);
3379       demux->manifest_base_uri = g_strdup (download->redirect_uri);
3380     }
3381
3382     buffer = gst_fragment_get_buffer (download);
3383     g_object_unref (download);
3384     ret = klass->update_manifest_data (demux, buffer);
3385     gst_buffer_unref (buffer);
3386     /* FIXME: Should the manifest uri vars be reverted to original
3387      * values if updating fails? */
3388   } else {
3389     ret = GST_FLOW_NOT_LINKED;
3390   }
3391
3392   return ret;
3393 }
3394
3395 /* must be called with manifest_lock taken */
3396 static GstFlowReturn
3397 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
3398 {
3399   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3400   GstFlowReturn ret;
3401
3402   ret = klass->update_manifest (demux);
3403
3404   if (ret == GST_FLOW_OK) {
3405     GstClockTime duration;
3406     /* Send an updated duration message */
3407     duration = klass->get_duration (demux);
3408     if (duration != GST_CLOCK_TIME_NONE) {
3409       GST_DEBUG_OBJECT (demux,
3410           "Sending duration message : %" GST_TIME_FORMAT,
3411           GST_TIME_ARGS (duration));
3412       gst_element_post_message (GST_ELEMENT (demux),
3413           gst_message_new_duration_changed (GST_OBJECT (demux)));
3414     } else {
3415       GST_DEBUG_OBJECT (demux,
3416           "Duration unknown, can not send the duration message");
3417     }
3418   }
3419
3420   return ret;
3421 }
3422
3423 void
3424 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
3425 {
3426   g_free (f->uri);
3427   f->uri = NULL;
3428   f->range_start = 0;
3429   f->range_end = -1;
3430
3431   g_free (f->header_uri);
3432   f->header_uri = NULL;
3433   f->header_range_start = 0;
3434   f->header_range_end = -1;
3435
3436   g_free (f->index_uri);
3437   f->index_uri = NULL;
3438   f->index_range_start = 0;
3439   f->index_range_end = -1;
3440 }
3441
3442 /* must be called with manifest_lock taken */
3443 static gboolean
3444 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
3445 {
3446   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3447   gboolean ret = FALSE;
3448
3449   if (klass->has_next_period)
3450     ret = klass->has_next_period (demux);
3451   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
3452   return ret;
3453 }
3454
3455 /* must be called with manifest_lock taken */
3456 static void
3457 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
3458 {
3459   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3460
3461   g_return_if_fail (klass->advance_period != NULL);
3462
3463   GST_DEBUG_OBJECT (demux, "Advancing to next period");
3464   klass->advance_period (demux);
3465   gst_adaptive_demux_expose_streams (demux, FALSE);
3466   gst_adaptive_demux_start_tasks (demux);
3467 }
3468
3469 /**
3470  * gst_adaptive_demux_get_monotonic_time:
3471  * Returns: a monotonically increasing time, using the system realtime clock
3472  */
3473 GstClockTime
3474 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
3475 {
3476   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
3477   return gst_clock_get_time (demux->realtime_clock);
3478 }
3479
3480 /**
3481  * gst_adaptive_demux_get_client_now_utc:
3482  * @demux: #GstAdaptiveDemux
3483  * Returns: the client's estimate of UTC
3484  *
3485  * Used to find the client's estimate of UTC, using the system realtime clock.
3486  */
3487 GDateTime *
3488 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
3489 {
3490   GstClockTime rtc_now;
3491   gint64 utc_now;
3492   GTimeVal gtv;
3493
3494   rtc_now = gst_clock_get_time (demux->realtime_clock);
3495   utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
3496   gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
3497   gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
3498   return g_date_time_new_from_timeval_utc (&gtv);
3499 }
3500
3501 static GstAdaptiveDemuxTimer *
3502 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
3503 {
3504   GstAdaptiveDemuxTimer *timer;
3505
3506   timer = g_slice_new (GstAdaptiveDemuxTimer);
3507   timer->fired = FALSE;
3508   timer->cond = cond;
3509   timer->mutex = mutex;
3510   timer->ref_count = 1;
3511   return timer;
3512 }
3513
3514 static GstAdaptiveDemuxTimer *
3515 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
3516 {
3517   g_return_val_if_fail (timer != NULL, NULL);
3518   g_atomic_int_inc (&timer->ref_count);
3519   return timer;
3520 }
3521
3522 static void
3523 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
3524 {
3525   g_return_if_fail (timer != NULL);
3526   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
3527     g_slice_free (GstAdaptiveDemuxTimer, timer);
3528   }
3529 }
3530
3531 /* gst_adaptive_demux_wait_until:
3532  * A replacement for g_cond_wait_until that uses the clock rather
3533  * than system time to control the duration of the sleep. Typically
3534  * clock is actually a #GstSystemClock, in which case this function
3535  * behaves exactly like g_cond_wait_until. Inside unit tests,
3536  * the clock is typically a #GstTestClock, which allows tests to run
3537  * in non-realtime.
3538  * This function must be called with mutex held.
3539  */
3540 static gboolean
3541 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
3542     GstClockTime end_time)
3543 {
3544   GstAdaptiveDemuxTimer *timer;
3545   gboolean fired;
3546   GstClockReturn res;
3547
3548   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
3549     /* for an invalid time, gst_clock_id_wait_async will try to call
3550      * gst_adaptive_demux_clock_callback from the current thread.
3551      * It still holds the mutex while doing that, so it will deadlock.
3552      * g_cond_wait_until would return immediately with false, so we'll do the same.
3553      */
3554     return FALSE;
3555   }
3556   timer = gst_adaptive_demux_timer_new (cond, mutex);
3557   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
3558   res =
3559       gst_clock_id_wait_async (timer->clock_id,
3560       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
3561       (GDestroyNotify) gst_adaptive_demux_timer_unref);
3562   /* clock does not support asynchronously wait. Assert and return */
3563   if (res == GST_CLOCK_UNSUPPORTED) {
3564     gst_clock_id_unref (timer->clock_id);
3565     gst_adaptive_demux_timer_unref (timer);
3566     g_return_val_if_reached (TRUE);
3567   }
3568   g_assert (!timer->fired);
3569   /* the gst_adaptive_demux_clock_callback() will signal the
3570    * cond when the clock's single shot timer fires, or the cond will be
3571    * signalled by another thread that wants to cause this wait to finish
3572    * early (e.g. to terminate the waiting thread).
3573    * There is no need for a while loop here, because that logic is
3574    * implemented by the function calling gst_adaptive_demux_wait_until() */
3575   g_cond_wait (cond, mutex);
3576   fired = timer->fired;
3577   if (!fired)
3578     gst_clock_id_unschedule (timer->clock_id);
3579   gst_clock_id_unref (timer->clock_id);
3580   gst_adaptive_demux_timer_unref (timer);
3581   return !fired;
3582 }
3583
3584 static gboolean
3585 gst_adaptive_demux_clock_callback (GstClock * clock,
3586     GstClockTime time, GstClockID id, gpointer user_data)
3587 {
3588   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
3589   g_return_val_if_fail (timer != NULL, FALSE);
3590   g_mutex_lock (timer->mutex);
3591   timer->fired = TRUE;
3592   g_cond_signal (timer->cond);
3593   g_mutex_unlock (timer->mutex);
3594   return TRUE;
3595 }