documentation: fixed a heap o' typos
[platform/upstream/gstreamer.git] / gst-libs / gst / adaptivedemux / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  *
26  * What is an adaptive demuxer?
27  * Adaptive demuxers are special demuxers in the sense that they don't
28  * actually demux data received from upstream but download the data
29  * themselves.
30  *
31  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32  * a set of fragments. The manifest describes the available media and
33  * the sequence of fragments to use. Each fragment contains a small
34  * part of the media (typically only a few seconds). It is possible for
35  * the manifest to have the same media available in different configurations
36  * (bitrates for example) so that the client can select the one that
37  * best suits its scenario (network fluctuation, hardware requirements...).
38  * It is possible to switch from one representation of the media to another
39  * during playback. That's why it is called 'adaptive', because it can be
40  * adapted to the client's needs.
41  *
42  * Architectural overview:
43  * The manifest is received by the demuxer in its sink pad and, upon receiving
44  * EOS, it parses the manifest and exposes the streams available in it. For
45  * each stream a source element will be created and will download the list
46  * of fragments one by one. Once a fragment is finished downloading, the next
47  * URI is set to the source element and it starts fetching it and pushing
48  * through the stream's pad. This implies that each stream is independent from
49  * each other as it runs on a separate thread.
50  *
51  * After downloading each fragment, the download rate of it is calculated and
52  * the demuxer has a chance to switch to a different bitrate if needed. The
53  * switch can be done by simply pushing a new caps before the next fragment
54  * when codecs are the same, or by exposing a new pad group if it needs
55  * a codec change.
56  *
57  * Extra features:
58  * - Not linked streams: Streams that are not-linked have their download threads
59  *                       interrupted to save network bandwidth. When they are
60  *                       relinked a reconfigure event is received and the
61  *                       stream is restarted.
62  *
63  * Subclasses:
64  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65  * about the intrinsics of the subclass formats, so the subclasses are
66  * responsible for maintaining the manifest data structures and stream
67  * information.
68  */
69
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses needs to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77
78 Adaptive demux API can be called from several threads. More, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024    /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
134
135 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
136 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
137     GST_TRACE("Locking from thread %p", g_thread_self()); \
138     g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
139     GST_TRACE("Locked from thread %p", g_thread_self()); \
140  } G_STMT_END
141
142 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
143     GST_TRACE("Unlocking from thread %p", g_thread_self()); \
144     g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
145  } G_STMT_END
146
147 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
148 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
149 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
150
151 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
154
155 enum
156 {
157   PROP_0,
158   PROP_CONNECTION_SPEED,
159   PROP_BITRATE_LIMIT,
160   PROP_LAST
161 };
162
163 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
164 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
165
166 struct _GstAdaptiveDemuxPrivate
167 {
168   GstAdapter *input_adapter;    /* protected by manifest_lock */
169   gint have_manifest;           /* MT safe */
170
171   GList *old_streams;           /* protected by manifest_lock */
172
173   GstTask *updates_task;        /* MT safe */
174   GRecMutex updates_lock;
175   GMutex updates_timed_lock;
176   GCond updates_timed_cond;     /* protected by updates_timed_lock */
177   gboolean stop_updates_task;   /* protected by updates_timed_lock */
178
179   /* used only from updates_task, no need to protect it */
180   gint update_failed_count;
181
182   guint32 segment_seqnum;       /* protected by manifest_lock */
183
184   /* main lock used to protect adaptive demux and all its streams.
185    * It serializes the adaptive demux public API.
186    */
187   GRecMutex manifest_lock;
188
189   /* condition to wait for manifest updates on a live stream.
190    * In order to signal the manifest_cond, the caller needs to hold both
191    * manifest_lock and manifest_update_lock (taken in this order)
192    */
193   GCond manifest_cond;
194   GMutex manifest_update_lock;
195
196   /* Lock and condition for prerolling streams before exposing */
197   GMutex preroll_lock;
198   GCond preroll_cond;
199   gint preroll_pending;
200
201   GMutex api_lock;
202
203   /* Protects demux and stream segment information
204    * Needed because seeks can update segment information
205    * without needing to stop tasks when they just want to
206    * update the segment boundaries */
207   GMutex segment_lock;
208 };
209
210 typedef struct _GstAdaptiveDemuxTimer
211 {
212   volatile gint ref_count;
213   GCond *cond;
214   GMutex *mutex;
215   GstClockID clock_id;
216   gboolean fired;
217 } GstAdaptiveDemuxTimer;
218
219 static GstBinClass *parent_class = NULL;
220 static gint private_offset = 0;
221
222 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
223 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
224     GstAdaptiveDemuxClass * klass);
225 static void gst_adaptive_demux_finalize (GObject * object);
226 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
227     element, GstStateChange transition);
228
229 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
230
231 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
232     GstEvent * event);
233 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
234     GstObject * parent, GstBuffer * buffer);
235 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
236     GstQuery * query);
237 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
238     GstEvent * event);
239
240 static gboolean
241 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
242
243 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
244 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
245     stream);
246 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
247 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
248     gboolean first_and_live);
249 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
250 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
251 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
252     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
253     GstClockTime ts, GstClockTime * final_ts);
254 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
255     demux, GstAdaptiveDemuxStream * stream);
256 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
257     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
258 static GstFlowReturn
259 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
260     GstAdaptiveDemuxStream * stream);
261 static gint64
262 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
263     GstAdaptiveDemuxStream * stream);
264 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
265     demux);
266 static GstFlowReturn
267 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
268 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
269 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
270
271 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
272 static GstFlowReturn
273 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
274     GstEvent * event);
275
276 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
277     demux);
278 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
279     demux);
280
281 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
282     gboolean start_preroll_streams);
283 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
284     gboolean stop_updates);
285 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
286     demux);
287 static void
288 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
289     stream, GstFlowReturn ret, GError * err);
290 static GstFlowReturn
291 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
292     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
293 static GstFlowReturn
294 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
295     GstAdaptiveDemuxStream * stream);
296 static GstFlowReturn
297 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
298     GstAdaptiveDemuxStream * stream, GstClockTime duration);
299 static gboolean
300 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
301     GstClockTime end_time);
302 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
303     GstClockTime time, GstClockID id, gpointer user_data);
304 static gboolean
305 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
306     * demux);
307
308 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
309  * method to get to the padtemplates */
310 GType
311 gst_adaptive_demux_get_type (void)
312 {
313   static volatile gsize type = 0;
314
315   if (g_once_init_enter (&type)) {
316     GType _type;
317     static const GTypeInfo info = {
318       sizeof (GstAdaptiveDemuxClass),
319       NULL,
320       NULL,
321       (GClassInitFunc) gst_adaptive_demux_class_init,
322       NULL,
323       NULL,
324       sizeof (GstAdaptiveDemux),
325       0,
326       (GInstanceInitFunc) gst_adaptive_demux_init,
327     };
328
329     _type = g_type_register_static (GST_TYPE_BIN,
330         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
331
332     private_offset =
333         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
334
335     g_once_init_leave (&type, _type);
336   }
337   return type;
338 }
339
340 static inline GstAdaptiveDemuxPrivate *
341 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
342 {
343   return (G_STRUCT_MEMBER_P (self, private_offset));
344 }
345
346 static void
347 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
348     const GValue * value, GParamSpec * pspec)
349 {
350   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
351
352   GST_API_LOCK (demux);
353   GST_MANIFEST_LOCK (demux);
354
355   switch (prop_id) {
356     case PROP_CONNECTION_SPEED:
357       demux->connection_speed = g_value_get_uint (value) * 1000;
358       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
359           demux->connection_speed);
360       break;
361     case PROP_BITRATE_LIMIT:
362       demux->bitrate_limit = g_value_get_float (value);
363       break;
364     default:
365       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
366       break;
367   }
368
369   GST_MANIFEST_UNLOCK (demux);
370   GST_API_UNLOCK (demux);
371 }
372
373 static void
374 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
375     GValue * value, GParamSpec * pspec)
376 {
377   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
378
379   GST_MANIFEST_LOCK (demux);
380
381   switch (prop_id) {
382     case PROP_CONNECTION_SPEED:
383       g_value_set_uint (value, demux->connection_speed / 1000);
384       break;
385     case PROP_BITRATE_LIMIT:
386       g_value_set_float (value, demux->bitrate_limit);
387       break;
388     default:
389       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
390       break;
391   }
392
393   GST_MANIFEST_UNLOCK (demux);
394 }
395
396 static void
397 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
398 {
399   GObjectClass *gobject_class;
400   GstElementClass *gstelement_class;
401   GstBinClass *gstbin_class;
402
403   gobject_class = G_OBJECT_CLASS (klass);
404   gstelement_class = GST_ELEMENT_CLASS (klass);
405   gstbin_class = GST_BIN_CLASS (klass);
406
407   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
408       "Base Adaptive Demux");
409
410   parent_class = g_type_class_peek_parent (klass);
411
412   if (private_offset != 0)
413     g_type_class_adjust_private_offset (klass, &private_offset);
414
415   gobject_class->set_property = gst_adaptive_demux_set_property;
416   gobject_class->get_property = gst_adaptive_demux_get_property;
417   gobject_class->finalize = gst_adaptive_demux_finalize;
418
419   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
420       g_param_spec_uint ("connection-speed", "Connection Speed",
421           "Network connection speed in kbps (0 = calculate from downloaded"
422           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
426   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
427       g_param_spec_float ("bitrate-limit",
428           "Bitrate limit in %",
429           "Limit of the available bitrate to use when switching to alternates.",
430           0, 1, DEFAULT_BITRATE_LIMIT,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433   gstelement_class->change_state = gst_adaptive_demux_change_state;
434
435   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
436
437   klass->data_received = gst_adaptive_demux_stream_data_received_default;
438   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
439   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
440   klass->requires_periodical_playlist_update =
441       gst_adaptive_demux_requires_periodical_playlist_update_default;
442
443 }
444
445 static void
446 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
447     GstAdaptiveDemuxClass * klass)
448 {
449   GstPadTemplate *pad_template;
450   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
451   GObjectClass *gobject_class;
452
453   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
454
455   demux->priv = gst_adaptive_demux_get_instance_private (demux);
456   demux->priv->input_adapter = gst_adapter_new ();
457   demux->downloader = gst_uri_downloader_new ();
458   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
459   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
460   demux->priv->segment_seqnum = gst_util_seqnum_next ();
461   demux->have_group_id = FALSE;
462   demux->group_id = G_MAXUINT;
463
464   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
465
466   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
467       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
468
469   demux->realtime_clock = gst_system_clock_obtain ();
470   g_assert (demux->realtime_clock != NULL);
471   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
472   if (g_object_class_find_property (gobject_class, "clock-type")) {
473     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
474   } else {
475     GST_WARNING_OBJECT (demux,
476         "System clock does not have clock-type property");
477   }
478   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
479     demux->clock_offset = 0;
480   } else {
481     GDateTime *utc_now;
482     GstClockTime rtc_now;
483     GTimeVal gtv;
484
485     utc_now = g_date_time_new_now_utc ();
486     rtc_now = gst_clock_get_time (demux->realtime_clock);
487     g_date_time_to_timeval (utc_now, &gtv);
488     demux->clock_offset =
489         gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
490         GST_TIME_AS_USECONDS (rtc_now);
491     g_date_time_unref (utc_now);
492   }
493   g_rec_mutex_init (&demux->priv->updates_lock);
494   demux->priv->updates_task =
495       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
496       demux, NULL);
497   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
498
499   g_mutex_init (&demux->priv->updates_timed_lock);
500   g_cond_init (&demux->priv->updates_timed_cond);
501
502   g_cond_init (&demux->priv->manifest_cond);
503   g_mutex_init (&demux->priv->manifest_update_lock);
504
505   g_rec_mutex_init (&demux->priv->manifest_lock);
506   g_mutex_init (&demux->priv->api_lock);
507   g_mutex_init (&demux->priv->segment_lock);
508
509   g_cond_init (&demux->priv->preroll_cond);
510   g_mutex_init (&demux->priv->preroll_lock);
511
512   pad_template =
513       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
514   g_return_if_fail (pad_template != NULL);
515
516   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
517   gst_pad_set_event_function (demux->sinkpad,
518       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
519   gst_pad_set_chain_function (demux->sinkpad,
520       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
521
522   /* Properties */
523   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
524   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
525
526   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
527 }
528
529 static void
530 gst_adaptive_demux_finalize (GObject * object)
531 {
532   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
533   GstAdaptiveDemuxPrivate *priv = demux->priv;
534
535   GST_DEBUG_OBJECT (object, "finalize");
536
537   g_object_unref (priv->input_adapter);
538   g_object_unref (demux->downloader);
539
540   g_mutex_clear (&priv->updates_timed_lock);
541   g_cond_clear (&priv->updates_timed_cond);
542   g_mutex_clear (&demux->priv->manifest_update_lock);
543   g_cond_clear (&demux->priv->manifest_cond);
544   g_object_unref (priv->updates_task);
545   g_rec_mutex_clear (&priv->updates_lock);
546   g_rec_mutex_clear (&demux->priv->manifest_lock);
547   g_mutex_clear (&demux->priv->api_lock);
548   g_mutex_clear (&demux->priv->segment_lock);
549   if (demux->realtime_clock) {
550     gst_object_unref (demux->realtime_clock);
551     demux->realtime_clock = NULL;
552   }
553
554   g_cond_clear (&demux->priv->preroll_cond);
555   g_mutex_clear (&demux->priv->preroll_lock);
556
557   G_OBJECT_CLASS (parent_class)->finalize (object);
558 }
559
560 static GstStateChangeReturn
561 gst_adaptive_demux_change_state (GstElement * element,
562     GstStateChange transition)
563 {
564   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
565   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
566
567   switch (transition) {
568     case GST_STATE_CHANGE_PAUSED_TO_READY:
569       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
570         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
571       gst_uri_downloader_cancel (demux->downloader);
572
573       GST_API_LOCK (demux);
574       GST_MANIFEST_LOCK (demux);
575       gst_adaptive_demux_reset (demux);
576       GST_MANIFEST_UNLOCK (demux);
577       GST_API_UNLOCK (demux);
578       break;
579     case GST_STATE_CHANGE_READY_TO_PAUSED:
580       GST_API_LOCK (demux);
581       GST_MANIFEST_LOCK (demux);
582       gst_adaptive_demux_reset (demux);
583       /* Clear "cancelled" flag in uridownloader since subclass might want to
584        * use uridownloader to fetch another manifest */
585       gst_uri_downloader_reset (demux->downloader);
586       if (g_atomic_int_get (&demux->priv->have_manifest))
587         gst_adaptive_demux_start_manifest_update_task (demux);
588       GST_MANIFEST_UNLOCK (demux);
589       GST_API_UNLOCK (demux);
590       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
591         GST_DEBUG_OBJECT (demux, "demuxer has started running");
592       break;
593     default:
594       break;
595   }
596
597   /* this must be run without MANIFEST_LOCK taken.
598    * For PLAYING to PLAYING state changes, it will want to take a lock in
599    * src element and that lock is held while the streaming thread is running.
600    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
601    */
602   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
603
604   return result;
605 }
606
607 static gboolean
608 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
609     GstEvent * event)
610 {
611   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
612   gboolean ret;
613
614   switch (event->type) {
615     case GST_EVENT_FLUSH_STOP:{
616       GST_API_LOCK (demux);
617       GST_MANIFEST_LOCK (demux);
618
619       gst_adaptive_demux_reset (demux);
620
621       ret = gst_pad_event_default (pad, parent, event);
622
623       GST_MANIFEST_UNLOCK (demux);
624       GST_API_UNLOCK (demux);
625
626       return ret;
627     }
628     case GST_EVENT_EOS:{
629       GstAdaptiveDemuxClass *demux_class;
630       GstQuery *query;
631       gboolean query_res;
632       gboolean ret = TRUE;
633       gsize available;
634       GstBuffer *manifest_buffer;
635
636       GST_API_LOCK (demux);
637       GST_MANIFEST_LOCK (demux);
638
639       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
640
641       available = gst_adapter_available (demux->priv->input_adapter);
642
643       if (available == 0) {
644         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
645         ret = gst_pad_event_default (pad, parent, event);
646
647         GST_MANIFEST_UNLOCK (demux);
648         GST_API_UNLOCK (demux);
649
650         return ret;
651       }
652
653       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
654
655       /* Need to get the URI to use it as a base to generate the fragment's
656        * uris */
657       query = gst_query_new_uri ();
658       query_res = gst_pad_peer_query (pad, query);
659       if (query_res) {
660         gchar *uri, *redirect_uri;
661         gboolean permanent;
662
663         gst_query_parse_uri (query, &uri);
664         gst_query_parse_uri_redirection (query, &redirect_uri);
665         gst_query_parse_uri_redirection_permanent (query, &permanent);
666
667         if (permanent && redirect_uri) {
668           demux->manifest_uri = redirect_uri;
669           demux->manifest_base_uri = NULL;
670           g_free (uri);
671         } else {
672           demux->manifest_uri = uri;
673           demux->manifest_base_uri = redirect_uri;
674         }
675
676         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
677             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
678       } else {
679         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
680       }
681       gst_query_unref (query);
682
683       /* Let the subclass parse the manifest */
684       manifest_buffer =
685           gst_adapter_take_buffer (demux->priv->input_adapter, available);
686       if (!demux_class->process_manifest (demux, manifest_buffer)) {
687         /* In most cases, this will happen if we set a wrong url in the
688          * source element and we have received the 404 HTML response instead of
689          * the manifest */
690         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
691             (NULL));
692         ret = FALSE;
693       } else {
694         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
695       }
696       gst_buffer_unref (manifest_buffer);
697
698       gst_element_post_message (GST_ELEMENT_CAST (demux),
699           gst_message_new_element (GST_OBJECT_CAST (demux),
700               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
701                   "manifest-uri", G_TYPE_STRING,
702                   demux->manifest_uri, "uri", G_TYPE_STRING,
703                   demux->manifest_uri,
704                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
705                   GST_CLOCK_TIME_NONE,
706                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
707                   gst_util_get_timestamp (), NULL)));
708
709       if (ret) {
710         /* Send duration message */
711         if (!gst_adaptive_demux_is_live (demux)) {
712           GstClockTime duration = demux_class->get_duration (demux);
713
714           if (duration != GST_CLOCK_TIME_NONE) {
715             GST_DEBUG_OBJECT (demux,
716                 "Sending duration message : %" GST_TIME_FORMAT,
717                 GST_TIME_ARGS (duration));
718             gst_element_post_message (GST_ELEMENT (demux),
719                 gst_message_new_duration_changed (GST_OBJECT (demux)));
720           } else {
721             GST_DEBUG_OBJECT (demux,
722                 "media duration unknown, can not send the duration message");
723           }
724         }
725
726         if (demux->next_streams) {
727           gst_adaptive_demux_prepare_streams (demux,
728               gst_adaptive_demux_is_live (demux));
729           gst_adaptive_demux_start_tasks (demux, TRUE);
730           gst_adaptive_demux_start_manifest_update_task (demux);
731         } else {
732           /* no streams */
733           GST_WARNING_OBJECT (demux, "No streams created from manifest");
734           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
735               (_("This file contains no playable streams.")),
736               ("No known stream formats found at the Manifest"));
737           ret = FALSE;
738         }
739
740       }
741       GST_MANIFEST_UNLOCK (demux);
742       GST_API_UNLOCK (demux);
743
744       gst_event_unref (event);
745       return ret;
746     }
747     case GST_EVENT_SEGMENT:
748       /* Swallow newsegments, we'll push our own */
749       gst_event_unref (event);
750       return TRUE;
751     default:
752       break;
753   }
754
755   return gst_pad_event_default (pad, parent, event);
756 }
757
758 static GstFlowReturn
759 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
760     GstBuffer * buffer)
761 {
762   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
763
764   GST_MANIFEST_LOCK (demux);
765
766   gst_adapter_push (demux->priv->input_adapter, buffer);
767
768   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
769       (gint) gst_adapter_available (demux->priv->input_adapter));
770
771   GST_MANIFEST_UNLOCK (demux);
772   return GST_FLOW_OK;
773 }
774
775 /* must be called with manifest_lock taken */
776 static void
777 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
778 {
779   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
780   GList *iter;
781   GList *old_streams;
782   GstEvent *eos;
783
784   /* take ownership of old_streams before releasing the manifest_lock in
785    * gst_adaptive_demux_stop_tasks
786    */
787   old_streams = demux->priv->old_streams;
788   demux->priv->old_streams = NULL;
789
790   gst_adaptive_demux_stop_tasks (demux, TRUE);
791
792   if (klass->reset)
793     klass->reset (demux);
794
795   eos = gst_event_new_eos ();
796   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
797     GstAdaptiveDemuxStream *stream = iter->data;
798     if (stream->pad) {
799       gst_pad_push_event (stream->pad, gst_event_ref (eos));
800       gst_pad_set_active (stream->pad, FALSE);
801
802       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
803     }
804     gst_adaptive_demux_stream_free (stream);
805   }
806   gst_event_unref (eos);
807   g_list_free (demux->streams);
808   demux->streams = NULL;
809   if (demux->prepared_streams) {
810     g_list_free_full (demux->prepared_streams,
811         (GDestroyNotify) gst_adaptive_demux_stream_free);
812     demux->prepared_streams = NULL;
813   }
814   if (demux->next_streams) {
815     g_list_free_full (demux->next_streams,
816         (GDestroyNotify) gst_adaptive_demux_stream_free);
817     demux->next_streams = NULL;
818   }
819
820   if (old_streams) {
821     g_list_free_full (old_streams,
822         (GDestroyNotify) gst_adaptive_demux_stream_free);
823   }
824
825   if (demux->priv->old_streams) {
826     g_list_free_full (demux->priv->old_streams,
827         (GDestroyNotify) gst_adaptive_demux_stream_free);
828     demux->priv->old_streams = NULL;
829   }
830
831   g_free (demux->manifest_uri);
832   g_free (demux->manifest_base_uri);
833   demux->manifest_uri = NULL;
834   demux->manifest_base_uri = NULL;
835
836   gst_adapter_clear (demux->priv->input_adapter);
837   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
838
839   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
840
841   demux->have_group_id = FALSE;
842   demux->group_id = G_MAXUINT;
843   demux->priv->segment_seqnum = gst_util_seqnum_next ();
844 }
845
846 static void
847 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
848 {
849   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
850
851   switch (GST_MESSAGE_TYPE (msg)) {
852     case GST_MESSAGE_ERROR:{
853       GList *iter;
854       GstAdaptiveDemuxStream *stream = NULL;
855       GError *err = NULL;
856       gchar *debug = NULL;
857       gchar *new_error = NULL;
858       const GstStructure *details = NULL;
859
860       GST_MANIFEST_LOCK (demux);
861
862       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
863         GstAdaptiveDemuxStream *cur = iter->data;
864         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
865                 GST_OBJECT_CAST (cur->src))) {
866           stream = cur;
867           break;
868         }
869       }
870       if (stream == NULL) {
871         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
872           GstAdaptiveDemuxStream *cur = iter->data;
873           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
874                   GST_OBJECT_CAST (cur->src))) {
875             stream = cur;
876             break;
877           }
878         }
879         if (stream == NULL) {
880           GST_WARNING_OBJECT (demux,
881               "Failed to locate stream for errored element");
882           break;
883         }
884       }
885
886       gst_message_parse_error (msg, &err, &debug);
887
888       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
889           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
890           err->message, debug);
891
892       if (debug)
893         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
894       if (new_error) {
895         g_free (err->message);
896         err->message = new_error;
897       }
898
899       gst_message_parse_error_details (msg, &details);
900       if (details) {
901         gst_structure_get_uint (details, "http-status-code",
902             &stream->last_status_code);
903       }
904
905       /* error, but ask to retry */
906       gst_adaptive_demux_stream_fragment_download_finish (stream,
907           GST_FLOW_CUSTOM_ERROR, err);
908
909       g_error_free (err);
910       g_free (debug);
911
912       GST_MANIFEST_UNLOCK (demux);
913
914       gst_message_unref (msg);
915       msg = NULL;
916     }
917       break;
918     default:
919       break;
920   }
921
922   if (msg)
923     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
924 }
925
926 void
927 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
928     gsize struct_size)
929 {
930   GST_API_LOCK (demux);
931   GST_MANIFEST_LOCK (demux);
932   demux->stream_struct_size = struct_size;
933   GST_MANIFEST_UNLOCK (demux);
934   GST_API_UNLOCK (demux);
935 }
936
937 /* must be called with manifest_lock taken */
938 static gboolean
939 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
940     GstAdaptiveDemuxStream * stream)
941 {
942   GstPad *pad = stream->pad;
943   gchar *name = gst_pad_get_name (pad);
944   GstEvent *event;
945   gchar *stream_id;
946
947   gst_pad_set_active (pad, TRUE);
948   stream->need_header = TRUE;
949
950   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
951
952   event =
953       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
954       GST_EVENT_STREAM_START, 0);
955   if (event) {
956     if (gst_event_parse_group_id (event, &demux->group_id))
957       demux->have_group_id = TRUE;
958     else
959       demux->have_group_id = FALSE;
960     gst_event_unref (event);
961   } else if (!demux->have_group_id) {
962     demux->have_group_id = TRUE;
963     demux->group_id = gst_util_group_id_next ();
964   }
965   event = gst_event_new_stream_start (stream_id);
966   if (demux->have_group_id)
967     gst_event_set_group_id (event, demux->group_id);
968
969   gst_pad_push_event (pad, event);
970   g_free (stream_id);
971   g_free (name);
972
973   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
974
975   stream->discont = TRUE;
976
977   return TRUE;
978 }
979
980 static gboolean
981 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
982     GstAdaptiveDemuxStream * stream)
983 {
984   gboolean ret;
985   GstPad *pad = stream->pad;
986   GstCaps *caps;
987
988   if (stream->pending_caps) {
989     gst_pad_set_caps (pad, stream->pending_caps);
990     caps = stream->pending_caps;
991     stream->pending_caps = NULL;
992   } else {
993     caps = gst_pad_get_current_caps (pad);
994   }
995
996   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
997       GST_DEBUG_PAD_NAME (pad), caps);
998   if (caps)
999     gst_caps_unref (caps);
1000
1001   gst_object_ref (pad);
1002
1003   /* Don't hold the manifest lock while exposing a pad */
1004   GST_MANIFEST_UNLOCK (demux);
1005   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1006   GST_MANIFEST_LOCK (demux);
1007
1008   return ret;
1009 }
1010
1011 /* must be called with manifest_lock taken */
1012 static GstClockTime
1013 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1014     GstAdaptiveDemuxStream * stream)
1015 {
1016   GstAdaptiveDemuxClass *klass;
1017
1018   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1019
1020   if (klass->get_presentation_offset == NULL)
1021     return 0;
1022
1023   return klass->get_presentation_offset (demux, stream);
1024 }
1025
1026 /* must be called with manifest_lock taken */
1027 static GstClockTime
1028 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1029 {
1030   GstAdaptiveDemuxClass *klass;
1031
1032   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1033
1034   if (klass->get_period_start_time == NULL)
1035     return 0;
1036
1037   return klass->get_period_start_time (demux);
1038 }
1039
1040 /* must be called with manifest_lock taken */
1041 static gboolean
1042 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1043     gboolean first_and_live)
1044 {
1045   GList *iter;
1046   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1047
1048   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1049   if (demux->prepared_streams != NULL) {
1050     /* Old streams that were never exposed, due to a seek or so */
1051     GST_FIXME_OBJECT (demux,
1052         "Preparing new streams without cleaning up old ones!");
1053     return FALSE;
1054   }
1055
1056   demux->prepared_streams = demux->next_streams;
1057   demux->next_streams = NULL;
1058
1059   if (!gst_adaptive_demux_is_running (demux)) {
1060     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1061     return TRUE;
1062   }
1063
1064   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1065     GstAdaptiveDemuxStream *stream = iter->data;
1066
1067     stream->do_block = TRUE;
1068
1069     if (!gst_adaptive_demux_prepare_stream (demux,
1070             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1071       /* TODO act on error */
1072       GST_FIXME_OBJECT (stream->pad,
1073           "Do something on failure to expose stream");
1074     }
1075
1076     if (first_and_live) {
1077       /* TODO we only need the first timestamp, maybe create a simple function to
1078        * get the current PTS of a fragment ? */
1079       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1080       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1081
1082       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1083         min_pts = MIN (min_pts, stream->fragment.timestamp);
1084       } else {
1085         min_pts = stream->fragment.timestamp;
1086       }
1087     }
1088   }
1089
1090   period_start = gst_adaptive_demux_get_period_start_time (demux);
1091
1092   /* For live streams, the subclass is supposed to seek to the current
1093    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1094    * We now also have to seek our demuxer segment to reflect this.
1095    *
1096    * FIXME: This needs some refactoring at some point.
1097    */
1098   if (first_and_live) {
1099     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1100         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1101         GST_SEEK_TYPE_NONE, -1, NULL);
1102   }
1103
1104   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1105     GstAdaptiveDemuxStream *stream = iter->data;
1106     GstClockTime offset;
1107
1108     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1109     stream->segment = demux->segment;
1110
1111     /* The demuxer segment is just built from seek events, but for each stream
1112      * we have to adjust segments according to the current period and the
1113      * stream specific presentation time offset.
1114      *
1115      * For each period, buffer timestamps start again from 0. Additionally the
1116      * buffer timestamps are shifted by the stream specific presentation time
1117      * offset, so the first buffer timestamp of a period is 0 + presentation
1118      * time offset. If the stream contains timestamps itself, this is also
1119      * supposed to be the presentation time stored inside the stream.
1120      *
1121      * The stream time over periods is supposed to be continuous, that is the
1122      * buffer timestamp 0 + presentation time offset should map to the start
1123      * time of the current period.
1124      *
1125      *
1126      * The adjustment of the stream segments as such works the following.
1127      *
1128      * If the demuxer segment start is bigger than the period start, this
1129      * means that we have to drop some media at the beginning of the current
1130      * period, e.g. because a seek into the middle of the period has
1131      * happened. The amount of media to drop is the difference between the
1132      * period start and the demuxer segment start, and as each period starts
1133      * again from 0, this difference is going to be the actual stream's
1134      * segment start. As all timestamps of the stream are shifted by the
1135      * presentation time offset, we will also have to move the segment start
1136      * by that offset.
1137      *
1138      * Likewise, the demuxer segment stop value is adjusted in the same
1139      * fashion.
1140      *
1141      * Now the running time and stream time at the stream's segment start has
1142      * to be the one that is stored inside the demuxer's segment, which means
1143      * that segment.base and segment.time have to be copied over (done just
1144      * above)
1145      *
1146      *
1147      * If the demuxer segment start is smaller than the period start time,
1148      * this means that the whole period is inside the segment. As each period
1149      * starts timestamps from 0, and additionally timestamps are shifted by
1150      * the presentation time offset, the stream's first timestamp (and as such
1151      * the stream's segment start) has to be the presentation time offset.
1152      * The stream time at the segment start is supposed to be the stream time
1153      * of the period start according to the demuxer segment, so the stream
1154      * segment's time would be set to that. The same goes for the stream
1155      * segment's base, which is supposed to be the running time of the period
1156      * start according to the demuxer's segment.
1157      *
1158      * The same logic applies for negative rates with the segment stop and
1159      * the period stop time (which gets clamped).
1160      *
1161      *
1162      * For the first case where not the complete period is inside the segment,
1163      * the segment time and base as calculated by the second case would be
1164      * equivalent.
1165      */
1166     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1167         &demux->segment);
1168     GST_DEBUG_OBJECT (demux,
1169         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1170         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1171     /* note for readers:
1172      * Since stream->segment is initially a copy of demux->segment,
1173      * only the values that need updating are modified below. */
1174     if (first_and_live) {
1175       /* If first and live, demuxer did seek to the current position already */
1176       stream->segment.start = demux->segment.start - period_start + offset;
1177       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1178         stream->segment.stop = demux->segment.stop - period_start + offset;
1179       /* FIXME : Do we need to handle negative rates for this ? */
1180       stream->segment.position = stream->segment.start;
1181     } else if (demux->segment.start > period_start) {
1182       /* seek within a period */
1183       stream->segment.start = demux->segment.start - period_start + offset;
1184       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1185         stream->segment.stop = demux->segment.stop - period_start + offset;
1186       if (stream->segment.rate >= 0)
1187         stream->segment.position = offset;
1188       else
1189         stream->segment.position = stream->segment.stop;
1190     } else {
1191       stream->segment.start = offset;
1192       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1193         stream->segment.stop = demux->segment.stop - period_start + offset;
1194       if (stream->segment.rate >= 0)
1195         stream->segment.position = offset;
1196       else
1197         stream->segment.position = stream->segment.stop;
1198       stream->segment.time =
1199           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1200           period_start);
1201       stream->segment.base =
1202           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1203           period_start);
1204     }
1205
1206     stream->pending_segment = gst_event_new_segment (&stream->segment);
1207     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1208     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1209
1210     GST_DEBUG_OBJECT (demux,
1211         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1212         &stream->segment, stream);
1213   }
1214
1215   return TRUE;
1216 }
1217
1218 static gboolean
1219 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1220 {
1221   GList *iter;
1222   GList *old_streams;
1223
1224   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1225
1226   old_streams = demux->streams;
1227   demux->streams = demux->prepared_streams;
1228   demux->prepared_streams = NULL;
1229
1230   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1231     GstAdaptiveDemuxStream *stream = iter->data;
1232
1233     if (!gst_adaptive_demux_expose_stream (demux,
1234             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1235       /* TODO act on error */
1236     }
1237   }
1238   demux->priv->preroll_pending = 0;
1239
1240   GST_MANIFEST_UNLOCK (demux);
1241   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1242   GST_MANIFEST_LOCK (demux);
1243
1244   if (old_streams) {
1245     GstEvent *eos = gst_event_new_eos ();
1246
1247     /* before we put streams in the demux->priv->old_streams list,
1248      * we ask the download task to stop. In this way, it will no longer be
1249      * allowed to change the demux object.
1250      */
1251     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1252       GstAdaptiveDemuxStream *stream = iter->data;
1253       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1254
1255       GST_MANIFEST_UNLOCK (demux);
1256
1257       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1258       gst_pad_push_event (pad, gst_event_ref (eos));
1259       gst_pad_set_active (pad, FALSE);
1260
1261       GST_LOG_OBJECT (pad, "Removing stream");
1262       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1263       GST_MANIFEST_LOCK (demux);
1264
1265       gst_object_unref (GST_OBJECT (pad));
1266
1267       /* ask the download task to stop.
1268        * We will not join it now, because our thread can be one of these tasks.
1269        * We will do the joining later, from another stream download task or
1270        * from gst_adaptive_demux_stop_tasks.
1271        * We also cannot change the state of the stream->src element, because
1272        * that will wait on the streaming thread (which could be this thread)
1273        * to stop first.
1274        * Because we sent an EOS to the downstream element, the stream->src
1275        * element should detect this in its streaming task and stop.
1276        * Even if it doesn't do that, we will change its state later in
1277        * gst_adaptive_demux_stop_tasks.
1278        */
1279       GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1280       gst_task_stop (stream->download_task);
1281       g_mutex_lock (&stream->fragment_download_lock);
1282       stream->cancelled = TRUE;
1283       stream->replaced = TRUE;
1284       g_cond_signal (&stream->fragment_download_cond);
1285       g_mutex_unlock (&stream->fragment_download_lock);
1286     }
1287     gst_event_unref (eos);
1288
1289     /* The list should be freed from another thread as we can't properly
1290      * cleanup a GstTask from itself */
1291     demux->priv->old_streams =
1292         g_list_concat (demux->priv->old_streams, old_streams);
1293   }
1294
1295   /* Unblock after removing oldstreams */
1296   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1297     GstAdaptiveDemuxStream *stream = iter->data;
1298     stream->do_block = FALSE;
1299   }
1300
1301   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1302
1303   return TRUE;
1304 }
1305
1306 /* must be called with manifest_lock taken */
1307 GstAdaptiveDemuxStream *
1308 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1309 {
1310   GstAdaptiveDemuxStream *stream;
1311
1312   stream = g_malloc0 (demux->stream_struct_size);
1313
1314   /* Downloading task */
1315   g_rec_mutex_init (&stream->download_lock);
1316   stream->download_task =
1317       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1318       stream, NULL);
1319   gst_task_set_lock (stream->download_task, &stream->download_lock);
1320
1321   stream->pad = pad;
1322   stream->demux = demux;
1323   stream->fragment_bitrates =
1324       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1325   gst_pad_set_element_private (pad, stream);
1326   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1327
1328   g_mutex_lock (&demux->priv->preroll_lock);
1329   stream->do_block = TRUE;
1330   demux->priv->preroll_pending++;
1331   g_mutex_unlock (&demux->priv->preroll_lock);
1332
1333   gst_pad_set_query_function (pad,
1334       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1335   gst_pad_set_event_function (pad,
1336       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1337
1338   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1339   g_cond_init (&stream->fragment_download_cond);
1340   g_mutex_init (&stream->fragment_download_lock);
1341
1342   demux->next_streams = g_list_append (demux->next_streams, stream);
1343
1344   return stream;
1345 }
1346
1347 GstAdaptiveDemuxStream *
1348 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1349 {
1350   GList *iter;
1351
1352   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1353     GstAdaptiveDemuxStream *stream = iter->data;
1354     if (stream->pad == pad) {
1355       return stream;
1356     }
1357   }
1358
1359   return NULL;
1360 }
1361
1362 /* must be called with manifest_lock taken.
1363  * It will temporarily drop the manifest_lock in order to join the task.
1364  * It will join only the old_streams (the demux->streams are joined by
1365  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1366  * called)
1367  */
1368 static void
1369 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1370 {
1371   GstAdaptiveDemux *demux = stream->demux;
1372   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1373
1374   if (klass->stream_free)
1375     klass->stream_free (stream);
1376
1377   g_clear_error (&stream->last_error);
1378   if (stream->download_task) {
1379     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1380       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1381           GST_DEBUG_PAD_NAME (stream->pad));
1382
1383       gst_task_stop (stream->download_task);
1384
1385       g_mutex_lock (&stream->fragment_download_lock);
1386       stream->cancelled = TRUE;
1387       g_cond_signal (&stream->fragment_download_cond);
1388       g_mutex_unlock (&stream->fragment_download_lock);
1389     }
1390     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1391
1392     /* temporarily drop the manifest lock to join the task */
1393     GST_MANIFEST_UNLOCK (demux);
1394
1395     gst_task_join (stream->download_task);
1396
1397     GST_MANIFEST_LOCK (demux);
1398
1399     GST_LOG_OBJECT (demux, "Finished");
1400     gst_object_unref (stream->download_task);
1401     g_rec_mutex_clear (&stream->download_lock);
1402     stream->download_task = NULL;
1403   }
1404
1405   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1406
1407   if (stream->pending_segment) {
1408     gst_event_unref (stream->pending_segment);
1409     stream->pending_segment = NULL;
1410   }
1411
1412   if (stream->pending_events) {
1413     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1414     stream->pending_events = NULL;
1415   }
1416
1417   if (stream->internal_pad) {
1418     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1419   }
1420
1421   if (stream->src_srcpad) {
1422     gst_object_unref (stream->src_srcpad);
1423     stream->src_srcpad = NULL;
1424   }
1425
1426   if (stream->src) {
1427     GstElement *src = stream->src;
1428
1429     stream->src = NULL;
1430
1431     GST_MANIFEST_UNLOCK (demux);
1432     gst_element_set_locked_state (src, TRUE);
1433     gst_element_set_state (src, GST_STATE_NULL);
1434     gst_bin_remove (GST_BIN_CAST (demux), src);
1435     GST_MANIFEST_LOCK (demux);
1436   }
1437
1438   g_cond_clear (&stream->fragment_download_cond);
1439   g_mutex_clear (&stream->fragment_download_lock);
1440   g_free (stream->fragment_bitrates);
1441
1442   if (stream->pad) {
1443     gst_object_unref (stream->pad);
1444     stream->pad = NULL;
1445   }
1446   if (stream->pending_caps)
1447     gst_caps_unref (stream->pending_caps);
1448
1449   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1450
1451   g_free (stream);
1452 }
1453
1454 /* must be called with manifest_lock taken */
1455 static gboolean
1456 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1457     gint64 * range_start, gint64 * range_stop)
1458 {
1459   GstAdaptiveDemuxClass *klass;
1460
1461   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1462
1463   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1464
1465   return klass->get_live_seek_range (demux, range_start, range_stop);
1466 }
1467
1468 /* must be called with manifest_lock taken */
1469 static gboolean
1470 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1471     GstAdaptiveDemuxStream * stream)
1472 {
1473   gint64 range_start, range_stop;
1474   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1475     GST_LOG_OBJECT (stream->pad,
1476         "stream position %" GST_TIME_FORMAT "  live seek range %"
1477         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1478         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1479         GST_STIME_ARGS (range_stop));
1480     return (stream->segment.position >= range_start
1481         && stream->segment.position <= range_stop);
1482   }
1483
1484   return FALSE;
1485 }
1486
1487 /* must be called with manifest_lock taken */
1488 static gboolean
1489 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1490 {
1491   GstAdaptiveDemuxClass *klass;
1492
1493   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1494   if (gst_adaptive_demux_is_live (demux)) {
1495     return klass->get_live_seek_range != NULL;
1496   }
1497
1498   return klass->seek != NULL;
1499 }
1500
1501 static void
1502 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1503     GList * streams, gint64 period_start, GstSeekType start_type,
1504     GstSeekType stop_type)
1505 {
1506   GList *iter;
1507   for (iter = streams; iter; iter = g_list_next (iter)) {
1508     GstAdaptiveDemuxStream *stream = iter->data;
1509     GstEvent *seg_evt;
1510     GstClockTime offset;
1511
1512     /* See comments in gst_adaptive_demux_get_period_start_time() for
1513      * an explanation of the segment modifications */
1514     stream->segment = demux->segment;
1515     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1516     stream->segment.start += offset - period_start;
1517     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1518       stream->segment.stop += offset - period_start;
1519     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1520       stream->segment.position = stream->segment.start;
1521     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1522       stream->segment.position = stream->segment.stop;
1523     seg_evt = gst_event_new_segment (&stream->segment);
1524     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1525     gst_event_replace (&stream->pending_segment, seg_evt);
1526     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1527         stream->pending_segment);
1528     gst_event_unref (seg_evt);
1529     /* Make sure the first buffer after a seek has the discont flag */
1530     stream->discont = TRUE;
1531     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1532   }
1533 }
1534
1535 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1536                               GST_SEEK_FLAG_SNAP_AFTER |          \
1537                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1538                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1539                               GST_SEEK_FLAG_KEY_UNIT))
1540 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1541                               GST_SEEK_FLAG_SNAP_AFTER | \
1542                               GST_SEEK_FLAG_SNAP_NEAREST))
1543
1544 static gboolean
1545 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1546     GstEvent * event)
1547 {
1548   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1549   gdouble rate;
1550   GstFormat format;
1551   GstSeekFlags flags;
1552   GstSeekType start_type, stop_type;
1553   gint64 start, stop;
1554   guint32 seqnum;
1555   gboolean update;
1556   gboolean ret;
1557   GstSegment oldsegment;
1558   GstAdaptiveDemuxStream *stream = NULL;
1559
1560   GST_INFO_OBJECT (demux, "Received seek event");
1561
1562   GST_API_LOCK (demux);
1563   GST_MANIFEST_LOCK (demux);
1564
1565   if (!gst_adaptive_demux_can_seek (demux)) {
1566     GST_MANIFEST_UNLOCK (demux);
1567     GST_API_UNLOCK (demux);
1568     gst_event_unref (event);
1569     return FALSE;
1570   }
1571
1572   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1573       &stop_type, &stop);
1574
1575   if (format != GST_FORMAT_TIME) {
1576     GST_MANIFEST_UNLOCK (demux);
1577     GST_API_UNLOCK (demux);
1578     GST_WARNING_OBJECT (demux,
1579         "Adaptive demuxers only support TIME-based seeking");
1580     gst_event_unref (event);
1581     return FALSE;
1582   }
1583
1584   if (flags & GST_SEEK_FLAG_SEGMENT) {
1585     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1586     GST_MANIFEST_UNLOCK (demux);
1587     GST_API_UNLOCK (demux);
1588     gst_event_unref (event);
1589     return FALSE;
1590   }
1591
1592   seqnum = gst_event_get_seqnum (event);
1593
1594   if (gst_adaptive_demux_is_live (demux)) {
1595     gint64 range_start, range_stop;
1596     gboolean changed = FALSE;
1597     gboolean start_valid = TRUE, stop_valid = TRUE;
1598
1599     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1600             &range_stop)) {
1601       GST_MANIFEST_UNLOCK (demux);
1602       GST_API_UNLOCK (demux);
1603       gst_event_unref (event);
1604       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1605       return FALSE;
1606     }
1607
1608     GST_DEBUG_OBJECT (demux,
1609         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1610         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1611
1612     /* Handle relative positioning for live streams (relative to the range_stop) */
1613     if (start_type == GST_SEEK_TYPE_END) {
1614       start = range_stop + start;
1615       start_type = GST_SEEK_TYPE_SET;
1616       changed = TRUE;
1617     }
1618     if (stop_type == GST_SEEK_TYPE_END) {
1619       stop = range_stop + stop;
1620       stop_type = GST_SEEK_TYPE_SET;
1621       changed = TRUE;
1622     }
1623
1624     /* Adjust the requested start/stop position if it falls beyond the live
1625      * seek range.
1626      * The only case where we don't adjust is for the starting point of
1627      * an accurate seek (start if forward and stop if backwards)
1628      */
1629     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1630         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1631       GST_DEBUG_OBJECT (demux,
1632           "seek before live stream start, setting to range start: %"
1633           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1634       start = range_start;
1635       changed = TRUE;
1636     }
1637     /* truncate stop position also if set */
1638     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1639         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1640       GST_DEBUG_OBJECT (demux,
1641           "seek ending after live start, adjusting to: %"
1642           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1643       stop = range_stop;
1644       changed = TRUE;
1645     }
1646
1647     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1648         (start < range_start || start > range_stop)) {
1649       GST_WARNING_OBJECT (demux,
1650           "Seek to invalid position start:%" GST_STIME_FORMAT
1651           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1652           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1653           GST_STIME_ARGS (range_stop));
1654       start_valid = FALSE;
1655     }
1656     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1657         (stop < range_start || stop > range_stop)) {
1658       GST_WARNING_OBJECT (demux,
1659           "Seek to invalid position stop:%" GST_STIME_FORMAT
1660           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1661           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1662           GST_STIME_ARGS (range_stop));
1663       stop_valid = FALSE;
1664     }
1665
1666     /* If the seek position is still outside of the seekable range, refuse the seek */
1667     if (!start_valid || !stop_valid) {
1668       GST_MANIFEST_UNLOCK (demux);
1669       GST_API_UNLOCK (demux);
1670       gst_event_unref (event);
1671       return FALSE;
1672     }
1673
1674     /* Re-create seek event with changed/updated values */
1675     if (changed) {
1676       gst_event_unref (event);
1677       event =
1678           gst_event_new_seek (rate, format, flags,
1679           start_type, start, stop_type, stop);
1680       gst_event_set_seqnum (event, seqnum);
1681     }
1682   }
1683
1684   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1685
1686   /* have a backup in case seek fails */
1687   gst_segment_copy_into (&demux->segment, &oldsegment);
1688
1689   if (flags & GST_SEEK_FLAG_FLUSH) {
1690     GstEvent *fevent;
1691
1692     GST_DEBUG_OBJECT (demux, "sending flush start");
1693     fevent = gst_event_new_flush_start ();
1694     gst_event_set_seqnum (fevent, seqnum);
1695     GST_MANIFEST_UNLOCK (demux);
1696     gst_adaptive_demux_push_src_event (demux, fevent);
1697     GST_MANIFEST_LOCK (demux);
1698
1699     gst_adaptive_demux_stop_tasks (demux, FALSE);
1700   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1701       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1702
1703     gst_adaptive_demux_stop_tasks (demux, FALSE);
1704   }
1705
1706   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1707
1708   /*
1709    * Handle snap seeks as follows:
1710    * 1) do the snap seeking on the stream that received
1711    *    the event
1712    * 2) use the final position on this stream to seek
1713    *    on the other streams to the same position
1714    *
1715    * We can't snap at all streams at the same time as
1716    * they might end in different positions, so just
1717    * use the one that received the event as the 'leading'
1718    * one to do the snap seek.
1719    */
1720   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1721           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1722     GstClockTime ts;
1723     GstSeekFlags stream_seek_flags = flags;
1724
1725     /* snap-seek on the stream that received the event and then
1726      * use the resulting position to seek on all streams */
1727
1728     if (rate >= 0) {
1729       if (start_type != GST_SEEK_TYPE_NONE)
1730         ts = start;
1731       else {
1732         ts = stream->segment.position;
1733         start_type = GST_SEEK_TYPE_SET;
1734       }
1735     } else {
1736       if (stop_type != GST_SEEK_TYPE_NONE)
1737         ts = stop;
1738       else {
1739         stop_type = GST_SEEK_TYPE_SET;
1740         ts = stream->segment.position;
1741       }
1742     }
1743
1744     if (stream) {
1745       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1746     }
1747
1748     /* replace event with a new one without snapping to seek on all streams */
1749     gst_event_unref (event);
1750     if (rate >= 0) {
1751       start = ts;
1752     } else {
1753       stop = ts;
1754     }
1755     event =
1756         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1757         start_type, start, stop_type, stop);
1758     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1759   }
1760   stream = NULL;
1761
1762   gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1763       start, stop_type, stop, &update);
1764
1765   /* FIXME - this seems unatural, do_seek() is updating base when we
1766    * only want the start/stop position to change, maybe do_seek() needs
1767    * some fixing? */
1768   if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1769               && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1770               && stop_type == GST_SEEK_TYPE_NONE))) {
1771     demux->segment.base = oldsegment.base;
1772   }
1773
1774   GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1775
1776   ret = demux_class->seek (demux, event);
1777
1778   if (!ret) {
1779     /* Is there anything else we can do if it fails? */
1780     gst_segment_copy_into (&oldsegment, &demux->segment);
1781   } else {
1782     demux->priv->segment_seqnum = seqnum;
1783   }
1784   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1785
1786   if (flags & GST_SEEK_FLAG_FLUSH) {
1787     GstEvent *fevent;
1788
1789     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1790     fevent = gst_event_new_flush_stop (TRUE);
1791     gst_event_set_seqnum (fevent, seqnum);
1792     gst_adaptive_demux_push_src_event (demux, fevent);
1793   }
1794
1795   if (demux->next_streams) {
1796     /* If the seek generated new streams, get them
1797      * to preroll */
1798     gst_adaptive_demux_prepare_streams (demux, FALSE);
1799     gst_adaptive_demux_start_tasks (demux, TRUE);
1800   } else {
1801     GstClockTime period_start =
1802         gst_adaptive_demux_get_period_start_time (demux);
1803
1804     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1805     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1806         period_start, start_type, stop_type);
1807     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1808         period_start, start_type, stop_type);
1809     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1810
1811     /* Restart the demux */
1812     gst_adaptive_demux_start_tasks (demux, FALSE);
1813   }
1814
1815   GST_MANIFEST_UNLOCK (demux);
1816   GST_API_UNLOCK (demux);
1817   gst_event_unref (event);
1818
1819   return ret;
1820 }
1821
1822 static gboolean
1823 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1824     GstEvent * event)
1825 {
1826   GstAdaptiveDemux *demux;
1827
1828   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1829
1830   /* FIXME handle events received on pads that are to be removed */
1831
1832   switch (event->type) {
1833     case GST_EVENT_SEEK:
1834     {
1835       guint32 seqnum = gst_event_get_seqnum (event);
1836       if (seqnum == demux->priv->segment_seqnum) {
1837         GST_LOG_OBJECT (pad,
1838             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1839         gst_event_unref (event);
1840         return TRUE;
1841       }
1842       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1843     }
1844     case GST_EVENT_RECONFIGURE:{
1845       GstAdaptiveDemuxStream *stream;
1846
1847       GST_MANIFEST_LOCK (demux);
1848       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1849
1850       if (stream) {
1851         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1852             stream->last_ret == GST_FLOW_NOT_LINKED) {
1853           stream->last_ret = GST_FLOW_OK;
1854           stream->restart_download = TRUE;
1855           stream->need_header = TRUE;
1856           stream->discont = TRUE;
1857           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1858           gst_task_start (stream->download_task);
1859         }
1860         gst_event_unref (event);
1861         GST_MANIFEST_UNLOCK (demux);
1862         return TRUE;
1863       }
1864       GST_MANIFEST_UNLOCK (demux);
1865     }
1866       break;
1867     case GST_EVENT_LATENCY:{
1868       /* Upstream and our internal source are irrelevant
1869        * for latency, and we should not fail here to
1870        * configure the latency */
1871       gst_event_unref (event);
1872       return TRUE;
1873     }
1874     case GST_EVENT_QOS:{
1875       GstAdaptiveDemuxStream *stream;
1876
1877       GST_MANIFEST_LOCK (demux);
1878       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1879
1880       if (stream) {
1881         GstClockTimeDiff diff;
1882         GstClockTime timestamp;
1883
1884         gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
1885         /* Only take into account lateness if late */
1886         if (diff > 0)
1887           stream->qos_earliest_time = timestamp + 2 * diff;
1888         else
1889           stream->qos_earliest_time = timestamp;
1890         GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1891             GST_TIME_ARGS (stream->qos_earliest_time));
1892       }
1893       GST_MANIFEST_UNLOCK (demux);
1894       break;
1895     }
1896     default:
1897       break;
1898   }
1899
1900   return gst_pad_event_default (pad, parent, event);
1901 }
1902
1903 static gboolean
1904 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1905     GstQuery * query)
1906 {
1907   GstAdaptiveDemux *demux;
1908   GstAdaptiveDemuxClass *demux_class;
1909   gboolean ret = FALSE;
1910
1911   if (query == NULL)
1912     return FALSE;
1913
1914   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1915   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1916
1917   switch (query->type) {
1918     case GST_QUERY_DURATION:{
1919       GstClockTime duration = -1;
1920       GstFormat fmt;
1921
1922       gst_query_parse_duration (query, &fmt, NULL);
1923
1924       if (fmt == GST_FORMAT_TIME
1925           && g_atomic_int_get (&demux->priv->have_manifest)) {
1926         duration = demux_class->get_duration (demux);
1927
1928         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1929           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1930           ret = TRUE;
1931         }
1932       }
1933
1934       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1935           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1936       break;
1937     }
1938     case GST_QUERY_LATENCY:{
1939       gst_query_set_latency (query, FALSE, 0, -1);
1940       ret = TRUE;
1941       break;
1942     }
1943     case GST_QUERY_SEEKING:{
1944       GstFormat fmt;
1945       gint64 stop = -1;
1946       gint64 start = 0;
1947
1948       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1949         GST_INFO_OBJECT (demux,
1950             "Don't have manifest yet, can't answer seeking query");
1951         return FALSE;           /* can't answer without manifest */
1952       }
1953
1954       GST_MANIFEST_LOCK (demux);
1955
1956       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1957       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1958       if (fmt == GST_FORMAT_TIME) {
1959         GstClockTime duration;
1960         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1961
1962         ret = TRUE;
1963         if (can_seek) {
1964           if (gst_adaptive_demux_is_live (demux)) {
1965             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1966             if (!ret) {
1967               GST_MANIFEST_UNLOCK (demux);
1968               GST_INFO_OBJECT (demux, "can't answer seeking query");
1969               return FALSE;
1970             }
1971           } else {
1972             duration = demux_class->get_duration (demux);
1973             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1974               stop = duration;
1975           }
1976         }
1977         gst_query_set_seeking (query, fmt, can_seek, start, stop);
1978         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1979             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1980             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1981       }
1982       GST_MANIFEST_UNLOCK (demux);
1983       break;
1984     }
1985     case GST_QUERY_URI:
1986
1987       GST_MANIFEST_LOCK (demux);
1988
1989       /* TODO HLS can answer this differently it seems */
1990       if (demux->manifest_uri) {
1991         /* FIXME: (hls) Do we answer with the variant playlist, with the current
1992          * playlist or the the uri of the last downlowaded fragment? */
1993         gst_query_set_uri (query, demux->manifest_uri);
1994         ret = TRUE;
1995       }
1996
1997       GST_MANIFEST_UNLOCK (demux);
1998       break;
1999     default:
2000       /* Don't forward queries upstream because of the special nature of this
2001        *  "demuxer", which relies on the upstream element only to be fed
2002        *  the Manifest
2003        */
2004       break;
2005   }
2006
2007   return ret;
2008 }
2009
2010 /* must be called with manifest_lock taken */
2011 static void
2012 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2013     gboolean start_preroll_streams)
2014 {
2015   GList *iter;
2016
2017   if (!gst_adaptive_demux_is_running (demux)) {
2018     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2019     return;
2020   }
2021
2022   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2023
2024   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2025
2026   for (; iter; iter = g_list_next (iter)) {
2027     GstAdaptiveDemuxStream *stream = iter->data;
2028
2029     if (!start_preroll_streams) {
2030       g_mutex_lock (&stream->fragment_download_lock);
2031       stream->cancelled = FALSE;
2032       stream->replaced = FALSE;
2033       g_mutex_unlock (&stream->fragment_download_lock);
2034     }
2035
2036     stream->last_ret = GST_FLOW_OK;
2037     gst_task_start (stream->download_task);
2038   }
2039 }
2040
2041 /* must be called with manifest_lock taken */
2042 static void
2043 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2044 {
2045   gst_uri_downloader_cancel (demux->downloader);
2046
2047   gst_task_stop (demux->priv->updates_task);
2048
2049   g_mutex_lock (&demux->priv->updates_timed_lock);
2050   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2051   demux->priv->stop_updates_task = TRUE;
2052   g_cond_signal (&demux->priv->updates_timed_cond);
2053   g_mutex_unlock (&demux->priv->updates_timed_lock);
2054 }
2055
2056 /* must be called with manifest_lock taken */
2057 static void
2058 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2059 {
2060   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2061
2062   if (gst_adaptive_demux_is_live (demux)) {
2063     gst_uri_downloader_reset (demux->downloader);
2064     g_mutex_lock (&demux->priv->updates_timed_lock);
2065     demux->priv->stop_updates_task = FALSE;
2066     g_mutex_unlock (&demux->priv->updates_timed_lock);
2067     /* Task to periodically update the manifest */
2068     if (demux_class->requires_periodical_playlist_update (demux)) {
2069       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2070       gst_task_start (demux->priv->updates_task);
2071     }
2072   }
2073 }
2074
2075 /* must be called with manifest_lock taken
2076  * This function will temporarily release manifest_lock in order to join the
2077  * download threads.
2078  * The api_lock will still protect it against other threads trying to modify
2079  * the demux element.
2080  */
2081 static void
2082 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2083 {
2084   int i;
2085   GList *iter;
2086   GList *list_to_process;
2087
2088   GST_LOG_OBJECT (demux, "Stopping tasks");
2089
2090   if (stop_updates)
2091     gst_adaptive_demux_stop_manifest_update_task (demux);
2092
2093   list_to_process = demux->streams;
2094   for (i = 0; i < 2; ++i) {
2095     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2096       GstAdaptiveDemuxStream *stream = iter->data;
2097
2098       g_mutex_lock (&stream->fragment_download_lock);
2099       stream->cancelled = TRUE;
2100       gst_task_stop (stream->download_task);
2101       g_cond_signal (&stream->fragment_download_cond);
2102       g_mutex_unlock (&stream->fragment_download_lock);
2103     }
2104     list_to_process = demux->prepared_streams;
2105   }
2106
2107   GST_MANIFEST_UNLOCK (demux);
2108   g_mutex_lock (&demux->priv->preroll_lock);
2109   g_cond_broadcast (&demux->priv->preroll_cond);
2110   g_mutex_unlock (&demux->priv->preroll_lock);
2111   GST_MANIFEST_LOCK (demux);
2112
2113   g_mutex_lock (&demux->priv->manifest_update_lock);
2114   g_cond_broadcast (&demux->priv->manifest_cond);
2115   g_mutex_unlock (&demux->priv->manifest_update_lock);
2116
2117   /* need to release manifest_lock before stopping the src element.
2118    * The streams were asked to cancel, so they will not make any writes to demux
2119    * object. Even if we temporarily release manifest_lock, the demux->streams
2120    * cannot change and iter cannot be invalidated.
2121    */
2122   list_to_process = demux->streams;
2123   for (i = 0; i < 2; ++i) {
2124     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2125       GstAdaptiveDemuxStream *stream = iter->data;
2126       GstElement *src = stream->src;
2127
2128       GST_MANIFEST_UNLOCK (demux);
2129
2130       if (src) {
2131         gst_element_set_locked_state (src, TRUE);
2132         gst_element_set_state (src, GST_STATE_READY);
2133       }
2134
2135       /* stream->download_task value never changes, so it is safe to read it
2136        * outside critical section
2137        */
2138       gst_task_join (stream->download_task);
2139
2140       GST_MANIFEST_LOCK (demux);
2141     }
2142     list_to_process = demux->prepared_streams;
2143   }
2144
2145   GST_MANIFEST_UNLOCK (demux);
2146   if (stop_updates)
2147     gst_task_join (demux->priv->updates_task);
2148
2149   GST_MANIFEST_LOCK (demux);
2150
2151   list_to_process = demux->streams;
2152   for (i = 0; i < 2; ++i) {
2153     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2154       GstAdaptiveDemuxStream *stream = iter->data;
2155
2156       stream->download_error_count = 0;
2157       stream->need_header = TRUE;
2158       stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2159     }
2160     list_to_process = demux->prepared_streams;
2161   }
2162 }
2163
2164 /* must be called with manifest_lock taken */
2165 static gboolean
2166 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2167 {
2168   GList *iter;
2169   gboolean ret = TRUE;
2170
2171   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2172     GstAdaptiveDemuxStream *stream = iter->data;
2173     gst_event_ref (event);
2174     ret = ret & gst_pad_push_event (stream->pad, event);
2175   }
2176   gst_event_unref (event);
2177   return ret;
2178 }
2179
2180 /* must be called with manifest_lock taken */
2181 void
2182 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2183     GstCaps * caps)
2184 {
2185   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2186       caps);
2187   gst_caps_replace (&stream->pending_caps, caps);
2188   gst_caps_unref (caps);
2189 }
2190
2191 /* must be called with manifest_lock taken */
2192 void
2193 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2194     GstTagList * tags)
2195 {
2196   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2197       tags);
2198   if (stream->pending_tags) {
2199     gst_tag_list_unref (stream->pending_tags);
2200   }
2201   stream->pending_tags = tags;
2202 }
2203
2204 /* must be called with manifest_lock taken */
2205 void
2206 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2207     GstEvent * event)
2208 {
2209   stream->pending_events = g_list_append (stream->pending_events, event);
2210 }
2211
2212 /* must be called with manifest_lock taken */
2213 static guint64
2214 _update_average_bitrate (GstAdaptiveDemux * demux,
2215     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2216 {
2217   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2218
2219   stream->moving_bitrate -= stream->fragment_bitrates[index];
2220   stream->fragment_bitrates[index] = new_bitrate;
2221   stream->moving_bitrate += new_bitrate;
2222
2223   stream->moving_index += 1;
2224
2225   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2226     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2227   return stream->moving_bitrate / stream->moving_index;
2228 }
2229
2230 /* must be called with manifest_lock taken */
2231 static guint64
2232 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2233     GstAdaptiveDemuxStream * stream)
2234 {
2235   guint64 average_bitrate;
2236   guint64 fragment_bitrate;
2237
2238   if (demux->connection_speed) {
2239     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2240         demux->connection_speed / 1000);
2241     stream->current_download_rate = demux->connection_speed;
2242     return demux->connection_speed;
2243   }
2244
2245   fragment_bitrate = stream->last_bitrate;
2246   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2247       fragment_bitrate);
2248
2249   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2250
2251   GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2252       fragment_bitrate);
2253   GST_INFO_OBJECT (stream,
2254       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2255       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2256
2257   /* Conservative approach, make sure we don't upgrade too fast */
2258   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2259
2260   stream->current_download_rate *= demux->bitrate_limit;
2261   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2262       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2263
2264 #if 0
2265   /* Debugging code, modulate the bitrate every few fragments */
2266   {
2267     static guint ctr = 0;
2268     if (ctr % 3 == 0) {
2269       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2270       stream->current_download_rate /= 2;
2271     }
2272     ctr++;
2273   }
2274 #endif
2275
2276   return stream->current_download_rate;
2277 }
2278
2279 /* must be called with manifest_lock taken */
2280 static GstFlowReturn
2281 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2282 {
2283   gboolean all_notlinked = TRUE;
2284   gboolean all_eos = TRUE;
2285   GList *iter;
2286
2287   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2288     GstAdaptiveDemuxStream *stream = iter->data;
2289
2290     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2291       all_notlinked = FALSE;
2292       if (stream->last_ret != GST_FLOW_EOS)
2293         all_eos = FALSE;
2294     }
2295
2296     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2297         || stream->last_ret == GST_FLOW_FLUSHING) {
2298       return stream->last_ret;
2299     }
2300   }
2301   if (all_notlinked)
2302     return GST_FLOW_NOT_LINKED;
2303   else if (all_eos)
2304     return GST_FLOW_EOS;
2305   return GST_FLOW_OK;
2306 }
2307
2308 /* Called with preroll_lock */
2309 static void
2310 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2311     GstAdaptiveDemuxStream * stream)
2312 {
2313   demux->priv->preroll_pending--;
2314   if (demux->priv->preroll_pending == 0) {
2315     /* That was the last one, time to release all streams
2316      * and expose them */
2317     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2318     gst_adaptive_demux_expose_streams (demux);
2319     g_cond_broadcast (&demux->priv->preroll_cond);
2320   }
2321 }
2322
2323 /* must be called with manifest_lock taken.
2324  * Temporarily releases manifest_lock
2325  */
2326 GstFlowReturn
2327 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2328     GstBuffer * buffer)
2329 {
2330   GstAdaptiveDemux *demux = stream->demux;
2331   GstFlowReturn ret = GST_FLOW_OK;
2332   gboolean discont = FALSE;
2333   /* Pending events */
2334   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2335   GList *pending_events = NULL;
2336
2337   /* FIXME : 
2338    * This is duplicating *exactly* the same thing as what is done at the beginning
2339    * of _src_chain if starting_fragment is TRUE */
2340   if (stream->first_fragment_buffer) {
2341     GstClockTime offset =
2342         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2343     GstClockTime period_start =
2344         gst_adaptive_demux_get_period_start_time (demux);
2345
2346     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2347     if (demux->segment.rate < 0)
2348       /* Set DISCONT flag for every first buffer in reverse playback mode
2349        * as each fragment for its own has to be reversed */
2350       discont = TRUE;
2351
2352     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2353     if (GST_BUFFER_PTS_IS_VALID (buffer))
2354       GST_BUFFER_PTS (buffer) += offset;
2355
2356     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2357       stream->segment.position = GST_BUFFER_PTS (buffer);
2358
2359       /* Convert from position inside the stream's segment to the demuxer's
2360        * segment, they are not necessarily the same */
2361       if (stream->segment.position - offset + period_start >
2362           demux->segment.position)
2363         demux->segment.position =
2364             stream->segment.position - offset + period_start;
2365     }
2366     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2367
2368     GST_LOG_OBJECT (stream->pad,
2369         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2370         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2371   } else {
2372     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2373   }
2374
2375   if (stream->discont) {
2376     discont = TRUE;
2377     stream->discont = FALSE;
2378   }
2379
2380   if (discont) {
2381     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2382     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2383   } else {
2384     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2385   }
2386
2387   stream->first_fragment_buffer = FALSE;
2388
2389   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2390   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2391   if (G_UNLIKELY (stream->pending_caps)) {
2392     pending_caps = gst_event_new_caps (stream->pending_caps);
2393     gst_caps_unref (stream->pending_caps);
2394     stream->pending_caps = NULL;
2395   }
2396
2397   if (stream->do_block) {
2398
2399     g_mutex_lock (&demux->priv->preroll_lock);
2400
2401     /* If we are preroll state, set caps in here */
2402     if (pending_caps) {
2403       gst_pad_push_event (stream->pad, pending_caps);
2404       pending_caps = NULL;
2405     }
2406
2407     gst_adaptive_demux_handle_preroll (demux, stream);
2408     GST_MANIFEST_UNLOCK (demux);
2409
2410     while (stream->do_block && !stream->cancelled) {
2411       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2412       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2413     }
2414     if (stream->cancelled) {
2415       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2416       gst_buffer_unref (buffer);
2417       g_mutex_unlock (&demux->priv->preroll_lock);
2418       return GST_FLOW_FLUSHING;
2419     }
2420
2421     g_mutex_unlock (&demux->priv->preroll_lock);
2422     GST_MANIFEST_LOCK (demux);
2423   }
2424
2425   if (G_UNLIKELY (stream->pending_segment)) {
2426     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2427     pending_segment = stream->pending_segment;
2428     stream->pending_segment = NULL;
2429     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2430   }
2431   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2432     GstTagList *tags = stream->pending_tags;
2433
2434     stream->pending_tags = NULL;
2435     stream->bitrate_changed = 0;
2436
2437     if (stream->fragment.bitrate != 0) {
2438       if (tags)
2439         tags = gst_tag_list_make_writable (tags);
2440       else
2441         tags = gst_tag_list_new_empty ();
2442
2443       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2444           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2445     }
2446     if (tags)
2447       pending_tags = gst_event_new_tag (tags);
2448   }
2449   if (G_UNLIKELY (stream->pending_events)) {
2450     pending_events = stream->pending_events;
2451     stream->pending_events = NULL;
2452   }
2453
2454   GST_MANIFEST_UNLOCK (demux);
2455
2456   /* Do not push events or buffers holding the manifest lock */
2457   if (G_UNLIKELY (pending_caps)) {
2458     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2459         pending_caps);
2460     gst_pad_push_event (stream->pad, pending_caps);
2461   }
2462   if (G_UNLIKELY (pending_segment)) {
2463     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2464         pending_segment);
2465     gst_pad_push_event (stream->pad, pending_segment);
2466   }
2467   if (G_UNLIKELY (pending_tags)) {
2468     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2469         pending_tags);
2470     gst_pad_push_event (stream->pad, pending_tags);
2471   }
2472   while (pending_events != NULL) {
2473     GstEvent *event = pending_events->data;
2474
2475     if (!gst_pad_push_event (stream->pad, event))
2476       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2477
2478     pending_events = g_list_delete_link (pending_events, pending_events);
2479   }
2480
2481   /* Wait for preroll if blocking */
2482   GST_DEBUG_OBJECT (stream->pad,
2483       "About to push buffer of size %" G_GSIZE_FORMAT,
2484       gst_buffer_get_size (buffer));
2485
2486   ret = gst_pad_push (stream->pad, buffer);
2487
2488   GST_MANIFEST_LOCK (demux);
2489
2490   g_mutex_lock (&stream->fragment_download_lock);
2491   if (G_UNLIKELY (stream->cancelled)) {
2492     GST_LOG_OBJECT (stream, "Stream was cancelled");
2493     ret = stream->last_ret = GST_FLOW_FLUSHING;
2494     g_mutex_unlock (&stream->fragment_download_lock);
2495     return ret;
2496   }
2497   g_mutex_unlock (&stream->fragment_download_lock);
2498
2499   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2500       gst_flow_get_name (ret));
2501
2502   return ret;
2503 }
2504
2505 /* must be called with manifest_lock taken */
2506 static GstFlowReturn
2507 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2508     GstAdaptiveDemuxStream * stream)
2509 {
2510   /* No need to advance, this isn't a real fragment */
2511   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2512     return GST_FLOW_OK;
2513
2514   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2515       stream->fragment.duration);
2516 }
2517
2518 /* must be called with manifest_lock taken.
2519  * Can temporarily release manifest_lock
2520  */
2521 static GstFlowReturn
2522 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2523     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2524 {
2525   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2526 }
2527
2528 static gboolean
2529 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2530     * demux)
2531 {
2532   return TRUE;
2533 }
2534
2535 static GstFlowReturn
2536 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2537 {
2538   GstAdaptiveDemuxStream *stream;
2539   GstAdaptiveDemux *demux;
2540   GstAdaptiveDemuxClass *klass;
2541   GstFlowReturn ret = GST_FLOW_OK;
2542
2543   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2544   stream = gst_pad_get_element_private (pad);
2545   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2546
2547   GST_MANIFEST_LOCK (demux);
2548
2549   /* do not make any changes if the stream is cancelled */
2550   g_mutex_lock (&stream->fragment_download_lock);
2551   if (G_UNLIKELY (stream->cancelled)) {
2552     g_mutex_unlock (&stream->fragment_download_lock);
2553     gst_buffer_unref (buffer);
2554     ret = stream->last_ret = GST_FLOW_FLUSHING;
2555     GST_MANIFEST_UNLOCK (demux);
2556     return ret;
2557   }
2558   g_mutex_unlock (&stream->fragment_download_lock);
2559
2560   /* starting_fragment is set to TRUE at the beginning of
2561    * _stream_download_fragment()
2562    * /!\ If there is a header/index being downloaded, then this will
2563    * be TRUE for the first one ... but FALSE for the remaining ones,
2564    * including the *actual* fragment ! */
2565   if (stream->starting_fragment) {
2566     GstClockTime offset =
2567         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2568     GstClockTime period_start =
2569         gst_adaptive_demux_get_period_start_time (demux);
2570
2571     stream->starting_fragment = FALSE;
2572     if (klass->start_fragment) {
2573       if (!klass->start_fragment (demux, stream)) {
2574         ret = GST_FLOW_ERROR;
2575         goto error;
2576       }
2577     }
2578
2579     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2580     if (GST_BUFFER_PTS_IS_VALID (buffer))
2581       GST_BUFFER_PTS (buffer) += offset;
2582
2583     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2584         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2585
2586     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2587       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2588       stream->segment.position = GST_BUFFER_PTS (buffer);
2589
2590       /* Convert from position inside the stream's segment to the demuxer's
2591        * segment, they are not necessarily the same */
2592       if (stream->segment.position - offset + period_start >
2593           demux->segment.position)
2594         demux->segment.position =
2595             stream->segment.position - offset + period_start;
2596       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2597     }
2598
2599   } else {
2600     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2601   }
2602
2603   /* downloading_first_buffer is set to TRUE in download_uri() just before
2604    * activating the source (i.e. requesting a given URI)
2605    *
2606    * The difference with starting_fragment is that this will be called
2607    * for *all* first buffers (of index, and header, and fragment)
2608    *
2609    * ... to then only do something useful (in this block) for actual
2610    * fragments... */
2611   if (stream->downloading_first_buffer) {
2612     gint64 chunk_size = 0;
2613
2614     stream->downloading_first_buffer = FALSE;
2615
2616     if (!stream->downloading_header && !stream->downloading_index) {
2617       /* If this is the first buffer of a fragment (not the headers or index)
2618        * and we don't have a birate from the sub-class, then see if we
2619        * can work it out from the fragment size and duration */
2620       if (stream->fragment.bitrate == 0 &&
2621           stream->fragment.duration != 0 &&
2622           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2623               &chunk_size)) {
2624         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2625                 8 * GST_SECOND, stream->fragment.duration));
2626         GST_LOG_OBJECT (demux,
2627             "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2628             " = bitrate %u", chunk_size,
2629             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2630         stream->fragment.bitrate = bitrate;
2631       }
2632       if (stream->fragment.bitrate) {
2633         stream->bitrate_changed = TRUE;
2634       } else {
2635         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2636       }
2637     }
2638   }
2639
2640   stream->download_total_bytes += gst_buffer_get_size (buffer);
2641
2642   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2643       gst_buffer_get_size (buffer));
2644
2645   ret = klass->data_received (demux, stream, buffer);
2646
2647   if (ret == GST_FLOW_FLUSHING) {
2648     /* do not make any changes if the stream is cancelled */
2649     g_mutex_lock (&stream->fragment_download_lock);
2650     if (G_UNLIKELY (stream->cancelled)) {
2651       g_mutex_unlock (&stream->fragment_download_lock);
2652       GST_MANIFEST_UNLOCK (demux);
2653       return ret;
2654     }
2655     g_mutex_unlock (&stream->fragment_download_lock);
2656   }
2657
2658   if (ret != GST_FLOW_OK) {
2659     gboolean finished = FALSE;
2660
2661     if (ret < GST_FLOW_EOS) {
2662       GST_ELEMENT_FLOW_ERROR (demux, ret);
2663
2664       /* TODO push this on all pads */
2665       gst_pad_push_event (stream->pad, gst_event_new_eos ());
2666     } else {
2667       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2668           gst_flow_get_name (ret));
2669     }
2670
2671     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2672       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
2673     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2674       /* Behaves like an EOS event from upstream */
2675       stream->fragment.finished = TRUE;
2676       ret = klass->finish_fragment (demux, stream);
2677       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2678         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
2679       } else if (ret != GST_FLOW_OK) {
2680         goto error;
2681       }
2682       finished = TRUE;
2683     }
2684
2685     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2686     if (finished)
2687       ret = GST_FLOW_EOS;
2688   }
2689
2690 error:
2691
2692   GST_MANIFEST_UNLOCK (demux);
2693
2694   return ret;
2695 }
2696
2697 /* must be called with manifest_lock taken */
2698 static void
2699 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2700     stream, GstFlowReturn ret, GError * err)
2701 {
2702   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2703       gst_flow_get_name (ret), err);
2704
2705   /* if we have an error, only replace last_ret if it was OK before to avoid
2706    * overwriting the first error we got */
2707   if (stream->last_ret == GST_FLOW_OK) {
2708     stream->last_ret = ret;
2709     if (err) {
2710       g_clear_error (&stream->last_error);
2711       stream->last_error = g_error_copy (err);
2712     }
2713   }
2714   g_mutex_lock (&stream->fragment_download_lock);
2715   stream->download_finished = TRUE;
2716   g_cond_signal (&stream->fragment_download_cond);
2717   g_mutex_unlock (&stream->fragment_download_lock);
2718 }
2719
2720 static GstFlowReturn
2721 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2722 {
2723   GstFlowReturn ret = GST_FLOW_OK;
2724   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2725
2726   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2727       || !klass->need_another_chunk (stream)
2728       || stream->fragment.chunk_size == 0) {
2729     stream->fragment.finished = TRUE;
2730     ret = klass->finish_fragment (stream->demux, stream);
2731   }
2732   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2733
2734   return ret;
2735 }
2736
2737 static gboolean
2738 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2739 {
2740   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2741   GstAdaptiveDemux *demux = stream->demux;
2742
2743   switch (GST_EVENT_TYPE (event)) {
2744     case GST_EVENT_EOS:{
2745       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2746       GST_MANIFEST_LOCK (demux);
2747
2748       gst_adaptive_demux_eos_handling (stream);
2749
2750       /* FIXME ?
2751        * _eos_handling() calls  fragment_download_finish() which does the
2752        * same thing as below.
2753        * Could this cause races ? */
2754       g_mutex_lock (&stream->fragment_download_lock);
2755       stream->download_finished = TRUE;
2756       g_cond_signal (&stream->fragment_download_cond);
2757       g_mutex_unlock (&stream->fragment_download_lock);
2758
2759       GST_MANIFEST_UNLOCK (demux);
2760       break;
2761     }
2762     default:
2763       break;
2764   }
2765
2766   gst_event_unref (event);
2767
2768   return TRUE;
2769 }
2770
2771 static gboolean
2772 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2773 {
2774   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2775
2776   switch (GST_QUERY_TYPE (query)) {
2777     case GST_QUERY_ALLOCATION:
2778       return FALSE;
2779       break;
2780     default:
2781       break;
2782   }
2783
2784   return gst_pad_peer_query (stream->pad, query);
2785 }
2786
2787 static GstPadProbeReturn
2788 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2789     GstAdaptiveDemuxStream * stream)
2790 {
2791   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2792
2793   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2794     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2795     if (stream->fragment_bytes_downloaded == 0) {
2796       stream->last_latency =
2797           gst_adaptive_demux_get_monotonic_time (stream->demux) -
2798           (stream->download_start_time * GST_USECOND);
2799       GST_DEBUG_OBJECT (pad,
2800           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2801           GST_TIME_ARGS (stream->last_latency));
2802     }
2803     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2804     GST_LOG_OBJECT (pad,
2805         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2806         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2807   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2808       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2809     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2810     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2811         GST_EVENT_TYPE_NAME (ev), ev);
2812     switch (GST_EVENT_TYPE (ev)) {
2813       case GST_EVENT_SEGMENT:
2814         stream->fragment_bytes_downloaded = 0;
2815         break;
2816       case GST_EVENT_EOS:
2817       {
2818         stream->last_download_time =
2819             gst_adaptive_demux_get_monotonic_time (stream->demux) -
2820             (stream->download_start_time * GST_USECOND);
2821         stream->last_bitrate =
2822             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2823             8 * GST_SECOND, stream->last_download_time);
2824         GST_DEBUG_OBJECT (pad,
2825             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2826             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2827             stream->last_bitrate);
2828         /* Calculate bitrate since URI request */
2829       }
2830         break;
2831       default:
2832         break;
2833     }
2834   }
2835
2836   return ret;
2837 }
2838
2839 /* must be called with manifest_lock taken.
2840  * Can temporarily release manifest_lock
2841  */
2842 static gboolean
2843 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2844     GstAdaptiveDemuxStream * stream)
2845 {
2846   gboolean ret = TRUE;
2847
2848   /* Wait until we're cancelled or there's something for
2849    * us to download in the playlist or the playlist
2850    * became non-live */
2851   while (TRUE) {
2852     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2853
2854     /* get the manifest_update_lock while still holding the manifest_lock.
2855      * This will prevent other threads to signal the condition (they will need
2856      * both manifest_lock and manifest_update_lock in order to signal).
2857      * It cannot deadlock because all threads always get the manifest_lock first
2858      * and manifest_update_lock second.
2859      */
2860     g_mutex_lock (&demux->priv->manifest_update_lock);
2861
2862     GST_MANIFEST_UNLOCK (demux);
2863
2864     g_cond_wait (&demux->priv->manifest_cond,
2865         &demux->priv->manifest_update_lock);
2866     g_mutex_unlock (&demux->priv->manifest_update_lock);
2867
2868     GST_MANIFEST_LOCK (demux);
2869
2870     /* check for cancelled every time we get the manifest_lock */
2871     g_mutex_lock (&stream->fragment_download_lock);
2872     if (G_UNLIKELY (stream->cancelled)) {
2873       ret = FALSE;
2874       stream->last_ret = GST_FLOW_FLUSHING;
2875       g_mutex_unlock (&stream->fragment_download_lock);
2876       break;
2877     }
2878     g_mutex_unlock (&stream->fragment_download_lock);
2879
2880     /* Got a new fragment or not live anymore? */
2881     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2882         GST_FLOW_OK) {
2883       GST_DEBUG_OBJECT (demux, "new fragment available, "
2884           "not waiting for manifest update");
2885       ret = TRUE;
2886       break;
2887     }
2888
2889     if (!gst_adaptive_demux_is_live (demux)) {
2890       GST_DEBUG_OBJECT (demux, "Not live anymore, "
2891           "not waiting for manifest update");
2892       ret = FALSE;
2893       break;
2894     }
2895   }
2896   GST_DEBUG_OBJECT (demux, "Retrying now");
2897   return ret;
2898 }
2899
2900 /* must be called with manifest_lock taken */
2901 static gboolean
2902 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2903     const gchar * uri, const gchar * referer, gboolean refresh,
2904     gboolean allow_cache)
2905 {
2906   GstAdaptiveDemux *demux = stream->demux;
2907
2908   if (!gst_uri_is_valid (uri)) {
2909     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2910     return FALSE;
2911   }
2912
2913   /* Try to re-use existing source element */
2914   if (stream->src != NULL) {
2915     gchar *old_protocol, *new_protocol;
2916     gchar *old_uri;
2917
2918     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2919     old_protocol = gst_uri_get_protocol (old_uri);
2920     new_protocol = gst_uri_get_protocol (uri);
2921
2922     if (!g_str_equal (old_protocol, new_protocol)) {
2923       GstElement *src = stream->src;
2924
2925       stream->src = NULL;
2926       gst_object_unref (stream->src_srcpad);
2927       stream->src_srcpad = NULL;
2928       GST_MANIFEST_UNLOCK (demux);
2929       gst_element_set_locked_state (src, TRUE);
2930       gst_element_set_state (src, GST_STATE_NULL);
2931       gst_bin_remove (GST_BIN_CAST (demux), src);
2932       GST_MANIFEST_LOCK (demux);
2933       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2934     } else {
2935       GError *err = NULL;
2936
2937       GST_DEBUG_OBJECT (demux, "Re-using old source element");
2938       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2939               &err)) {
2940         GstElement *src = stream->src;
2941
2942         stream->src = NULL;
2943         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2944             err ? err->message : "Unknown error");
2945         g_clear_error (&err);
2946         gst_object_unref (stream->src_srcpad);
2947         stream->src_srcpad = NULL;
2948         GST_MANIFEST_UNLOCK (demux);
2949         gst_element_set_locked_state (src, TRUE);
2950         gst_element_set_state (src, GST_STATE_NULL);
2951         gst_bin_remove (GST_BIN_CAST (demux), src);
2952         GST_MANIFEST_LOCK (demux);
2953       }
2954     }
2955     g_free (old_uri);
2956     g_free (old_protocol);
2957     g_free (new_protocol);
2958   }
2959
2960   if (stream->src == NULL) {
2961     GstPad *uri_handler_src;
2962     GstPad *queue_sink;
2963     GstPad *queue_src;
2964     GstElement *uri_handler;
2965     GstElement *queue;
2966     GstPadLinkReturn pad_link_ret;
2967     GObjectClass *gobject_class;
2968     gchar *internal_name, *bin_name;
2969
2970     /* Our src consists of a bin containing uri_handler -> queue . The
2971      * purpose of the queue is to allow the uri_handler to download an
2972      * entire fragment without blocking, so we can accurately measure the
2973      * download bitrate. */
2974
2975     queue = gst_element_factory_make ("queue", NULL);
2976     if (queue == NULL)
2977       return FALSE;
2978
2979     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
2980     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
2981     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
2982
2983     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
2984     if (uri_handler == NULL) {
2985       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
2986           ("Missing plugin to handle URI: '%s'", uri), (NULL));
2987       gst_object_unref (queue);
2988       return FALSE;
2989     }
2990
2991     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
2992
2993     if (g_object_class_find_property (gobject_class, "compress"))
2994       g_object_set (uri_handler, "compress", FALSE, NULL);
2995     if (g_object_class_find_property (gobject_class, "keep-alive"))
2996       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
2997     if (g_object_class_find_property (gobject_class, "extra-headers")) {
2998       if (referer || refresh || !allow_cache) {
2999         GstStructure *extra_headers = gst_structure_new_empty ("headers");
3000
3001         if (referer)
3002           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3003               NULL);
3004
3005         if (!allow_cache)
3006           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3007               "no-cache", NULL);
3008         else if (refresh)
3009           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3010               "max-age=0", NULL);
3011
3012         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3013
3014         gst_structure_free (extra_headers);
3015       } else {
3016         g_object_set (uri_handler, "extra-headers", NULL, NULL);
3017       }
3018     }
3019
3020     /* Source bin creation */
3021     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3022     stream->src = gst_bin_new (bin_name);
3023     g_free (bin_name);
3024     if (stream->src == NULL) {
3025       gst_object_unref (queue);
3026       gst_object_unref (uri_handler);
3027       return FALSE;
3028     }
3029
3030     gst_bin_add (GST_BIN_CAST (stream->src), queue);
3031     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3032
3033     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3034     queue_sink = gst_element_get_static_pad (queue, "sink");
3035
3036     pad_link_ret =
3037         gst_pad_link_full (uri_handler_src, queue_sink,
3038         GST_PAD_LINK_CHECK_NOTHING);
3039     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3040       GST_WARNING_OBJECT (demux,
3041           "Could not link pads %s:%s to %s:%s for reason %d",
3042           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3043           pad_link_ret);
3044       g_object_unref (queue_sink);
3045       g_object_unref (uri_handler_src);
3046       gst_object_unref (stream->src);
3047       stream->src = NULL;
3048       return FALSE;
3049     }
3050
3051     /* Add a downstream event and data probe */
3052     gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3053         (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3054
3055     g_object_unref (queue_sink);
3056     g_object_unref (uri_handler_src);
3057     queue_src = gst_element_get_static_pad (queue, "src");
3058     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3059     g_object_unref (queue_src);
3060     gst_element_add_pad (stream->src, stream->src_srcpad);
3061
3062     gst_element_set_locked_state (stream->src, TRUE);
3063     gst_bin_add (GST_BIN_CAST (demux), stream->src);
3064     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3065
3066     /* set up our internal floating pad to drop all events from
3067      * the http src we don't care about. On the chain function
3068      * we just push the buffer forward */
3069     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3070     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3071     g_free (internal_name);
3072     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3073         GST_OBJECT_CAST (demux));
3074     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3075     gst_pad_set_element_private (stream->internal_pad, stream);
3076     gst_pad_set_active (stream->internal_pad, TRUE);
3077     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3078     gst_pad_set_event_function (stream->internal_pad, _src_event);
3079     gst_pad_set_query_function (stream->internal_pad, _src_query);
3080
3081     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3082             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3083       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3084       return FALSE;
3085     }
3086
3087     stream->uri_handler = uri_handler;
3088     stream->queue = queue;
3089
3090     stream->last_status_code = 200;     /* default to OK */
3091   }
3092   return TRUE;
3093 }
3094
3095 static GstPadProbeReturn
3096 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3097     gpointer user_data)
3098 {
3099   GstAdaptiveDemuxStream *stream = user_data;
3100
3101   /* The source's src pad is IDLE so now set the state to READY */
3102   g_mutex_lock (&stream->fragment_download_lock);
3103   stream->src_at_ready = TRUE;
3104   g_cond_signal (&stream->fragment_download_cond);
3105   g_mutex_unlock (&stream->fragment_download_lock);
3106
3107   return GST_PAD_PROBE_REMOVE;
3108 }
3109
3110 #ifndef GST_DISABLE_GST_DEBUG
3111 static const char *
3112 uritype (GstAdaptiveDemuxStream * s)
3113 {
3114   if (s->downloading_header)
3115     return "header";
3116   if (s->downloading_index)
3117     return "index";
3118   return "fragment";
3119 }
3120 #endif
3121
3122 /* must be called with manifest_lock taken.
3123  * Can temporarily release manifest_lock
3124  *
3125  * Will return when URI is fully downloaded (or aborted/errored)
3126  */
3127 static GstFlowReturn
3128 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3129     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3130     gint64 end, guint * http_status)
3131 {
3132   GstFlowReturn ret = GST_FLOW_OK;
3133   GST_DEBUG_OBJECT (stream->pad,
3134       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3135       uritype (stream), uri, start, end);
3136
3137   if (http_status)
3138     *http_status = 200;         /* default to ok if no further information */
3139
3140   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3141     ret = stream->last_ret = GST_FLOW_ERROR;
3142     return ret;
3143   }
3144
3145   gst_element_set_locked_state (stream->src, TRUE);
3146
3147   GST_MANIFEST_UNLOCK (demux);
3148   if (gst_element_set_state (stream->src,
3149           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3150     /* If ranges are specified, seek to it */
3151     if (start != 0 || end != -1) {
3152       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3153        * stop position */
3154       if (end != -1)
3155         end += 1;
3156       /* Send the seek event to the uri_handler, as the other pipeline elements
3157        * can't handle it when READY. */
3158       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3159                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3160                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3161
3162         GST_MANIFEST_LOCK (demux);
3163         /* looks like the source can't handle seeks in READY */
3164         g_clear_error (&stream->last_error);
3165         stream->last_error = g_error_new (GST_CORE_ERROR,
3166             GST_CORE_ERROR_NOT_IMPLEMENTED,
3167             "Source element can't handle range requests");
3168         stream->last_ret = GST_FLOW_ERROR;
3169       } else {
3170         GST_MANIFEST_LOCK (demux);
3171       }
3172     } else {
3173       GST_MANIFEST_LOCK (demux);
3174     }
3175
3176     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3177       stream->download_start_time =
3178           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3179
3180       /* src element is in state READY. Before we start it, we reset
3181        * download_finished
3182        */
3183       g_mutex_lock (&stream->fragment_download_lock);
3184       stream->download_finished = FALSE;
3185       stream->downloading_first_buffer = TRUE;
3186       g_mutex_unlock (&stream->fragment_download_lock);
3187
3188       GST_MANIFEST_UNLOCK (demux);
3189
3190       if (!gst_element_sync_state_with_parent (stream->src)) {
3191         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3192         GST_MANIFEST_LOCK (demux);
3193         ret = stream->last_ret = GST_FLOW_ERROR;
3194         return ret;
3195       }
3196
3197       /* wait for the fragment to be completely downloaded */
3198       GST_DEBUG_OBJECT (stream->pad,
3199           "Waiting for %s download to finish: %s", uritype (stream), uri);
3200
3201       g_mutex_lock (&stream->fragment_download_lock);
3202       stream->src_at_ready = FALSE;
3203       if (G_UNLIKELY (stream->cancelled)) {
3204         g_mutex_unlock (&stream->fragment_download_lock);
3205         GST_MANIFEST_LOCK (demux);
3206         ret = stream->last_ret = GST_FLOW_FLUSHING;
3207         return ret;
3208       }
3209       /* download_finished is only set:
3210        * * in ::fragment_download_finish()
3211        * * if EOS is received on the _src pad
3212        * */
3213       while (!stream->cancelled && !stream->download_finished) {
3214         g_cond_wait (&stream->fragment_download_cond,
3215             &stream->fragment_download_lock);
3216       }
3217       g_mutex_unlock (&stream->fragment_download_lock);
3218
3219       GST_DEBUG_OBJECT (stream->pad,
3220           "Finished Waiting for %s download: %s", uritype (stream), uri);
3221
3222       GST_MANIFEST_LOCK (demux);
3223       g_mutex_lock (&stream->fragment_download_lock);
3224       if (G_UNLIKELY (stream->cancelled)) {
3225         ret = stream->last_ret = GST_FLOW_FLUSHING;
3226         g_mutex_unlock (&stream->fragment_download_lock);
3227         return ret;
3228       }
3229       g_mutex_unlock (&stream->fragment_download_lock);
3230
3231       ret = stream->last_ret;
3232
3233       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3234           uritype (stream), uri, stream->last_ret,
3235           gst_flow_get_name (stream->last_ret));
3236       if (stream->last_ret != GST_FLOW_OK && http_status) {
3237         *http_status = stream->last_status_code;
3238       }
3239     }
3240
3241     /* changing src element state might try to join the streaming thread, so
3242      * we must not hold the manifest lock.
3243      */
3244     GST_MANIFEST_UNLOCK (demux);
3245   } else {
3246     GST_MANIFEST_UNLOCK (demux);
3247     if (stream->last_ret == GST_FLOW_OK)
3248       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3249     ret = GST_FLOW_CUSTOM_ERROR;
3250   }
3251
3252   stream->src_at_ready = FALSE;
3253
3254   gst_element_set_locked_state (stream->src, TRUE);
3255   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3256       gst_ad_stream_src_to_ready_cb, stream, NULL);
3257
3258   g_mutex_lock (&stream->fragment_download_lock);
3259   while (!stream->src_at_ready) {
3260     g_cond_wait (&stream->fragment_download_cond,
3261         &stream->fragment_download_lock);
3262   }
3263   g_mutex_unlock (&stream->fragment_download_lock);
3264
3265   gst_element_set_state (stream->src, GST_STATE_READY);
3266
3267   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3268   GST_MANIFEST_LOCK (demux);
3269   g_mutex_lock (&stream->fragment_download_lock);
3270   if (G_UNLIKELY (stream->cancelled)) {
3271     ret = stream->last_ret = GST_FLOW_FLUSHING;
3272     g_mutex_unlock (&stream->fragment_download_lock);
3273     return ret;
3274   }
3275   g_mutex_unlock (&stream->fragment_download_lock);
3276
3277   /* deactivate and reactivate our ghostpad to make it fresh for a new
3278    * stream */
3279   gst_pad_set_active (stream->internal_pad, FALSE);
3280   gst_pad_set_active (stream->internal_pad, TRUE);
3281
3282   return ret;
3283 }
3284
3285 /* must be called with manifest_lock taken.
3286  * Can temporarily release manifest_lock
3287  */
3288 static GstFlowReturn
3289 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3290     stream)
3291 {
3292   GstAdaptiveDemux *demux = stream->demux;
3293   GstFlowReturn ret = GST_FLOW_OK;
3294
3295   if (stream->fragment.header_uri != NULL) {
3296     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3297         G_GINT64_FORMAT, stream->fragment.header_uri,
3298         stream->fragment.header_range_start, stream->fragment.header_range_end);
3299
3300     stream->downloading_header = TRUE;
3301     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3302         stream->fragment.header_uri, stream->fragment.header_range_start,
3303         stream->fragment.header_range_end, NULL);
3304     stream->downloading_header = FALSE;
3305   }
3306
3307   /* check if we have an index */
3308   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3309
3310     if (stream->fragment.index_uri != NULL) {
3311       GST_DEBUG_OBJECT (demux,
3312           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3313           stream->fragment.index_uri,
3314           stream->fragment.index_range_start, stream->fragment.index_range_end);
3315       stream->downloading_index = TRUE;
3316       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3317           stream->fragment.index_uri, stream->fragment.index_range_start,
3318           stream->fragment.index_range_end, NULL);
3319       stream->downloading_index = FALSE;
3320     }
3321   }
3322
3323   return ret;
3324 }
3325
3326 /* must be called with manifest_lock taken.
3327  * Can temporarily release manifest_lock
3328  */
3329 static GstFlowReturn
3330 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3331 {
3332   GstAdaptiveDemux *demux = stream->demux;
3333   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3334   gchar *url = NULL;
3335   GstFlowReturn ret;
3336   gboolean retried_once = FALSE, live;
3337   guint http_status;
3338   guint last_status_code;
3339
3340   /* FIXME :  */
3341   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3342   stream->starting_fragment = TRUE;
3343   stream->last_ret = GST_FLOW_OK;
3344   stream->first_fragment_buffer = TRUE;
3345
3346   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3347       stream->fragment.uri ? "FRAGMENT " : "",
3348       stream->fragment.header_uri ? "HEADER " : "",
3349       stream->fragment.index_uri ? "INDEX" : "");
3350
3351   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3352       stream->fragment.index_uri == NULL)
3353     goto no_url_error;
3354
3355   if (stream->need_header) {
3356     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3357     if (ret != GST_FLOW_OK) {
3358       return ret;
3359     }
3360     stream->need_header = FALSE;
3361   }
3362
3363 again:
3364   ret = GST_FLOW_OK;
3365   url = stream->fragment.uri;
3366   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3367   if (!url)
3368     return ret;
3369
3370   stream->last_ret = GST_FLOW_OK;
3371   http_status = 200;
3372
3373   /* Download the actual fragment, either in fragments or in one go */
3374   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3375       && stream->fragment.chunk_size != 0) {
3376     /* Handle chunk downloading */
3377     gint64 range_start, range_end, chunk_start, chunk_end;
3378     guint64 download_total_bytes;
3379     gint chunk_size = stream->fragment.chunk_size;
3380
3381     range_start = chunk_start = stream->fragment.range_start;
3382     range_end = stream->fragment.range_end;
3383     /* HTTP ranges are inclusive for the end */
3384     if (chunk_size != -1)
3385       chunk_end = range_start + chunk_size - 1;
3386     else
3387       chunk_end = range_end;
3388
3389     if (range_end != -1)
3390       chunk_end = MIN (chunk_end, range_end);
3391
3392     while (!stream->fragment.finished && (chunk_start <= range_end
3393             || range_end == -1)) {
3394       download_total_bytes = stream->download_total_bytes;
3395
3396       ret =
3397           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3398           chunk_start, chunk_end, &http_status);
3399
3400       GST_DEBUG_OBJECT (stream->pad,
3401           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3402           http_status, gst_flow_get_name (stream->last_ret));
3403
3404       /* Don't retry for any chunks except the first. We would have sent
3405        * data downstream already otherwise and it's difficult to recover
3406        * from that in a meaningful way */
3407       if (chunk_start > range_start)
3408         retried_once = TRUE;
3409
3410       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3411        * downloading up to -1. We don't know the full duration.
3412        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3413       if (ret != GST_FLOW_OK && chunk_end == -1) {
3414         break;
3415       } else if (ret != GST_FLOW_OK) {
3416         chunk_end = -1;
3417         stream->last_ret = GST_FLOW_OK;
3418         continue;
3419       }
3420
3421       if (chunk_end == -1)
3422         break;
3423
3424       /* Short read, we're at the end now */
3425       if (stream->download_total_bytes - download_total_bytes <
3426           chunk_end + 1 - chunk_start)
3427         break;
3428
3429       if (!klass->need_another_chunk (stream))
3430         break;
3431
3432       /* HTTP ranges are inclusive for the end */
3433       chunk_start += chunk_size;
3434       chunk_size = stream->fragment.chunk_size;
3435       if (chunk_size != -1)
3436         chunk_end = chunk_start + chunk_size - 1;
3437       else
3438         chunk_end = range_end;
3439
3440       if (range_end != -1)
3441         chunk_end = MIN (chunk_end, range_end);
3442     }
3443   } else {
3444     ret =
3445         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3446         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3447     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3448         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3449   }
3450   if (ret == GST_FLOW_OK)
3451     goto beach;
3452
3453   g_mutex_lock (&stream->fragment_download_lock);
3454   if (G_UNLIKELY (stream->cancelled)) {
3455     g_mutex_unlock (&stream->fragment_download_lock);
3456     return ret;
3457   }
3458   g_mutex_unlock (&stream->fragment_download_lock);
3459
3460   /* TODO check if we are truly stopping */
3461   if (ret != GST_FLOW_CUSTOM_ERROR)
3462     goto beach;
3463
3464   last_status_code = stream->last_status_code;
3465   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3466       last_status_code, stream->download_error_count);
3467
3468   live = gst_adaptive_demux_is_live (demux);
3469   if (!retried_once && ((last_status_code / 100 == 4 && live)
3470           || last_status_code / 100 == 5)) {
3471     /* 4xx/5xx */
3472     /* if current position is before available start, switch to next */
3473     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3474       goto flushing;
3475
3476     if (live) {
3477       gint64 range_start, range_stop;
3478
3479       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3480               &range_stop))
3481         goto flushing;
3482
3483       if (demux->segment.position < range_start) {
3484         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3485         stream->last_ret = GST_FLOW_OK;
3486         ret = gst_adaptive_demux_eos_handling (stream);
3487         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3488             gst_flow_get_name (ret));
3489         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3490         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3491         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3492             gst_flow_get_name (ret));
3493         if (ret == GST_FLOW_OK) {
3494           retried_once = TRUE;
3495           goto again;
3496         }
3497       } else if (demux->segment.position > range_stop) {
3498         /* wait a bit to be in range, we don't have any locks at that point */
3499         gint64 wait_time =
3500             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3501         if (wait_time > 0) {
3502           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3503
3504           GST_DEBUG_OBJECT (stream->pad,
3505               "Download waiting for %" GST_TIME_FORMAT,
3506               GST_TIME_ARGS (wait_time));
3507
3508           GST_MANIFEST_UNLOCK (demux);
3509           g_mutex_lock (&stream->fragment_download_lock);
3510           if (G_UNLIKELY (stream->cancelled)) {
3511             g_mutex_unlock (&stream->fragment_download_lock);
3512             GST_MANIFEST_LOCK (demux);
3513             stream->last_ret = GST_FLOW_FLUSHING;
3514             goto flushing;
3515           }
3516           do {
3517             g_cond_wait_until (&stream->fragment_download_cond,
3518                 &stream->fragment_download_lock, end_time);
3519             if (G_UNLIKELY (stream->cancelled)) {
3520               g_mutex_unlock (&stream->fragment_download_lock);
3521               GST_MANIFEST_LOCK (demux);
3522               stream->last_ret = GST_FLOW_FLUSHING;
3523               goto flushing;
3524             }
3525           } while (!stream->download_finished);
3526           g_mutex_unlock (&stream->fragment_download_lock);
3527
3528           GST_MANIFEST_LOCK (demux);
3529         }
3530       }
3531     }
3532
3533   flushing:
3534     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3535       /* looks like there is no way of knowing when a live stream has ended
3536        * Have to assume we are falling behind and cause a manifest reload */
3537       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3538       return GST_FLOW_EOS;
3539     }
3540   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3541     /* If this is the last fragment, consider failures EOS and not actual
3542      * errors. Due to rounding errors in the durations, the last fragment
3543      * might not actually exist */
3544     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3545     return GST_FLOW_EOS;
3546   } else {
3547     /* retry once (same segment) for 5xx (server errors) */
3548     if (!retried_once) {
3549       retried_once = TRUE;
3550       /* wait a short time in case the server needs a bit to recover, we don't
3551        * care if we get woken up before end time. We can use sleep here since
3552        * we're already blocking and just want to wait some time. */
3553       g_usleep (100000);        /* a tenth of a second */
3554       goto again;
3555     }
3556   }
3557
3558 beach:
3559   return ret;
3560
3561 no_url_error:
3562   {
3563     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3564         (_("Failed to get fragment URL.")),
3565         ("An error happened when getting fragment URL"));
3566     gst_task_stop (stream->download_task);
3567     return GST_FLOW_ERROR;
3568   }
3569 }
3570
3571 /* this function will take the manifest_lock and will keep it until the end.
3572  * It will release it temporarily only when going to sleep.
3573  * Every time it takes the manifest_lock, it will check for cancelled condition
3574  */
3575 static void
3576 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3577 {
3578   GstAdaptiveDemux *demux = stream->demux;
3579   GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3580   GstFlowReturn ret;
3581   gboolean live;
3582
3583   GST_LOG_OBJECT (stream->pad, "download loop start");
3584
3585   GST_MANIFEST_LOCK (demux);
3586
3587   g_mutex_lock (&stream->fragment_download_lock);
3588   if (G_UNLIKELY (stream->cancelled)) {
3589     stream->last_ret = GST_FLOW_FLUSHING;
3590     g_mutex_unlock (&stream->fragment_download_lock);
3591     goto cancelled;
3592   }
3593   g_mutex_unlock (&stream->fragment_download_lock);
3594
3595   /* Check if we're done with our segment */
3596   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3597   if (demux->segment.rate > 0) {
3598     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3599         && stream->segment.position >= stream->segment.stop) {
3600       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3601       ret = GST_FLOW_EOS;
3602       gst_task_stop (stream->download_task);
3603       goto end_of_manifest;
3604     }
3605   } else {
3606     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3607         && stream->segment.position <= stream->segment.start) {
3608       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3609       ret = GST_FLOW_EOS;
3610       gst_task_stop (stream->download_task);
3611       goto end_of_manifest;
3612     }
3613   }
3614   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3615
3616   /* Cleanup old streams if any */
3617   if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3618     GList *old_streams = demux->priv->old_streams;
3619     demux->priv->old_streams = NULL;
3620
3621     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3622     g_list_free_full (old_streams,
3623         (GDestroyNotify) gst_adaptive_demux_stream_free);
3624     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3625
3626     /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3627      * Recheck the cancelled flag.
3628      */
3629     g_mutex_lock (&stream->fragment_download_lock);
3630     if (G_UNLIKELY (stream->cancelled)) {
3631       stream->last_ret = GST_FLOW_FLUSHING;
3632       g_mutex_unlock (&stream->fragment_download_lock);
3633       goto cancelled;
3634     }
3635     g_mutex_unlock (&stream->fragment_download_lock);
3636   }
3637
3638   /* Restarting download, figure out new position
3639    * FIXME : Move this to a separate function ? */
3640   if (G_UNLIKELY (stream->restart_download)) {
3641     GstEvent *seg_event;
3642     GstClockTime cur, ts = 0;
3643     gint64 pos;
3644
3645     GST_DEBUG_OBJECT (stream->pad,
3646         "Activating stream due to reconfigure event");
3647
3648     if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3649       ts = (GstClockTime) pos;
3650       GST_DEBUG_OBJECT (demux, "Downstream position: %"
3651           GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3652     } else {
3653       /* query other pads as some faulty element in the pad's branch might
3654        * reject position queries. This should be better than using the
3655        * demux segment position that can be much ahead */
3656       GList *iter;
3657
3658       for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3659         GstAdaptiveDemuxStream *cur_stream =
3660             (GstAdaptiveDemuxStream *) iter->data;
3661
3662         if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3663                 &pos)) {
3664           ts = (GstClockTime) pos;
3665           GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3666               GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3667           break;
3668         }
3669       }
3670     }
3671
3672     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3673     cur =
3674         gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3675         stream->segment.position);
3676
3677     /* we might have already pushed this data */
3678     ts = MAX (ts, cur);
3679
3680     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3681         "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3682
3683     if (GST_CLOCK_TIME_IS_VALID (ts)) {
3684       GstClockTime offset, period_start;
3685
3686       offset =
3687           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3688       period_start = gst_adaptive_demux_get_period_start_time (demux);
3689
3690       /* TODO check return */
3691       gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3692           0, ts, &ts);
3693
3694       stream->segment.position = ts - period_start + offset;
3695     }
3696
3697     /* The stream's segment is still correct except for
3698      * the position, so let's send a new one with the
3699      * updated position */
3700     seg_event = gst_event_new_segment (&stream->segment);
3701     gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3702     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3703
3704     GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3705         GST_PTR_FORMAT, seg_event);
3706     gst_pad_push_event (stream->pad, seg_event);
3707
3708     stream->discont = TRUE;
3709     stream->restart_download = FALSE;
3710   }
3711
3712   live = gst_adaptive_demux_is_live (demux);
3713
3714   /* Get information about the fragment to download */
3715   GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3716   ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3717   GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3718       ret, gst_flow_get_name (ret));
3719   if (ret == GST_FLOW_OK) {
3720
3721     /* wait for live fragments to be available */
3722     if (live) {
3723       gint64 wait_time =
3724           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3725       if (wait_time > 0) {
3726         GstClockTime end_time =
3727             gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3728
3729         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3730             GST_TIME_ARGS (wait_time));
3731
3732         GST_MANIFEST_UNLOCK (demux);
3733
3734         g_mutex_lock (&stream->fragment_download_lock);
3735         if (G_UNLIKELY (stream->cancelled)) {
3736           g_mutex_unlock (&stream->fragment_download_lock);
3737           GST_MANIFEST_LOCK (demux);
3738           stream->last_ret = GST_FLOW_FLUSHING;
3739           goto cancelled;
3740         }
3741         gst_adaptive_demux_wait_until (demux->realtime_clock,
3742             &stream->fragment_download_cond, &stream->fragment_download_lock,
3743             end_time);
3744         g_mutex_unlock (&stream->fragment_download_lock);
3745
3746         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3747
3748         GST_MANIFEST_LOCK (demux);
3749
3750         g_mutex_lock (&stream->fragment_download_lock);
3751         if (G_UNLIKELY (stream->cancelled)) {
3752           stream->last_ret = GST_FLOW_FLUSHING;
3753           g_mutex_unlock (&stream->fragment_download_lock);
3754           goto cancelled;
3755         }
3756         g_mutex_unlock (&stream->fragment_download_lock);
3757       }
3758     }
3759
3760     stream->last_ret = GST_FLOW_OK;
3761
3762     next_download = gst_adaptive_demux_get_monotonic_time (demux);
3763     ret = gst_adaptive_demux_stream_download_fragment (stream);
3764
3765     if (ret == GST_FLOW_FLUSHING) {
3766       g_mutex_lock (&stream->fragment_download_lock);
3767       if (G_UNLIKELY (stream->cancelled)) {
3768         stream->last_ret = GST_FLOW_FLUSHING;
3769         g_mutex_unlock (&stream->fragment_download_lock);
3770         goto cancelled;
3771       }
3772       g_mutex_unlock (&stream->fragment_download_lock);
3773     }
3774
3775   } else {
3776     stream->last_ret = ret;
3777   }
3778
3779   switch (ret) {
3780     case GST_FLOW_OK:
3781       break;                    /* all is good, let's go */
3782     case GST_FLOW_EOS:
3783       GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3784
3785       /* we push the EOS after releasing the object lock */
3786       if (gst_adaptive_demux_is_live (demux)
3787           && (demux->segment.rate == 1.0
3788               || gst_adaptive_demux_stream_in_live_seek_range (demux,
3789                   stream))) {
3790         GstAdaptiveDemuxClass *demux_class =
3791             GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3792
3793         /* this might be a fragment download error, refresh the manifest, just in case */
3794         if (!demux_class->requires_periodical_playlist_update (demux)) {
3795           ret = gst_adaptive_demux_update_manifest (demux);
3796           break;
3797           /* Wait only if we can ensure current manifest has been expired.
3798            * The meaning "we have next period" *WITH* EOS is that, current
3799            * period has been ended but we can continue to the next period */
3800         } else if (!gst_adaptive_demux_has_next_period (demux) &&
3801             gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3802           goto end;
3803         }
3804         gst_task_stop (stream->download_task);
3805         if (stream->replaced) {
3806           goto end;
3807         }
3808       } else {
3809         gst_task_stop (stream->download_task);
3810       }
3811
3812       if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3813         if (gst_adaptive_demux_has_next_period (demux)) {
3814           GST_DEBUG_OBJECT (stream->pad,
3815               "Next period available, not sending EOS");
3816           gst_adaptive_demux_advance_period (demux);
3817           ret = GST_FLOW_OK;
3818         }
3819       }
3820       break;
3821
3822     case GST_FLOW_NOT_LINKED:
3823     {
3824       GstFlowReturn ret;
3825       gst_task_stop (stream->download_task);
3826
3827       ret = gst_adaptive_demux_combine_flows (demux);
3828       if (ret == GST_FLOW_NOT_LINKED) {
3829         GST_ELEMENT_FLOW_ERROR (demux, ret);
3830       }
3831     }
3832       break;
3833
3834     case GST_FLOW_FLUSHING:{
3835       GList *iter;
3836
3837       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3838         GstAdaptiveDemuxStream *other;
3839
3840         other = iter->data;
3841         gst_task_stop (other->download_task);
3842       }
3843     }
3844       break;
3845
3846     default:
3847       if (ret <= GST_FLOW_ERROR) {
3848         gboolean is_live = gst_adaptive_demux_is_live (demux);
3849         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3850         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3851           goto download_error;
3852         }
3853
3854         g_clear_error (&stream->last_error);
3855
3856         /* First try to update the playlist for non-live playlists
3857          * in case the URIs have changed in the meantime. But only
3858          * try it the first time, after that we're going to wait a
3859          * a bit to not flood the server */
3860         if (stream->download_error_count == 1 && !is_live) {
3861           /* TODO hlsdemux had more options to this function (boolean and err) */
3862
3863           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3864             /* Retry immediately, the playlist actually has changed */
3865             GST_DEBUG_OBJECT (demux, "Updated the playlist");
3866             goto end;
3867           }
3868         }
3869
3870         /* Wait half the fragment duration before retrying */
3871         next_download += stream->fragment.duration / 2;
3872
3873         GST_MANIFEST_UNLOCK (demux);
3874
3875         g_mutex_lock (&stream->fragment_download_lock);
3876         if (G_UNLIKELY (stream->cancelled)) {
3877           g_mutex_unlock (&stream->fragment_download_lock);
3878           GST_MANIFEST_LOCK (demux);
3879           stream->last_ret = GST_FLOW_FLUSHING;
3880           goto cancelled;
3881         }
3882         gst_adaptive_demux_wait_until (demux->realtime_clock,
3883             &stream->fragment_download_cond, &stream->fragment_download_lock,
3884             next_download);
3885         g_mutex_unlock (&stream->fragment_download_lock);
3886
3887         GST_DEBUG_OBJECT (demux, "Retrying now");
3888
3889         GST_MANIFEST_LOCK (demux);
3890
3891         g_mutex_lock (&stream->fragment_download_lock);
3892         if (G_UNLIKELY (stream->cancelled)) {
3893           stream->last_ret = GST_FLOW_FLUSHING;
3894           g_mutex_unlock (&stream->fragment_download_lock);
3895           goto cancelled;
3896         }
3897         g_mutex_unlock (&stream->fragment_download_lock);
3898
3899         /* Refetch the playlist now after we waited */
3900         if (!is_live
3901             && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3902           GST_DEBUG_OBJECT (demux, "Updated the playlist");
3903         }
3904         goto end;
3905       }
3906       break;
3907   }
3908
3909 end_of_manifest:
3910   if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3911     if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3912       if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3913         GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3914         gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3915       } else {
3916         GST_DEBUG_OBJECT (stream->src,
3917             "Stream is EOS, but we're switching fragments. Not sending.");
3918       }
3919     } else {
3920       GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3921       goto download_error;
3922     }
3923   }
3924
3925 end:
3926   GST_MANIFEST_UNLOCK (demux);
3927   GST_LOG_OBJECT (stream->pad, "download loop end");
3928   return;
3929
3930 cancelled:
3931   {
3932     GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3933     goto end;
3934   }
3935 download_error:
3936   {
3937     GstMessage *msg;
3938
3939     if (stream->last_error) {
3940       gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3941           GST_DEBUG_PAD_NAME (stream->pad));
3942       msg =
3943           gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3944           debug);
3945       GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3946           stream->last_error->message);
3947       g_free (debug);
3948     } else {
3949       GError *err =
3950           g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3951           _("Couldn't download fragments"));
3952       msg =
3953           gst_message_new_error (GST_OBJECT_CAST (demux), err,
3954           "Fragment downloading has failed consecutive times");
3955       g_error_free (err);
3956       GST_ERROR_OBJECT (stream->pad,
3957           "Download error: Couldn't download fragments, too many failures");
3958     }
3959
3960     gst_task_stop (stream->download_task);
3961     if (stream->src) {
3962       GstElement *src = stream->src;
3963
3964       stream->src = NULL;
3965       GST_MANIFEST_UNLOCK (demux);
3966       gst_element_set_locked_state (src, TRUE);
3967       gst_element_set_state (src, GST_STATE_NULL);
3968       gst_bin_remove (GST_BIN_CAST (demux), src);
3969       GST_MANIFEST_LOCK (demux);
3970     }
3971
3972     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3973
3974     goto end;
3975   }
3976 }
3977
3978 static void
3979 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3980 {
3981   GstClockTime next_update;
3982   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3983
3984   /* Loop for updating of the playlist. This periodically checks if
3985    * the playlist is updated and does so, then signals the streaming
3986    * thread in case it can continue downloading now. */
3987
3988   /* block until the next scheduled update or the signal to quit this thread */
3989   GST_DEBUG_OBJECT (demux, "Started updates task");
3990
3991   GST_MANIFEST_LOCK (demux);
3992
3993   next_update =
3994       gst_adaptive_demux_get_monotonic_time (demux) +
3995       klass->get_manifest_update_interval (demux) * GST_USECOND;
3996
3997   /* Updating playlist only needed for live playlists */
3998   while (gst_adaptive_demux_is_live (demux)) {
3999     GstFlowReturn ret = GST_FLOW_OK;
4000
4001     /* Wait here until we should do the next update or we're cancelled */
4002     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4003
4004     GST_MANIFEST_UNLOCK (demux);
4005
4006     g_mutex_lock (&demux->priv->updates_timed_lock);
4007     if (demux->priv->stop_updates_task) {
4008       g_mutex_unlock (&demux->priv->updates_timed_lock);
4009       goto quit;
4010     }
4011     gst_adaptive_demux_wait_until (demux->realtime_clock,
4012         &demux->priv->updates_timed_cond,
4013         &demux->priv->updates_timed_lock, next_update);
4014     g_mutex_unlock (&demux->priv->updates_timed_lock);
4015
4016     g_mutex_lock (&demux->priv->updates_timed_lock);
4017     if (demux->priv->stop_updates_task) {
4018       g_mutex_unlock (&demux->priv->updates_timed_lock);
4019       goto quit;
4020     }
4021     g_mutex_unlock (&demux->priv->updates_timed_lock);
4022
4023     GST_MANIFEST_LOCK (demux);
4024
4025     GST_DEBUG_OBJECT (demux, "Updating playlist");
4026
4027     ret = gst_adaptive_demux_update_manifest (demux);
4028
4029     if (ret == GST_FLOW_EOS) {
4030     } else if (ret != GST_FLOW_OK) {
4031       /* update_failed_count is used only here, no need to protect it */
4032       demux->priv->update_failed_count++;
4033       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4034         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4035             gst_flow_get_name (ret));
4036         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4037             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4038       } else {
4039         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4040             (_("Internal data stream error.")), ("Could not update playlist"));
4041         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4042         gst_task_stop (demux->priv->updates_task);
4043         GST_MANIFEST_UNLOCK (demux);
4044         goto end;
4045       }
4046     } else {
4047       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4048       demux->priv->update_failed_count = 0;
4049       next_update =
4050           gst_adaptive_demux_get_monotonic_time (demux) +
4051           klass->get_manifest_update_interval (demux) * GST_USECOND;
4052
4053       /* Wake up download tasks */
4054       g_mutex_lock (&demux->priv->manifest_update_lock);
4055       g_cond_broadcast (&demux->priv->manifest_cond);
4056       g_mutex_unlock (&demux->priv->manifest_update_lock);
4057     }
4058   }
4059
4060   GST_MANIFEST_UNLOCK (demux);
4061
4062 quit:
4063   {
4064     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4065   }
4066
4067 end:
4068   {
4069     return;
4070   }
4071 }
4072
4073 /* must be called with manifest_lock taken */
4074 static gboolean
4075 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4076     GstEvent * event)
4077 {
4078   gboolean ret;
4079   GstPad *pad;
4080   GstAdaptiveDemux *demux = stream->demux;
4081
4082   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4083     stream->eos = TRUE;
4084   }
4085
4086   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4087
4088   /* Can't push events holding the manifest lock */
4089   GST_MANIFEST_UNLOCK (demux);
4090
4091   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4092       "Pushing event %" GST_PTR_FORMAT, event);
4093
4094   ret = gst_pad_push_event (pad, event);
4095
4096   gst_object_unref (pad);
4097
4098   GST_MANIFEST_LOCK (demux);
4099
4100   return ret;
4101 }
4102
4103 /* must be called with manifest_lock taken */
4104 static gboolean
4105 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4106 {
4107   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4108
4109   if (klass->is_live)
4110     return klass->is_live (demux);
4111   return FALSE;
4112 }
4113
4114 /* must be called with manifest_lock taken */
4115 static GstFlowReturn
4116 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4117     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4118     GstClockTime ts, GstClockTime * final_ts)
4119 {
4120   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4121
4122   if (klass->stream_seek)
4123     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4124   return GST_FLOW_ERROR;
4125 }
4126
4127 /* must be called with manifest_lock taken */
4128 static gboolean
4129 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4130     GstAdaptiveDemuxStream * stream)
4131 {
4132   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4133   gboolean ret = TRUE;
4134
4135   if (klass->stream_has_next_fragment)
4136     ret = klass->stream_has_next_fragment (stream);
4137
4138   return ret;
4139 }
4140
4141 /* must be called with manifest_lock taken */
4142 /* Called from:
4143  *  the ::finish_fragment() handlers when an *actual* fragment is done
4144  *   */
4145 GstFlowReturn
4146 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4147     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4148 {
4149   GstFlowReturn ret;
4150
4151   if (stream->last_ret == GST_FLOW_OK) {
4152     stream->last_ret =
4153         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4154         duration);
4155   }
4156   ret = stream->last_ret;
4157
4158   return ret;
4159 }
4160
4161 /* must be called with manifest_lock taken */
4162 GstFlowReturn
4163 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4164     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4165 {
4166   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4167   GstFlowReturn ret;
4168
4169   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4170
4171   GST_LOG_OBJECT (stream->pad,
4172       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4173       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4174
4175   stream->download_error_count = 0;
4176   g_clear_error (&stream->last_error);
4177
4178   /* FIXME - url has no indication of byte ranges for subsegments */
4179   /* FIXME : All those time statistics are biased, since they are calculated
4180    * *AFTER* the queue2, which might be blocking. They should ideally be
4181    * calculated *before* queue2 in the uri_handler_probe */
4182   gst_element_post_message (GST_ELEMENT_CAST (demux),
4183       gst_message_new_element (GST_OBJECT_CAST (demux),
4184           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4185               "manifest-uri", G_TYPE_STRING,
4186               demux->manifest_uri, "uri", G_TYPE_STRING,
4187               stream->fragment.uri, "fragment-start-time",
4188               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4189               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4190               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4191               stream->download_total_bytes, "fragment-download-time",
4192               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4193
4194   /* Don't update to the end of the segment if in reverse playback */
4195   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4196   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4197     GstClockTime offset =
4198         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4199     GstClockTime period_start =
4200         gst_adaptive_demux_get_period_start_time (demux);
4201
4202     stream->segment.position += duration;
4203
4204     /* Convert from position inside the stream's segment to the demuxer's
4205      * segment, they are not necessarily the same */
4206     if (stream->segment.position - offset + period_start >
4207         demux->segment.position)
4208       demux->segment.position =
4209           stream->segment.position - offset + period_start;
4210   }
4211   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4212
4213   /* When advancing with a non 1.0 rate on live streams, we need to check
4214    * the live seeking range again to make sure we can still advance to
4215    * that position */
4216   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4217     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4218       ret = GST_FLOW_EOS;
4219     else
4220       ret = klass->stream_advance_fragment (stream);
4221   } else if (gst_adaptive_demux_is_live (demux)
4222       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4223     ret = klass->stream_advance_fragment (stream);
4224   } else {
4225     ret = GST_FLOW_EOS;
4226   }
4227
4228   stream->download_start_time =
4229       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4230
4231   if (ret == GST_FLOW_OK) {
4232     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4233             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4234       stream->need_header = TRUE;
4235       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4236     }
4237
4238     /* the subclass might want to switch pads */
4239     if (G_UNLIKELY (demux->next_streams)) {
4240       GList *iter;
4241       gboolean can_expose = TRUE;
4242
4243       gst_task_stop (stream->download_task);
4244
4245       ret = GST_FLOW_EOS;
4246
4247       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4248         /* Only expose if all streams are now cancelled or finished downloading */
4249         GstAdaptiveDemuxStream *other = iter->data;
4250         if (other != stream) {
4251           g_mutex_lock (&other->fragment_download_lock);
4252           can_expose &= (other->cancelled == TRUE
4253               || other->download_finished == TRUE);
4254           g_mutex_unlock (&other->fragment_download_lock);
4255         }
4256       }
4257
4258       if (can_expose) {
4259         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4260             "to do bitrate switching");
4261         gst_adaptive_demux_prepare_streams (demux, FALSE);
4262         gst_adaptive_demux_start_tasks (demux, TRUE);
4263       } else {
4264         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4265       }
4266     }
4267   }
4268
4269   return ret;
4270 }
4271
4272 /* must be called with manifest_lock taken */
4273 static gboolean
4274 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4275     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4276 {
4277   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4278
4279   if (klass->stream_select_bitrate)
4280     return klass->stream_select_bitrate (stream, bitrate);
4281   return FALSE;
4282 }
4283
4284 /* must be called with manifest_lock taken */
4285 static GstFlowReturn
4286 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4287     GstAdaptiveDemuxStream * stream)
4288 {
4289   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4290   GstFlowReturn ret;
4291
4292   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4293       GST_FLOW_ERROR);
4294
4295   /* Make sure the sub-class will update bitrate, or else
4296    * we will later */
4297   stream->fragment.bitrate = 0;
4298   stream->fragment.finished = FALSE;
4299
4300   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4301       GST_TIME_ARGS (stream->segment.position));
4302
4303   ret = klass->stream_update_fragment_info (stream);
4304
4305   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4306       stream->fragment.uri);
4307   if (ret == GST_FLOW_OK) {
4308     GST_LOG_OBJECT (stream->pad,
4309         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4310         GST_TIME_ARGS (stream->fragment.timestamp),
4311         GST_TIME_ARGS (stream->fragment.duration));
4312     GST_LOG_OBJECT (stream->pad,
4313         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4314         stream->fragment.range_start, stream->fragment.range_end);
4315   }
4316
4317   return ret;
4318 }
4319
4320 /* must be called with manifest_lock taken */
4321 static gint64
4322 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4323     demux, GstAdaptiveDemuxStream * stream)
4324 {
4325   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4326
4327   if (klass->stream_get_fragment_waiting_time)
4328     return klass->stream_get_fragment_waiting_time (stream);
4329   return 0;
4330 }
4331
4332 /* must be called with manifest_lock taken */
4333 static GstFlowReturn
4334 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4335 {
4336   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4337   GstFragment *download;
4338   GstBuffer *buffer;
4339   GstFlowReturn ret;
4340   GError *error = NULL;
4341
4342   download = gst_uri_downloader_fetch_uri (demux->downloader,
4343       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4344   if (download) {
4345     g_free (demux->manifest_uri);
4346     g_free (demux->manifest_base_uri);
4347     if (download->redirect_permanent && download->redirect_uri) {
4348       demux->manifest_uri = g_strdup (download->redirect_uri);
4349       demux->manifest_base_uri = NULL;
4350     } else {
4351       demux->manifest_uri = g_strdup (download->uri);
4352       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4353     }
4354
4355     buffer = gst_fragment_get_buffer (download);
4356     g_object_unref (download);
4357     ret = klass->update_manifest_data (demux, buffer);
4358     gst_buffer_unref (buffer);
4359     /* FIXME: Should the manifest uri vars be reverted to original
4360      * values if updating fails? */
4361   } else {
4362     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4363         error->message);
4364     ret = GST_FLOW_NOT_LINKED;
4365   }
4366   g_clear_error (&error);
4367
4368   return ret;
4369 }
4370
4371 /* must be called with manifest_lock taken */
4372 static GstFlowReturn
4373 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4374 {
4375   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4376   GstFlowReturn ret;
4377
4378   ret = klass->update_manifest (demux);
4379
4380   if (ret == GST_FLOW_OK) {
4381     GstClockTime duration;
4382     /* Send an updated duration message */
4383     duration = klass->get_duration (demux);
4384     if (duration != GST_CLOCK_TIME_NONE) {
4385       GST_DEBUG_OBJECT (demux,
4386           "Sending duration message : %" GST_TIME_FORMAT,
4387           GST_TIME_ARGS (duration));
4388       gst_element_post_message (GST_ELEMENT (demux),
4389           gst_message_new_duration_changed (GST_OBJECT (demux)));
4390     } else {
4391       GST_DEBUG_OBJECT (demux,
4392           "Duration unknown, can not send the duration message");
4393     }
4394
4395     /* If a manifest changes it's liveness or periodic updateness, we need
4396      * to start/stop the manifest update task appropriately */
4397     /* Keep this condition in sync with the one in
4398      * gst_adaptive_demux_start_manifest_update_task()
4399      */
4400     if (gst_adaptive_demux_is_live (demux) &&
4401         klass->requires_periodical_playlist_update (demux)) {
4402       gst_adaptive_demux_start_manifest_update_task (demux);
4403     } else {
4404       gst_adaptive_demux_stop_manifest_update_task (demux);
4405     }
4406   }
4407
4408   return ret;
4409 }
4410
4411 void
4412 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4413 {
4414   g_free (f->uri);
4415   f->uri = NULL;
4416   f->range_start = 0;
4417   f->range_end = -1;
4418
4419   g_free (f->header_uri);
4420   f->header_uri = NULL;
4421   f->header_range_start = 0;
4422   f->header_range_end = -1;
4423
4424   g_free (f->index_uri);
4425   f->index_uri = NULL;
4426   f->index_range_start = 0;
4427   f->index_range_end = -1;
4428
4429   f->finished = FALSE;
4430 }
4431
4432 /* must be called with manifest_lock taken */
4433 static gboolean
4434 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4435 {
4436   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4437   gboolean ret = FALSE;
4438
4439   if (klass->has_next_period)
4440     ret = klass->has_next_period (demux);
4441   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4442   return ret;
4443 }
4444
4445 /* must be called with manifest_lock taken */
4446 static void
4447 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4448 {
4449   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4450
4451   g_return_if_fail (klass->advance_period != NULL);
4452
4453   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4454   klass->advance_period (demux);
4455   gst_adaptive_demux_prepare_streams (demux, FALSE);
4456   gst_adaptive_demux_start_tasks (demux, TRUE);
4457 }
4458
4459 /**
4460  * gst_adaptive_demux_get_monotonic_time:
4461  * Returns: a monotonically increasing time, using the system realtime clock
4462  */
4463 GstClockTime
4464 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4465 {
4466   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4467   return gst_clock_get_time (demux->realtime_clock);
4468 }
4469
4470 /**
4471  * gst_adaptive_demux_get_client_now_utc:
4472  * @demux: #GstAdaptiveDemux
4473  * Returns: the client's estimate of UTC
4474  *
4475  * Used to find the client's estimate of UTC, using the system realtime clock.
4476  */
4477 GDateTime *
4478 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4479 {
4480   GstClockTime rtc_now;
4481   gint64 utc_now;
4482   GTimeVal gtv;
4483
4484   rtc_now = gst_clock_get_time (demux->realtime_clock);
4485   utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4486   gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
4487   gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
4488   return g_date_time_new_from_timeval_utc (&gtv);
4489 }
4490
4491 /**
4492  * gst_adaptive_demux_is_running
4493  * @demux: #GstAdaptiveDemux
4494  * Returns: whether the demuxer is processing data
4495  *
4496  * Returns FALSE if shutdown has started (transitioning down from
4497  * PAUSED), otherwise TRUE.
4498  */
4499 gboolean
4500 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4501 {
4502   return g_atomic_int_get (&demux->running);
4503 }
4504
4505 static GstAdaptiveDemuxTimer *
4506 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4507 {
4508   GstAdaptiveDemuxTimer *timer;
4509
4510   timer = g_slice_new (GstAdaptiveDemuxTimer);
4511   timer->fired = FALSE;
4512   timer->cond = cond;
4513   timer->mutex = mutex;
4514   timer->ref_count = 1;
4515   return timer;
4516 }
4517
4518 static GstAdaptiveDemuxTimer *
4519 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4520 {
4521   g_return_val_if_fail (timer != NULL, NULL);
4522   g_atomic_int_inc (&timer->ref_count);
4523   return timer;
4524 }
4525
4526 static void
4527 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4528 {
4529   g_return_if_fail (timer != NULL);
4530   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4531     g_slice_free (GstAdaptiveDemuxTimer, timer);
4532   }
4533 }
4534
4535 /* gst_adaptive_demux_wait_until:
4536  * A replacement for g_cond_wait_until that uses the clock rather
4537  * than system time to control the duration of the sleep. Typically
4538  * clock is actually a #GstSystemClock, in which case this function
4539  * behaves exactly like g_cond_wait_until. Inside unit tests,
4540  * the clock is typically a #GstTestClock, which allows tests to run
4541  * in non-realtime.
4542  * This function must be called with mutex held.
4543  */
4544 static gboolean
4545 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4546     GstClockTime end_time)
4547 {
4548   GstAdaptiveDemuxTimer *timer;
4549   gboolean fired;
4550   GstClockReturn res;
4551
4552   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4553     /* for an invalid time, gst_clock_id_wait_async will try to call
4554      * gst_adaptive_demux_clock_callback from the current thread.
4555      * It still holds the mutex while doing that, so it will deadlock.
4556      * g_cond_wait_until would return immediately with false, so we'll do the same.
4557      */
4558     return FALSE;
4559   }
4560   timer = gst_adaptive_demux_timer_new (cond, mutex);
4561   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4562   res =
4563       gst_clock_id_wait_async (timer->clock_id,
4564       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4565       (GDestroyNotify) gst_adaptive_demux_timer_unref);
4566   /* clock does not support asynchronously wait. Assert and return */
4567   if (res == GST_CLOCK_UNSUPPORTED) {
4568     gst_clock_id_unref (timer->clock_id);
4569     gst_adaptive_demux_timer_unref (timer);
4570     g_return_val_if_reached (TRUE);
4571   }
4572   g_assert (!timer->fired);
4573   /* the gst_adaptive_demux_clock_callback() will signal the
4574    * cond when the clock's single shot timer fires, or the cond will be
4575    * signalled by another thread that wants to cause this wait to finish
4576    * early (e.g. to terminate the waiting thread).
4577    * There is no need for a while loop here, because that logic is
4578    * implemented by the function calling gst_adaptive_demux_wait_until() */
4579   g_cond_wait (cond, mutex);
4580   fired = timer->fired;
4581   if (!fired)
4582     gst_clock_id_unschedule (timer->clock_id);
4583   gst_clock_id_unref (timer->clock_id);
4584   gst_adaptive_demux_timer_unref (timer);
4585   return !fired;
4586 }
4587
4588 static gboolean
4589 gst_adaptive_demux_clock_callback (GstClock * clock,
4590     GstClockTime time, GstClockID id, gpointer user_data)
4591 {
4592   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4593   g_return_val_if_fail (timer != NULL, FALSE);
4594   g_mutex_lock (timer->mutex);
4595   timer->fired = TRUE;
4596   g_cond_signal (timer->cond);
4597   g_mutex_unlock (timer->mutex);
4598   return TRUE;
4599 }