adaptivedemux: Handle live duration queries
[platform/upstream/gstreamer.git] / gst-libs / gst / adaptivedemux / gstadaptivedemux.c
1 /* GStreamer
2  *
3  * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4  *   Author: Thiago Santos <thiagoss@osg.samsung.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 /**
23  * SECTION:gstadaptivedemux
24  * @short_description: Base class for adaptive demuxers
25  *
26  * What is an adaptive demuxer?
27  * Adaptive demuxers are special demuxers in the sense that they don't
28  * actually demux data received from upstream but download the data
29  * themselves.
30  *
31  * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32  * a set of fragments. The manifest describes the available media and
33  * the sequence of fragments to use. Each fragment contains a small
34  * part of the media (typically only a few seconds). It is possible for
35  * the manifest to have the same media available in different configurations
36  * (bitrates for example) so that the client can select the one that
37  * best suits its scenario (network fluctuation, hardware requirements...).
38  * It is possible to switch from one representation of the media to another
39  * during playback. That's why it is called 'adaptive', because it can be
40  * adapted to the client's needs.
41  *
42  * Architectural overview:
43  * The manifest is received by the demuxer in its sink pad and, upon receiving
44  * EOS, it parses the manifest and exposes the streams available in it. For
45  * each stream a source element will be created and will download the list
46  * of fragments one by one. Once a fragment is finished downloading, the next
47  * URI is set to the source element and it starts fetching it and pushing
48  * through the stream's pad. This implies that each stream is independent from
49  * each other as it runs on a separate thread.
50  *
51  * After downloading each fragment, the download rate of it is calculated and
52  * the demuxer has a chance to switch to a different bitrate if needed. The
53  * switch can be done by simply pushing a new caps before the next fragment
54  * when codecs are the same, or by exposing a new pad group if it needs
55  * a codec change.
56  *
57  * Extra features:
58  * - Not linked streams: Streams that are not-linked have their download threads
59  *                       interrupted to save network bandwidth. When they are
60  *                       relinked a reconfigure event is received and the
61  *                       stream is restarted.
62  *
63  * Subclasses:
64  * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65  * about the intrinsics of the subclass formats, so the subclasses are
66  * responsible for maintaining the manifest data structures and stream
67  * information.
68  */
69
70 /*
71 MT safety.
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses needs to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
77
78 Adaptive demux API can be called from several threads. More, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
85
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
103
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
115 */
116
117 #ifdef HAVE_CONFIG_H
118 #include "config.h"
119 #endif
120
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
124
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
127
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024    /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
134
135 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
136 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
137     GST_TRACE("Locking from thread %p", g_thread_self()); \
138     g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
139     GST_TRACE("Locked from thread %p", g_thread_self()); \
140  } G_STMT_END
141
142 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
143     GST_TRACE("Unlocking from thread %p", g_thread_self()); \
144     g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
145  } G_STMT_END
146
147 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
148 #define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
149 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
150
151 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
154
155 enum
156 {
157   PROP_0,
158   PROP_CONNECTION_SPEED,
159   PROP_BITRATE_LIMIT,
160   PROP_LAST
161 };
162
163 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
164 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
165
166 struct _GstAdaptiveDemuxPrivate
167 {
168   GstAdapter *input_adapter;    /* protected by manifest_lock */
169   gint have_manifest;           /* MT safe */
170
171   GList *old_streams;           /* protected by manifest_lock */
172
173   GstTask *updates_task;        /* MT safe */
174   GRecMutex updates_lock;
175   GMutex updates_timed_lock;
176   GCond updates_timed_cond;     /* protected by updates_timed_lock */
177   gboolean stop_updates_task;   /* protected by updates_timed_lock */
178
179   /* used only from updates_task, no need to protect it */
180   gint update_failed_count;
181
182   guint32 segment_seqnum;       /* protected by manifest_lock */
183
184   /* main lock used to protect adaptive demux and all its streams.
185    * It serializes the adaptive demux public API.
186    */
187   GRecMutex manifest_lock;
188
189   /* condition to wait for manifest updates on a live stream.
190    * In order to signal the manifest_cond, the caller needs to hold both
191    * manifest_lock and manifest_update_lock (taken in this order)
192    */
193   GCond manifest_cond;
194   GMutex manifest_update_lock;
195
196   /* Lock and condition for prerolling streams before exposing */
197   GMutex preroll_lock;
198   GCond preroll_cond;
199   gint preroll_pending;
200
201   GMutex api_lock;
202
203   /* Protects demux and stream segment information
204    * Needed because seeks can update segment information
205    * without needing to stop tasks when they just want to
206    * update the segment boundaries */
207   GMutex segment_lock;
208 };
209
210 typedef struct _GstAdaptiveDemuxTimer
211 {
212   volatile gint ref_count;
213   GCond *cond;
214   GMutex *mutex;
215   GstClockID clock_id;
216   gboolean fired;
217 } GstAdaptiveDemuxTimer;
218
219 static GstBinClass *parent_class = NULL;
220 static gint private_offset = 0;
221
222 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
223 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
224     GstAdaptiveDemuxClass * klass);
225 static void gst_adaptive_demux_finalize (GObject * object);
226 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
227     element, GstStateChange transition);
228
229 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
230
231 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
232     GstEvent * event);
233 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
234     GstObject * parent, GstBuffer * buffer);
235 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
236     GstQuery * query);
237 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
238     GstEvent * event);
239
240 static gboolean
241 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
242
243 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
244 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
245     stream);
246 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
247 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
248     gboolean first_and_live);
249 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
250 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
251 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
252     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
253     GstClockTime ts, GstClockTime * final_ts);
254 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
255     demux, GstAdaptiveDemuxStream * stream);
256 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
257     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
258 static GstFlowReturn
259 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
260     GstAdaptiveDemuxStream * stream);
261 static gint64
262 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
263     GstAdaptiveDemuxStream * stream);
264 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
265     demux);
266 static GstFlowReturn
267 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
268 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
269 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
270
271 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
272 static GstFlowReturn
273 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
274     GstEvent * event);
275
276 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
277     demux);
278 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
279     demux);
280
281 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
282     gboolean start_preroll_streams);
283 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
284     gboolean stop_updates);
285 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
286     demux);
287 static void
288 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
289     stream, GstFlowReturn ret, GError * err);
290 static GstFlowReturn
291 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
292     GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
293 static GstFlowReturn
294 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
295     GstAdaptiveDemuxStream * stream);
296 static GstFlowReturn
297 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
298     GstAdaptiveDemuxStream * stream, GstClockTime duration);
299 static gboolean
300 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
301     GstClockTime end_time);
302 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
303     GstClockTime time, GstClockID id, gpointer user_data);
304 static gboolean
305 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
306     * demux);
307
308 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
309  * method to get to the padtemplates */
310 GType
311 gst_adaptive_demux_get_type (void)
312 {
313   static volatile gsize type = 0;
314
315   if (g_once_init_enter (&type)) {
316     GType _type;
317     static const GTypeInfo info = {
318       sizeof (GstAdaptiveDemuxClass),
319       NULL,
320       NULL,
321       (GClassInitFunc) gst_adaptive_demux_class_init,
322       NULL,
323       NULL,
324       sizeof (GstAdaptiveDemux),
325       0,
326       (GInstanceInitFunc) gst_adaptive_demux_init,
327     };
328
329     _type = g_type_register_static (GST_TYPE_BIN,
330         "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
331
332     private_offset =
333         g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
334
335     g_once_init_leave (&type, _type);
336   }
337   return type;
338 }
339
340 static inline GstAdaptiveDemuxPrivate *
341 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
342 {
343   return (G_STRUCT_MEMBER_P (self, private_offset));
344 }
345
346 static void
347 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
348     const GValue * value, GParamSpec * pspec)
349 {
350   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
351
352   GST_API_LOCK (demux);
353   GST_MANIFEST_LOCK (demux);
354
355   switch (prop_id) {
356     case PROP_CONNECTION_SPEED:
357       demux->connection_speed = g_value_get_uint (value) * 1000;
358       GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
359           demux->connection_speed);
360       break;
361     case PROP_BITRATE_LIMIT:
362       demux->bitrate_limit = g_value_get_float (value);
363       break;
364     default:
365       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
366       break;
367   }
368
369   GST_MANIFEST_UNLOCK (demux);
370   GST_API_UNLOCK (demux);
371 }
372
373 static void
374 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
375     GValue * value, GParamSpec * pspec)
376 {
377   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
378
379   GST_MANIFEST_LOCK (demux);
380
381   switch (prop_id) {
382     case PROP_CONNECTION_SPEED:
383       g_value_set_uint (value, demux->connection_speed / 1000);
384       break;
385     case PROP_BITRATE_LIMIT:
386       g_value_set_float (value, demux->bitrate_limit);
387       break;
388     default:
389       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
390       break;
391   }
392
393   GST_MANIFEST_UNLOCK (demux);
394 }
395
396 static void
397 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
398 {
399   GObjectClass *gobject_class;
400   GstElementClass *gstelement_class;
401   GstBinClass *gstbin_class;
402
403   gobject_class = G_OBJECT_CLASS (klass);
404   gstelement_class = GST_ELEMENT_CLASS (klass);
405   gstbin_class = GST_BIN_CLASS (klass);
406
407   GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
408       "Base Adaptive Demux");
409
410   parent_class = g_type_class_peek_parent (klass);
411
412   if (private_offset != 0)
413     g_type_class_adjust_private_offset (klass, &private_offset);
414
415   gobject_class->set_property = gst_adaptive_demux_set_property;
416   gobject_class->get_property = gst_adaptive_demux_get_property;
417   gobject_class->finalize = gst_adaptive_demux_finalize;
418
419   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
420       g_param_spec_uint ("connection-speed", "Connection Speed",
421           "Network connection speed in kbps (0 = calculate from downloaded"
422           " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425   /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
426   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
427       g_param_spec_float ("bitrate-limit",
428           "Bitrate limit in %",
429           "Limit of the available bitrate to use when switching to alternates.",
430           0, 1, DEFAULT_BITRATE_LIMIT,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432
433   gstelement_class->change_state = gst_adaptive_demux_change_state;
434
435   gstbin_class->handle_message = gst_adaptive_demux_handle_message;
436
437   klass->data_received = gst_adaptive_demux_stream_data_received_default;
438   klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
439   klass->update_manifest = gst_adaptive_demux_update_manifest_default;
440   klass->requires_periodical_playlist_update =
441       gst_adaptive_demux_requires_periodical_playlist_update_default;
442
443 }
444
445 static void
446 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
447     GstAdaptiveDemuxClass * klass)
448 {
449   GstPadTemplate *pad_template;
450   GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
451   GObjectClass *gobject_class;
452
453   GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
454
455   demux->priv = gst_adaptive_demux_get_instance_private (demux);
456   demux->priv->input_adapter = gst_adapter_new ();
457   demux->downloader = gst_uri_downloader_new ();
458   gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
459   demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
460   demux->priv->segment_seqnum = gst_util_seqnum_next ();
461   demux->have_group_id = FALSE;
462   demux->group_id = G_MAXUINT;
463
464   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
465
466   gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
467       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
468
469   demux->realtime_clock = gst_system_clock_obtain ();
470   g_assert (demux->realtime_clock != NULL);
471   gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
472   if (g_object_class_find_property (gobject_class, "clock-type")) {
473     g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
474   } else {
475     GST_WARNING_OBJECT (demux,
476         "System clock does not have clock-type property");
477   }
478   if (clock_type == GST_CLOCK_TYPE_REALTIME) {
479     demux->clock_offset = 0;
480   } else {
481     GDateTime *utc_now;
482     GstClockTime rtc_now;
483
484     utc_now = g_date_time_new_now_utc ();
485     rtc_now = gst_clock_get_time (demux->realtime_clock);
486     demux->clock_offset =
487         g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
488         g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
489     g_date_time_unref (utc_now);
490   }
491   g_rec_mutex_init (&demux->priv->updates_lock);
492   demux->priv->updates_task =
493       gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
494       demux, NULL);
495   gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
496
497   g_mutex_init (&demux->priv->updates_timed_lock);
498   g_cond_init (&demux->priv->updates_timed_cond);
499
500   g_cond_init (&demux->priv->manifest_cond);
501   g_mutex_init (&demux->priv->manifest_update_lock);
502
503   g_rec_mutex_init (&demux->priv->manifest_lock);
504   g_mutex_init (&demux->priv->api_lock);
505   g_mutex_init (&demux->priv->segment_lock);
506
507   g_cond_init (&demux->priv->preroll_cond);
508   g_mutex_init (&demux->priv->preroll_lock);
509
510   pad_template =
511       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
512   g_return_if_fail (pad_template != NULL);
513
514   demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
515   gst_pad_set_event_function (demux->sinkpad,
516       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
517   gst_pad_set_chain_function (demux->sinkpad,
518       GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
519
520   /* Properties */
521   demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
522   demux->connection_speed = DEFAULT_CONNECTION_SPEED;
523
524   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
525 }
526
527 static void
528 gst_adaptive_demux_finalize (GObject * object)
529 {
530   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
531   GstAdaptiveDemuxPrivate *priv = demux->priv;
532
533   GST_DEBUG_OBJECT (object, "finalize");
534
535   g_object_unref (priv->input_adapter);
536   g_object_unref (demux->downloader);
537
538   g_mutex_clear (&priv->updates_timed_lock);
539   g_cond_clear (&priv->updates_timed_cond);
540   g_mutex_clear (&demux->priv->manifest_update_lock);
541   g_cond_clear (&demux->priv->manifest_cond);
542   g_object_unref (priv->updates_task);
543   g_rec_mutex_clear (&priv->updates_lock);
544   g_rec_mutex_clear (&demux->priv->manifest_lock);
545   g_mutex_clear (&demux->priv->api_lock);
546   g_mutex_clear (&demux->priv->segment_lock);
547   if (demux->realtime_clock) {
548     gst_object_unref (demux->realtime_clock);
549     demux->realtime_clock = NULL;
550   }
551
552   g_cond_clear (&demux->priv->preroll_cond);
553   g_mutex_clear (&demux->priv->preroll_lock);
554
555   G_OBJECT_CLASS (parent_class)->finalize (object);
556 }
557
558 static GstStateChangeReturn
559 gst_adaptive_demux_change_state (GstElement * element,
560     GstStateChange transition)
561 {
562   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
563   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
564
565   switch (transition) {
566     case GST_STATE_CHANGE_PAUSED_TO_READY:
567       if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
568         GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
569       gst_uri_downloader_cancel (demux->downloader);
570
571       GST_API_LOCK (demux);
572       GST_MANIFEST_LOCK (demux);
573       gst_adaptive_demux_reset (demux);
574       GST_MANIFEST_UNLOCK (demux);
575       GST_API_UNLOCK (demux);
576       break;
577     case GST_STATE_CHANGE_READY_TO_PAUSED:
578       GST_API_LOCK (demux);
579       GST_MANIFEST_LOCK (demux);
580       gst_adaptive_demux_reset (demux);
581       /* Clear "cancelled" flag in uridownloader since subclass might want to
582        * use uridownloader to fetch another manifest */
583       gst_uri_downloader_reset (demux->downloader);
584       if (g_atomic_int_get (&demux->priv->have_manifest))
585         gst_adaptive_demux_start_manifest_update_task (demux);
586       GST_MANIFEST_UNLOCK (demux);
587       GST_API_UNLOCK (demux);
588       if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
589         GST_DEBUG_OBJECT (demux, "demuxer has started running");
590       break;
591     default:
592       break;
593   }
594
595   /* this must be run without MANIFEST_LOCK taken.
596    * For PLAYING to PLAYING state changes, it will want to take a lock in
597    * src element and that lock is held while the streaming thread is running.
598    * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
599    */
600   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
601
602   return result;
603 }
604
605 static gboolean
606 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
607     GstEvent * event)
608 {
609   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
610   gboolean ret;
611
612   switch (event->type) {
613     case GST_EVENT_FLUSH_STOP:{
614       GST_API_LOCK (demux);
615       GST_MANIFEST_LOCK (demux);
616
617       gst_adaptive_demux_reset (demux);
618
619       ret = gst_pad_event_default (pad, parent, event);
620
621       GST_MANIFEST_UNLOCK (demux);
622       GST_API_UNLOCK (demux);
623
624       return ret;
625     }
626     case GST_EVENT_EOS:{
627       GstAdaptiveDemuxClass *demux_class;
628       GstQuery *query;
629       gboolean query_res;
630       gboolean ret = TRUE;
631       gsize available;
632       GstBuffer *manifest_buffer;
633
634       GST_API_LOCK (demux);
635       GST_MANIFEST_LOCK (demux);
636
637       demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
638
639       available = gst_adapter_available (demux->priv->input_adapter);
640
641       if (available == 0) {
642         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
643         ret = gst_pad_event_default (pad, parent, event);
644
645         GST_MANIFEST_UNLOCK (demux);
646         GST_API_UNLOCK (demux);
647
648         return ret;
649       }
650
651       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
652
653       /* Need to get the URI to use it as a base to generate the fragment's
654        * uris */
655       query = gst_query_new_uri ();
656       query_res = gst_pad_peer_query (pad, query);
657       if (query_res) {
658         gchar *uri, *redirect_uri;
659         gboolean permanent;
660
661         gst_query_parse_uri (query, &uri);
662         gst_query_parse_uri_redirection (query, &redirect_uri);
663         gst_query_parse_uri_redirection_permanent (query, &permanent);
664
665         if (permanent && redirect_uri) {
666           demux->manifest_uri = redirect_uri;
667           demux->manifest_base_uri = NULL;
668           g_free (uri);
669         } else {
670           demux->manifest_uri = uri;
671           demux->manifest_base_uri = redirect_uri;
672         }
673
674         GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
675             demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
676       } else {
677         GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
678       }
679       gst_query_unref (query);
680
681       /* Let the subclass parse the manifest */
682       manifest_buffer =
683           gst_adapter_take_buffer (demux->priv->input_adapter, available);
684       if (!demux_class->process_manifest (demux, manifest_buffer)) {
685         /* In most cases, this will happen if we set a wrong url in the
686          * source element and we have received the 404 HTML response instead of
687          * the manifest */
688         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
689             (NULL));
690         ret = FALSE;
691       } else {
692         g_atomic_int_set (&demux->priv->have_manifest, TRUE);
693       }
694       gst_buffer_unref (manifest_buffer);
695
696       gst_element_post_message (GST_ELEMENT_CAST (demux),
697           gst_message_new_element (GST_OBJECT_CAST (demux),
698               gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
699                   "manifest-uri", G_TYPE_STRING,
700                   demux->manifest_uri, "uri", G_TYPE_STRING,
701                   demux->manifest_uri,
702                   "manifest-download-start", GST_TYPE_CLOCK_TIME,
703                   GST_CLOCK_TIME_NONE,
704                   "manifest-download-stop", GST_TYPE_CLOCK_TIME,
705                   gst_util_get_timestamp (), NULL)));
706
707       if (ret) {
708         /* Send duration message */
709         if (!gst_adaptive_demux_is_live (demux)) {
710           GstClockTime duration = demux_class->get_duration (demux);
711
712           if (duration != GST_CLOCK_TIME_NONE) {
713             GST_DEBUG_OBJECT (demux,
714                 "Sending duration message : %" GST_TIME_FORMAT,
715                 GST_TIME_ARGS (duration));
716             gst_element_post_message (GST_ELEMENT (demux),
717                 gst_message_new_duration_changed (GST_OBJECT (demux)));
718           } else {
719             GST_DEBUG_OBJECT (demux,
720                 "media duration unknown, can not send the duration message");
721           }
722         }
723
724         if (demux->next_streams) {
725           gst_adaptive_demux_prepare_streams (demux,
726               gst_adaptive_demux_is_live (demux));
727           gst_adaptive_demux_start_tasks (demux, TRUE);
728           gst_adaptive_demux_start_manifest_update_task (demux);
729         } else {
730           /* no streams */
731           GST_WARNING_OBJECT (demux, "No streams created from manifest");
732           GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
733               (_("This file contains no playable streams.")),
734               ("No known stream formats found at the Manifest"));
735           ret = FALSE;
736         }
737
738       }
739       GST_MANIFEST_UNLOCK (demux);
740       GST_API_UNLOCK (demux);
741
742       gst_event_unref (event);
743       return ret;
744     }
745     case GST_EVENT_SEGMENT:
746       /* Swallow newsegments, we'll push our own */
747       gst_event_unref (event);
748       return TRUE;
749     default:
750       break;
751   }
752
753   return gst_pad_event_default (pad, parent, event);
754 }
755
756 static GstFlowReturn
757 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
758     GstBuffer * buffer)
759 {
760   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
761
762   GST_MANIFEST_LOCK (demux);
763
764   gst_adapter_push (demux->priv->input_adapter, buffer);
765
766   GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
767       (gint) gst_adapter_available (demux->priv->input_adapter));
768
769   GST_MANIFEST_UNLOCK (demux);
770   return GST_FLOW_OK;
771 }
772
773 /* must be called with manifest_lock taken */
774 static void
775 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
776 {
777   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
778   GList *iter;
779   GList *old_streams;
780   GstEvent *eos;
781
782   /* take ownership of old_streams before releasing the manifest_lock in
783    * gst_adaptive_demux_stop_tasks
784    */
785   old_streams = demux->priv->old_streams;
786   demux->priv->old_streams = NULL;
787
788   gst_adaptive_demux_stop_tasks (demux, TRUE);
789
790   if (klass->reset)
791     klass->reset (demux);
792
793   eos = gst_event_new_eos ();
794   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
795     GstAdaptiveDemuxStream *stream = iter->data;
796     if (stream->pad) {
797       gst_pad_push_event (stream->pad, gst_event_ref (eos));
798       gst_pad_set_active (stream->pad, FALSE);
799
800       gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
801     }
802     gst_adaptive_demux_stream_free (stream);
803   }
804   gst_event_unref (eos);
805   g_list_free (demux->streams);
806   demux->streams = NULL;
807   if (demux->prepared_streams) {
808     g_list_free_full (demux->prepared_streams,
809         (GDestroyNotify) gst_adaptive_demux_stream_free);
810     demux->prepared_streams = NULL;
811   }
812   if (demux->next_streams) {
813     g_list_free_full (demux->next_streams,
814         (GDestroyNotify) gst_adaptive_demux_stream_free);
815     demux->next_streams = NULL;
816   }
817
818   if (old_streams) {
819     g_list_free_full (old_streams,
820         (GDestroyNotify) gst_adaptive_demux_stream_free);
821   }
822
823   if (demux->priv->old_streams) {
824     g_list_free_full (demux->priv->old_streams,
825         (GDestroyNotify) gst_adaptive_demux_stream_free);
826     demux->priv->old_streams = NULL;
827   }
828
829   g_free (demux->manifest_uri);
830   g_free (demux->manifest_base_uri);
831   demux->manifest_uri = NULL;
832   demux->manifest_base_uri = NULL;
833
834   gst_adapter_clear (demux->priv->input_adapter);
835   g_atomic_int_set (&demux->priv->have_manifest, FALSE);
836
837   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
838
839   demux->have_group_id = FALSE;
840   demux->group_id = G_MAXUINT;
841   demux->priv->segment_seqnum = gst_util_seqnum_next ();
842 }
843
844 static void
845 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
846 {
847   GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
848
849   switch (GST_MESSAGE_TYPE (msg)) {
850     case GST_MESSAGE_ERROR:{
851       GList *iter;
852       GstAdaptiveDemuxStream *stream = NULL;
853       GError *err = NULL;
854       gchar *debug = NULL;
855       gchar *new_error = NULL;
856       const GstStructure *details = NULL;
857
858       GST_MANIFEST_LOCK (demux);
859
860       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
861         GstAdaptiveDemuxStream *cur = iter->data;
862         if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
863                 GST_OBJECT_CAST (cur->src))) {
864           stream = cur;
865           break;
866         }
867       }
868       if (stream == NULL) {
869         for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
870           GstAdaptiveDemuxStream *cur = iter->data;
871           if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
872                   GST_OBJECT_CAST (cur->src))) {
873             stream = cur;
874             break;
875           }
876         }
877         if (stream == NULL) {
878           GST_WARNING_OBJECT (demux,
879               "Failed to locate stream for errored element");
880           break;
881         }
882       }
883
884       gst_message_parse_error (msg, &err, &debug);
885
886       GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
887           "Source posted error: %d:%d %s (%s)", err->domain, err->code,
888           err->message, debug);
889
890       if (debug)
891         new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
892       if (new_error) {
893         g_free (err->message);
894         err->message = new_error;
895       }
896
897       gst_message_parse_error_details (msg, &details);
898       if (details) {
899         gst_structure_get_uint (details, "http-status-code",
900             &stream->last_status_code);
901       }
902
903       /* error, but ask to retry */
904       gst_adaptive_demux_stream_fragment_download_finish (stream,
905           GST_FLOW_CUSTOM_ERROR, err);
906
907       g_error_free (err);
908       g_free (debug);
909
910       GST_MANIFEST_UNLOCK (demux);
911
912       gst_message_unref (msg);
913       msg = NULL;
914     }
915       break;
916     default:
917       break;
918   }
919
920   if (msg)
921     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
922 }
923
924 void
925 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
926     gsize struct_size)
927 {
928   GST_API_LOCK (demux);
929   GST_MANIFEST_LOCK (demux);
930   demux->stream_struct_size = struct_size;
931   GST_MANIFEST_UNLOCK (demux);
932   GST_API_UNLOCK (demux);
933 }
934
935 /* must be called with manifest_lock taken */
936 static gboolean
937 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
938     GstAdaptiveDemuxStream * stream)
939 {
940   GstPad *pad = stream->pad;
941   gchar *name = gst_pad_get_name (pad);
942   GstEvent *event;
943   gchar *stream_id;
944
945   gst_pad_set_active (pad, TRUE);
946   stream->need_header = TRUE;
947
948   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
949
950   event =
951       gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
952       GST_EVENT_STREAM_START, 0);
953   if (event) {
954     if (gst_event_parse_group_id (event, &demux->group_id))
955       demux->have_group_id = TRUE;
956     else
957       demux->have_group_id = FALSE;
958     gst_event_unref (event);
959   } else if (!demux->have_group_id) {
960     demux->have_group_id = TRUE;
961     demux->group_id = gst_util_group_id_next ();
962   }
963   event = gst_event_new_stream_start (stream_id);
964   if (demux->have_group_id)
965     gst_event_set_group_id (event, demux->group_id);
966
967   gst_pad_push_event (pad, event);
968   g_free (stream_id);
969   g_free (name);
970
971   GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
972
973   stream->discont = TRUE;
974
975   return TRUE;
976 }
977
978 static gboolean
979 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
980     GstAdaptiveDemuxStream * stream)
981 {
982   gboolean ret;
983   GstPad *pad = stream->pad;
984   GstCaps *caps;
985
986   if (stream->pending_caps) {
987     gst_pad_set_caps (pad, stream->pending_caps);
988     caps = stream->pending_caps;
989     stream->pending_caps = NULL;
990   } else {
991     caps = gst_pad_get_current_caps (pad);
992   }
993
994   GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
995       GST_DEBUG_PAD_NAME (pad), caps);
996   if (caps)
997     gst_caps_unref (caps);
998
999   gst_object_ref (pad);
1000
1001   /* Don't hold the manifest lock while exposing a pad */
1002   GST_MANIFEST_UNLOCK (demux);
1003   ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1004   GST_MANIFEST_LOCK (demux);
1005
1006   return ret;
1007 }
1008
1009 /* must be called with manifest_lock taken */
1010 static GstClockTime
1011 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1012     GstAdaptiveDemuxStream * stream)
1013 {
1014   GstAdaptiveDemuxClass *klass;
1015
1016   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1017
1018   if (klass->get_presentation_offset == NULL)
1019     return 0;
1020
1021   return klass->get_presentation_offset (demux, stream);
1022 }
1023
1024 /* must be called with manifest_lock taken */
1025 static GstClockTime
1026 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1027 {
1028   GstAdaptiveDemuxClass *klass;
1029
1030   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1031
1032   if (klass->get_period_start_time == NULL)
1033     return 0;
1034
1035   return klass->get_period_start_time (demux);
1036 }
1037
1038 /* must be called with manifest_lock taken */
1039 static gboolean
1040 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1041     gboolean first_and_live)
1042 {
1043   GList *iter;
1044   GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1045
1046   g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1047   if (demux->prepared_streams != NULL) {
1048     /* Old streams that were never exposed, due to a seek or so */
1049     GST_FIXME_OBJECT (demux,
1050         "Preparing new streams without cleaning up old ones!");
1051     return FALSE;
1052   }
1053
1054   demux->prepared_streams = demux->next_streams;
1055   demux->next_streams = NULL;
1056
1057   if (!gst_adaptive_demux_is_running (demux)) {
1058     GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1059     return TRUE;
1060   }
1061
1062   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1063     GstAdaptiveDemuxStream *stream = iter->data;
1064
1065     stream->do_block = TRUE;
1066
1067     if (!gst_adaptive_demux_prepare_stream (demux,
1068             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1069       /* TODO act on error */
1070       GST_FIXME_OBJECT (stream->pad,
1071           "Do something on failure to expose stream");
1072     }
1073
1074     if (first_and_live) {
1075       /* TODO we only need the first timestamp, maybe create a simple function to
1076        * get the current PTS of a fragment ? */
1077       GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1078       gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1079
1080       if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1081         min_pts = MIN (min_pts, stream->fragment.timestamp);
1082       } else {
1083         min_pts = stream->fragment.timestamp;
1084       }
1085     }
1086   }
1087
1088   period_start = gst_adaptive_demux_get_period_start_time (demux);
1089
1090   /* For live streams, the subclass is supposed to seek to the current
1091    * fragment and then tell us its timestamp in stream->fragment.timestamp.
1092    * We now also have to seek our demuxer segment to reflect this.
1093    *
1094    * FIXME: This needs some refactoring at some point.
1095    */
1096   if (first_and_live) {
1097     gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1098         GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1099         GST_SEEK_TYPE_NONE, -1, NULL);
1100   }
1101
1102   for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1103     GstAdaptiveDemuxStream *stream = iter->data;
1104     GstClockTime offset;
1105
1106     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1107     stream->segment = demux->segment;
1108
1109     /* The demuxer segment is just built from seek events, but for each stream
1110      * we have to adjust segments according to the current period and the
1111      * stream specific presentation time offset.
1112      *
1113      * For each period, buffer timestamps start again from 0. Additionally the
1114      * buffer timestamps are shifted by the stream specific presentation time
1115      * offset, so the first buffer timestamp of a period is 0 + presentation
1116      * time offset. If the stream contains timestamps itself, this is also
1117      * supposed to be the presentation time stored inside the stream.
1118      *
1119      * The stream time over periods is supposed to be continuous, that is the
1120      * buffer timestamp 0 + presentation time offset should map to the start
1121      * time of the current period.
1122      *
1123      *
1124      * The adjustment of the stream segments as such works the following.
1125      *
1126      * If the demuxer segment start is bigger than the period start, this
1127      * means that we have to drop some media at the beginning of the current
1128      * period, e.g. because a seek into the middle of the period has
1129      * happened. The amount of media to drop is the difference between the
1130      * period start and the demuxer segment start, and as each period starts
1131      * again from 0, this difference is going to be the actual stream's
1132      * segment start. As all timestamps of the stream are shifted by the
1133      * presentation time offset, we will also have to move the segment start
1134      * by that offset.
1135      *
1136      * Likewise, the demuxer segment stop value is adjusted in the same
1137      * fashion.
1138      *
1139      * Now the running time and stream time at the stream's segment start has
1140      * to be the one that is stored inside the demuxer's segment, which means
1141      * that segment.base and segment.time have to be copied over (done just
1142      * above)
1143      *
1144      *
1145      * If the demuxer segment start is smaller than the period start time,
1146      * this means that the whole period is inside the segment. As each period
1147      * starts timestamps from 0, and additionally timestamps are shifted by
1148      * the presentation time offset, the stream's first timestamp (and as such
1149      * the stream's segment start) has to be the presentation time offset.
1150      * The stream time at the segment start is supposed to be the stream time
1151      * of the period start according to the demuxer segment, so the stream
1152      * segment's time would be set to that. The same goes for the stream
1153      * segment's base, which is supposed to be the running time of the period
1154      * start according to the demuxer's segment.
1155      *
1156      * The same logic applies for negative rates with the segment stop and
1157      * the period stop time (which gets clamped).
1158      *
1159      *
1160      * For the first case where not the complete period is inside the segment,
1161      * the segment time and base as calculated by the second case would be
1162      * equivalent.
1163      */
1164     GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1165         &demux->segment);
1166     GST_DEBUG_OBJECT (demux,
1167         "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1168         GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1169     /* note for readers:
1170      * Since stream->segment is initially a copy of demux->segment,
1171      * only the values that need updating are modified below. */
1172     if (first_and_live) {
1173       /* If first and live, demuxer did seek to the current position already */
1174       stream->segment.start = demux->segment.start - period_start + offset;
1175       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1176         stream->segment.stop = demux->segment.stop - period_start + offset;
1177       /* FIXME : Do we need to handle negative rates for this ? */
1178       stream->segment.position = stream->segment.start;
1179     } else if (demux->segment.start > period_start) {
1180       /* seek within a period */
1181       stream->segment.start = demux->segment.start - period_start + offset;
1182       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1183         stream->segment.stop = demux->segment.stop - period_start + offset;
1184       if (stream->segment.rate >= 0)
1185         stream->segment.position = offset;
1186       else
1187         stream->segment.position = stream->segment.stop;
1188     } else {
1189       stream->segment.start = offset;
1190       if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1191         stream->segment.stop = demux->segment.stop - period_start + offset;
1192       if (stream->segment.rate >= 0)
1193         stream->segment.position = offset;
1194       else
1195         stream->segment.position = stream->segment.stop;
1196       stream->segment.time =
1197           gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1198           period_start);
1199       stream->segment.base =
1200           gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1201           period_start);
1202     }
1203
1204     stream->pending_segment = gst_event_new_segment (&stream->segment);
1205     gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1206     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1207
1208     GST_DEBUG_OBJECT (demux,
1209         "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1210         &stream->segment, stream);
1211   }
1212
1213   return TRUE;
1214 }
1215
1216 static gboolean
1217 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1218 {
1219   GList *iter;
1220   GList *old_streams;
1221
1222   g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1223
1224   old_streams = demux->streams;
1225   demux->streams = demux->prepared_streams;
1226   demux->prepared_streams = NULL;
1227
1228   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1229     GstAdaptiveDemuxStream *stream = iter->data;
1230
1231     if (!gst_adaptive_demux_expose_stream (demux,
1232             GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1233       /* TODO act on error */
1234     }
1235   }
1236   demux->priv->preroll_pending = 0;
1237
1238   GST_MANIFEST_UNLOCK (demux);
1239   gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1240   GST_MANIFEST_LOCK (demux);
1241
1242   if (old_streams) {
1243     GstEvent *eos = gst_event_new_eos ();
1244
1245     /* before we put streams in the demux->priv->old_streams list,
1246      * we ask the download task to stop. In this way, it will no longer be
1247      * allowed to change the demux object.
1248      */
1249     for (iter = old_streams; iter; iter = g_list_next (iter)) {
1250       GstAdaptiveDemuxStream *stream = iter->data;
1251       GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1252
1253       GST_MANIFEST_UNLOCK (demux);
1254
1255       GST_DEBUG_OBJECT (pad, "Pushing EOS");
1256       gst_pad_push_event (pad, gst_event_ref (eos));
1257       gst_pad_set_active (pad, FALSE);
1258
1259       GST_LOG_OBJECT (pad, "Removing stream");
1260       gst_element_remove_pad (GST_ELEMENT (demux), pad);
1261       GST_MANIFEST_LOCK (demux);
1262
1263       gst_object_unref (GST_OBJECT (pad));
1264
1265       /* ask the download task to stop.
1266        * We will not join it now, because our thread can be one of these tasks.
1267        * We will do the joining later, from another stream download task or
1268        * from gst_adaptive_demux_stop_tasks.
1269        * We also cannot change the state of the stream->src element, because
1270        * that will wait on the streaming thread (which could be this thread)
1271        * to stop first.
1272        * Because we sent an EOS to the downstream element, the stream->src
1273        * element should detect this in its streaming task and stop.
1274        * Even if it doesn't do that, we will change its state later in
1275        * gst_adaptive_demux_stop_tasks.
1276        */
1277       GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1278       gst_task_stop (stream->download_task);
1279       g_mutex_lock (&stream->fragment_download_lock);
1280       stream->cancelled = TRUE;
1281       stream->replaced = TRUE;
1282       g_cond_signal (&stream->fragment_download_cond);
1283       g_mutex_unlock (&stream->fragment_download_lock);
1284     }
1285     gst_event_unref (eos);
1286
1287     /* The list should be freed from another thread as we can't properly
1288      * cleanup a GstTask from itself */
1289     demux->priv->old_streams =
1290         g_list_concat (demux->priv->old_streams, old_streams);
1291   }
1292
1293   /* Unblock after removing oldstreams */
1294   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1295     GstAdaptiveDemuxStream *stream = iter->data;
1296     stream->do_block = FALSE;
1297   }
1298
1299   GST_DEBUG_OBJECT (demux, "All streams are exposed");
1300
1301   return TRUE;
1302 }
1303
1304 /* must be called with manifest_lock taken */
1305 GstAdaptiveDemuxStream *
1306 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1307 {
1308   GstAdaptiveDemuxStream *stream;
1309
1310   stream = g_malloc0 (demux->stream_struct_size);
1311
1312   /* Downloading task */
1313   g_rec_mutex_init (&stream->download_lock);
1314   stream->download_task =
1315       gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1316       stream, NULL);
1317   gst_task_set_lock (stream->download_task, &stream->download_lock);
1318
1319   stream->pad = pad;
1320   stream->demux = demux;
1321   stream->fragment_bitrates =
1322       g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1323   gst_pad_set_element_private (pad, stream);
1324   stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1325
1326   g_mutex_lock (&demux->priv->preroll_lock);
1327   stream->do_block = TRUE;
1328   demux->priv->preroll_pending++;
1329   g_mutex_unlock (&demux->priv->preroll_lock);
1330
1331   gst_pad_set_query_function (pad,
1332       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1333   gst_pad_set_event_function (pad,
1334       GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1335
1336   gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1337   g_cond_init (&stream->fragment_download_cond);
1338   g_mutex_init (&stream->fragment_download_lock);
1339
1340   demux->next_streams = g_list_append (demux->next_streams, stream);
1341
1342   return stream;
1343 }
1344
1345 GstAdaptiveDemuxStream *
1346 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1347 {
1348   GList *iter;
1349
1350   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1351     GstAdaptiveDemuxStream *stream = iter->data;
1352     if (stream->pad == pad) {
1353       return stream;
1354     }
1355   }
1356
1357   return NULL;
1358 }
1359
1360 /* must be called with manifest_lock taken.
1361  * It will temporarily drop the manifest_lock in order to join the task.
1362  * It will join only the old_streams (the demux->streams are joined by
1363  * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1364  * called)
1365  */
1366 static void
1367 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1368 {
1369   GstAdaptiveDemux *demux = stream->demux;
1370   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1371
1372   if (klass->stream_free)
1373     klass->stream_free (stream);
1374
1375   g_clear_error (&stream->last_error);
1376   if (stream->download_task) {
1377     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1378       GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1379           GST_DEBUG_PAD_NAME (stream->pad));
1380
1381       gst_task_stop (stream->download_task);
1382
1383       g_mutex_lock (&stream->fragment_download_lock);
1384       stream->cancelled = TRUE;
1385       g_cond_signal (&stream->fragment_download_cond);
1386       g_mutex_unlock (&stream->fragment_download_lock);
1387     }
1388     GST_LOG_OBJECT (demux, "Waiting for task to finish");
1389
1390     /* temporarily drop the manifest lock to join the task */
1391     GST_MANIFEST_UNLOCK (demux);
1392
1393     gst_task_join (stream->download_task);
1394
1395     GST_MANIFEST_LOCK (demux);
1396
1397     GST_LOG_OBJECT (demux, "Finished");
1398     gst_object_unref (stream->download_task);
1399     g_rec_mutex_clear (&stream->download_lock);
1400     stream->download_task = NULL;
1401   }
1402
1403   gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1404
1405   if (stream->pending_segment) {
1406     gst_event_unref (stream->pending_segment);
1407     stream->pending_segment = NULL;
1408   }
1409
1410   if (stream->pending_events) {
1411     g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1412     stream->pending_events = NULL;
1413   }
1414
1415   if (stream->internal_pad) {
1416     gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1417   }
1418
1419   if (stream->src_srcpad) {
1420     gst_object_unref (stream->src_srcpad);
1421     stream->src_srcpad = NULL;
1422   }
1423
1424   if (stream->src) {
1425     GstElement *src = stream->src;
1426
1427     stream->src = NULL;
1428
1429     GST_MANIFEST_UNLOCK (demux);
1430     gst_element_set_locked_state (src, TRUE);
1431     gst_element_set_state (src, GST_STATE_NULL);
1432     gst_bin_remove (GST_BIN_CAST (demux), src);
1433     GST_MANIFEST_LOCK (demux);
1434   }
1435
1436   g_cond_clear (&stream->fragment_download_cond);
1437   g_mutex_clear (&stream->fragment_download_lock);
1438   g_free (stream->fragment_bitrates);
1439
1440   if (stream->pad) {
1441     gst_object_unref (stream->pad);
1442     stream->pad = NULL;
1443   }
1444   if (stream->pending_caps)
1445     gst_caps_unref (stream->pending_caps);
1446
1447   g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1448
1449   g_free (stream);
1450 }
1451
1452 /* must be called with manifest_lock taken */
1453 static gboolean
1454 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1455     gint64 * range_start, gint64 * range_stop)
1456 {
1457   GstAdaptiveDemuxClass *klass;
1458
1459   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1460
1461   g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1462
1463   return klass->get_live_seek_range (demux, range_start, range_stop);
1464 }
1465
1466 /* must be called with manifest_lock taken */
1467 static gboolean
1468 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1469     GstAdaptiveDemuxStream * stream)
1470 {
1471   gint64 range_start, range_stop;
1472   if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1473     GST_LOG_OBJECT (stream->pad,
1474         "stream position %" GST_TIME_FORMAT "  live seek range %"
1475         GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1476         GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1477         GST_STIME_ARGS (range_stop));
1478     return (stream->segment.position >= range_start
1479         && stream->segment.position <= range_stop);
1480   }
1481
1482   return FALSE;
1483 }
1484
1485 /* must be called with manifest_lock taken */
1486 static gboolean
1487 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1488 {
1489   GstAdaptiveDemuxClass *klass;
1490
1491   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1492   if (gst_adaptive_demux_is_live (demux)) {
1493     return klass->get_live_seek_range != NULL;
1494   }
1495
1496   return klass->seek != NULL;
1497 }
1498
1499 static void
1500 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1501     GList * streams, gint64 period_start, GstSeekType start_type,
1502     GstSeekType stop_type)
1503 {
1504   GList *iter;
1505   for (iter = streams; iter; iter = g_list_next (iter)) {
1506     GstAdaptiveDemuxStream *stream = iter->data;
1507     GstEvent *seg_evt;
1508     GstClockTime offset;
1509
1510     /* See comments in gst_adaptive_demux_get_period_start_time() for
1511      * an explanation of the segment modifications */
1512     stream->segment = demux->segment;
1513     offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1514     stream->segment.start += offset - period_start;
1515     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1516       stream->segment.stop += offset - period_start;
1517     if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1518       stream->segment.position = stream->segment.start;
1519     else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1520       stream->segment.position = stream->segment.stop;
1521     seg_evt = gst_event_new_segment (&stream->segment);
1522     gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1523     gst_event_replace (&stream->pending_segment, seg_evt);
1524     GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1525         stream->pending_segment);
1526     gst_event_unref (seg_evt);
1527     /* Make sure the first buffer after a seek has the discont flag */
1528     stream->discont = TRUE;
1529     stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1530   }
1531 }
1532
1533 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE |         \
1534                               GST_SEEK_FLAG_SNAP_AFTER |          \
1535                               GST_SEEK_FLAG_SNAP_NEAREST |        \
1536                               GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1537                               GST_SEEK_FLAG_KEY_UNIT))
1538 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1539                               GST_SEEK_FLAG_SNAP_AFTER | \
1540                               GST_SEEK_FLAG_SNAP_NEAREST))
1541
1542 static gboolean
1543 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1544     GstEvent * event)
1545 {
1546   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1547   gdouble rate;
1548   GstFormat format;
1549   GstSeekFlags flags;
1550   GstSeekType start_type, stop_type;
1551   gint64 start, stop;
1552   guint32 seqnum;
1553   gboolean update;
1554   gboolean ret;
1555   GstSegment oldsegment;
1556   GstAdaptiveDemuxStream *stream = NULL;
1557
1558   GST_INFO_OBJECT (demux, "Received seek event");
1559
1560   GST_API_LOCK (demux);
1561   GST_MANIFEST_LOCK (demux);
1562
1563   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1564       &stop_type, &stop);
1565
1566   if (format != GST_FORMAT_TIME) {
1567     GST_MANIFEST_UNLOCK (demux);
1568     GST_API_UNLOCK (demux);
1569     GST_WARNING_OBJECT (demux,
1570         "Adaptive demuxers only support TIME-based seeking");
1571     gst_event_unref (event);
1572     return FALSE;
1573   }
1574
1575   if (flags & GST_SEEK_FLAG_SEGMENT) {
1576     GST_FIXME_OBJECT (demux, "Handle segment seeks");
1577     GST_MANIFEST_UNLOCK (demux);
1578     GST_API_UNLOCK (demux);
1579     gst_event_unref (event);
1580     return FALSE;
1581   }
1582
1583   seqnum = gst_event_get_seqnum (event);
1584
1585   if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1586     /* For instant rate seeks, reply directly and update
1587      * our segment so the new rate is reflected in any future
1588      * fragments */
1589     GstEvent *ev;
1590
1591     /* instant rate change only supported if direction does not change. All
1592      * other requirements are already checked before creating the seek event
1593      * but let's double-check here to be sure */
1594     if ((demux->segment.rate > 0 && rate < 0) ||
1595         (demux->segment.rate < 0 && rate > 0) ||
1596         start_type != GST_SEEK_TYPE_NONE ||
1597         stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1598       GST_ERROR_OBJECT (demux,
1599           "Instant rate change seeks only supported in the "
1600           "same direction, without flushing and position change");
1601       GST_MANIFEST_UNLOCK (demux);
1602       GST_API_UNLOCK (demux);
1603       return FALSE;
1604     }
1605
1606     ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1607         (GstSegmentFlags) flags);
1608     gst_event_set_seqnum (ev, seqnum);
1609
1610     GST_MANIFEST_UNLOCK (demux);
1611
1612     ret = gst_adaptive_demux_push_src_event (demux, ev);
1613
1614     GST_API_UNLOCK (demux);
1615     gst_event_unref (event);
1616
1617     return ret;
1618   }
1619
1620   if (!gst_adaptive_demux_can_seek (demux)) {
1621     GST_MANIFEST_UNLOCK (demux);
1622     GST_API_UNLOCK (demux);
1623     gst_event_unref (event);
1624     return FALSE;
1625   }
1626
1627   if (gst_adaptive_demux_is_live (demux)) {
1628     gint64 range_start, range_stop;
1629     gboolean changed = FALSE;
1630     gboolean start_valid = TRUE, stop_valid = TRUE;
1631
1632     if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1633             &range_stop)) {
1634       GST_MANIFEST_UNLOCK (demux);
1635       GST_API_UNLOCK (demux);
1636       gst_event_unref (event);
1637       GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1638       return FALSE;
1639     }
1640
1641     GST_DEBUG_OBJECT (demux,
1642         "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1643         GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1644
1645     /* Handle relative positioning for live streams (relative to the range_stop) */
1646     if (start_type == GST_SEEK_TYPE_END) {
1647       start = range_stop + start;
1648       start_type = GST_SEEK_TYPE_SET;
1649       changed = TRUE;
1650     }
1651     if (stop_type == GST_SEEK_TYPE_END) {
1652       stop = range_stop + stop;
1653       stop_type = GST_SEEK_TYPE_SET;
1654       changed = TRUE;
1655     }
1656
1657     /* Adjust the requested start/stop position if it falls beyond the live
1658      * seek range.
1659      * The only case where we don't adjust is for the starting point of
1660      * an accurate seek (start if forward and stop if backwards)
1661      */
1662     if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1663         (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1664       GST_DEBUG_OBJECT (demux,
1665           "seek before live stream start, setting to range start: %"
1666           GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1667       start = range_start;
1668       changed = TRUE;
1669     }
1670     /* truncate stop position also if set */
1671     if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1672         (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1673       GST_DEBUG_OBJECT (demux,
1674           "seek ending after live start, adjusting to: %"
1675           GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1676       stop = range_stop;
1677       changed = TRUE;
1678     }
1679
1680     if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1681         (start < range_start || start > range_stop)) {
1682       GST_WARNING_OBJECT (demux,
1683           "Seek to invalid position start:%" GST_STIME_FORMAT
1684           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1685           ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1686           GST_STIME_ARGS (range_stop));
1687       start_valid = FALSE;
1688     }
1689     if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1690         (stop < range_start || stop > range_stop)) {
1691       GST_WARNING_OBJECT (demux,
1692           "Seek to invalid position stop:%" GST_STIME_FORMAT
1693           " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1694           ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1695           GST_STIME_ARGS (range_stop));
1696       stop_valid = FALSE;
1697     }
1698
1699     /* If the seek position is still outside of the seekable range, refuse the seek */
1700     if (!start_valid || !stop_valid) {
1701       GST_MANIFEST_UNLOCK (demux);
1702       GST_API_UNLOCK (demux);
1703       gst_event_unref (event);
1704       return FALSE;
1705     }
1706
1707     /* Re-create seek event with changed/updated values */
1708     if (changed) {
1709       gst_event_unref (event);
1710       event =
1711           gst_event_new_seek (rate, format, flags,
1712           start_type, start, stop_type, stop);
1713       gst_event_set_seqnum (event, seqnum);
1714     }
1715   }
1716
1717   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1718
1719   /* have a backup in case seek fails */
1720   gst_segment_copy_into (&demux->segment, &oldsegment);
1721
1722   if (flags & GST_SEEK_FLAG_FLUSH) {
1723     GstEvent *fevent;
1724
1725     GST_DEBUG_OBJECT (demux, "sending flush start");
1726     fevent = gst_event_new_flush_start ();
1727     gst_event_set_seqnum (fevent, seqnum);
1728     GST_MANIFEST_UNLOCK (demux);
1729     gst_adaptive_demux_push_src_event (demux, fevent);
1730     GST_MANIFEST_LOCK (demux);
1731
1732     gst_adaptive_demux_stop_tasks (demux, FALSE);
1733   } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1734       (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1735
1736     gst_adaptive_demux_stop_tasks (demux, FALSE);
1737   }
1738
1739   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1740
1741   /*
1742    * Handle snap seeks as follows:
1743    * 1) do the snap seeking on the stream that received
1744    *    the event
1745    * 2) use the final position on this stream to seek
1746    *    on the other streams to the same position
1747    *
1748    * We can't snap at all streams at the same time as
1749    * they might end in different positions, so just
1750    * use the one that received the event as the 'leading'
1751    * one to do the snap seek.
1752    */
1753   if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1754           gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1755     GstClockTime ts;
1756     GstSeekFlags stream_seek_flags = flags;
1757
1758     /* snap-seek on the stream that received the event and then
1759      * use the resulting position to seek on all streams */
1760
1761     if (rate >= 0) {
1762       if (start_type != GST_SEEK_TYPE_NONE)
1763         ts = start;
1764       else {
1765         ts = stream->segment.position;
1766         start_type = GST_SEEK_TYPE_SET;
1767       }
1768     } else {
1769       if (stop_type != GST_SEEK_TYPE_NONE)
1770         ts = stop;
1771       else {
1772         stop_type = GST_SEEK_TYPE_SET;
1773         ts = stream->segment.position;
1774       }
1775     }
1776
1777     if (stream) {
1778       demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1779     }
1780
1781     /* replace event with a new one without snapping to seek on all streams */
1782     gst_event_unref (event);
1783     if (rate >= 0) {
1784       start = ts;
1785     } else {
1786       stop = ts;
1787     }
1788     event =
1789         gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1790         start_type, start, stop_type, stop);
1791     GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1792   }
1793   stream = NULL;
1794
1795   ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1796       start, stop_type, stop, &update);
1797
1798   if (ret) {
1799     /* FIXME - this seems unatural, do_seek() is updating base when we
1800      * only want the start/stop position to change, maybe do_seek() needs
1801      * some fixing? */
1802     if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1803                 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1804                 && stop_type == GST_SEEK_TYPE_NONE))) {
1805       demux->segment.base = oldsegment.base;
1806     }
1807
1808     GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1809
1810     ret = demux_class->seek (demux, event);
1811   }
1812
1813   if (!ret) {
1814     /* Is there anything else we can do if it fails? */
1815     gst_segment_copy_into (&oldsegment, &demux->segment);
1816   } else {
1817     demux->priv->segment_seqnum = seqnum;
1818   }
1819   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1820
1821   if (flags & GST_SEEK_FLAG_FLUSH) {
1822     GstEvent *fevent;
1823
1824     GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1825     fevent = gst_event_new_flush_stop (TRUE);
1826     gst_event_set_seqnum (fevent, seqnum);
1827     gst_adaptive_demux_push_src_event (demux, fevent);
1828   }
1829
1830   if (demux->next_streams) {
1831     /* If the seek generated new streams, get them
1832      * to preroll */
1833     gst_adaptive_demux_prepare_streams (demux, FALSE);
1834     gst_adaptive_demux_start_tasks (demux, TRUE);
1835   } else {
1836     GstClockTime period_start =
1837         gst_adaptive_demux_get_period_start_time (demux);
1838
1839     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1840     gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1841         period_start, start_type, stop_type);
1842     gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1843         period_start, start_type, stop_type);
1844     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1845
1846     /* Restart the demux */
1847     gst_adaptive_demux_start_tasks (demux, FALSE);
1848   }
1849
1850   GST_MANIFEST_UNLOCK (demux);
1851   GST_API_UNLOCK (demux);
1852   gst_event_unref (event);
1853
1854   return ret;
1855 }
1856
1857 static gboolean
1858 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1859     GstEvent * event)
1860 {
1861   GstAdaptiveDemux *demux;
1862
1863   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1864
1865   /* FIXME handle events received on pads that are to be removed */
1866
1867   switch (event->type) {
1868     case GST_EVENT_SEEK:
1869     {
1870       guint32 seqnum = gst_event_get_seqnum (event);
1871       if (seqnum == demux->priv->segment_seqnum) {
1872         GST_LOG_OBJECT (pad,
1873             "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1874         gst_event_unref (event);
1875         return TRUE;
1876       }
1877       return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1878     }
1879     case GST_EVENT_RECONFIGURE:{
1880       GstAdaptiveDemuxStream *stream;
1881
1882       GST_MANIFEST_LOCK (demux);
1883       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1884
1885       if (stream) {
1886         if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1887             stream->last_ret == GST_FLOW_NOT_LINKED) {
1888           stream->last_ret = GST_FLOW_OK;
1889           stream->restart_download = TRUE;
1890           stream->need_header = TRUE;
1891           stream->discont = TRUE;
1892           GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1893           gst_task_start (stream->download_task);
1894         }
1895         gst_event_unref (event);
1896         GST_MANIFEST_UNLOCK (demux);
1897         return TRUE;
1898       }
1899       GST_MANIFEST_UNLOCK (demux);
1900     }
1901       break;
1902     case GST_EVENT_LATENCY:{
1903       /* Upstream and our internal source are irrelevant
1904        * for latency, and we should not fail here to
1905        * configure the latency */
1906       gst_event_unref (event);
1907       return TRUE;
1908     }
1909     case GST_EVENT_QOS:{
1910       GstAdaptiveDemuxStream *stream;
1911
1912       GST_MANIFEST_LOCK (demux);
1913       stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1914
1915       if (stream) {
1916         GstClockTimeDiff diff;
1917         GstClockTime timestamp;
1918
1919         gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
1920         /* Only take into account lateness if late */
1921         if (diff > 0)
1922           stream->qos_earliest_time = timestamp + 2 * diff;
1923         else
1924           stream->qos_earliest_time = timestamp;
1925         GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1926             GST_TIME_ARGS (stream->qos_earliest_time));
1927       }
1928       GST_MANIFEST_UNLOCK (demux);
1929       break;
1930     }
1931     default:
1932       break;
1933   }
1934
1935   return gst_pad_event_default (pad, parent, event);
1936 }
1937
1938 static gboolean
1939 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1940     GstQuery * query)
1941 {
1942   GstAdaptiveDemux *demux;
1943   GstAdaptiveDemuxClass *demux_class;
1944   gboolean ret = FALSE;
1945
1946   if (query == NULL)
1947     return FALSE;
1948
1949   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1950   demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1951
1952   switch (query->type) {
1953     case GST_QUERY_DURATION:{
1954       GstClockTime duration = -1;
1955       GstFormat fmt;
1956
1957       gst_query_parse_duration (query, &fmt, NULL);
1958
1959       if (gst_adaptive_demux_is_live (demux)) {
1960         /* We are able to answer this query: the duration is unknown */
1961         gst_query_set_duration (query, fmt, -1);
1962         ret = TRUE;
1963         break;
1964       }
1965
1966       if (fmt == GST_FORMAT_TIME
1967           && g_atomic_int_get (&demux->priv->have_manifest)) {
1968         duration = demux_class->get_duration (demux);
1969
1970         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1971           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1972           ret = TRUE;
1973         }
1974       }
1975
1976       GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1977           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1978       break;
1979     }
1980     case GST_QUERY_LATENCY:{
1981       gst_query_set_latency (query, FALSE, 0, -1);
1982       ret = TRUE;
1983       break;
1984     }
1985     case GST_QUERY_SEEKING:{
1986       GstFormat fmt;
1987       gint64 stop = -1;
1988       gint64 start = 0;
1989
1990       if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1991         GST_INFO_OBJECT (demux,
1992             "Don't have manifest yet, can't answer seeking query");
1993         return FALSE;           /* can't answer without manifest */
1994       }
1995
1996       GST_MANIFEST_LOCK (demux);
1997
1998       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1999       GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2000       if (fmt == GST_FORMAT_TIME) {
2001         GstClockTime duration;
2002         gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2003
2004         ret = TRUE;
2005         if (can_seek) {
2006           if (gst_adaptive_demux_is_live (demux)) {
2007             ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2008             if (!ret) {
2009               GST_MANIFEST_UNLOCK (demux);
2010               GST_INFO_OBJECT (demux, "can't answer seeking query");
2011               return FALSE;
2012             }
2013           } else {
2014             duration = demux_class->get_duration (demux);
2015             if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2016               stop = duration;
2017           }
2018         }
2019         gst_query_set_seeking (query, fmt, can_seek, start, stop);
2020         GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2021             GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2022             GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2023       }
2024       GST_MANIFEST_UNLOCK (demux);
2025       break;
2026     }
2027     case GST_QUERY_URI:
2028
2029       GST_MANIFEST_LOCK (demux);
2030
2031       /* TODO HLS can answer this differently it seems */
2032       if (demux->manifest_uri) {
2033         /* FIXME: (hls) Do we answer with the variant playlist, with the current
2034          * playlist or the the uri of the last downlowaded fragment? */
2035         gst_query_set_uri (query, demux->manifest_uri);
2036         ret = TRUE;
2037       }
2038
2039       GST_MANIFEST_UNLOCK (demux);
2040       break;
2041     default:
2042       /* Don't forward queries upstream because of the special nature of this
2043        *  "demuxer", which relies on the upstream element only to be fed
2044        *  the Manifest
2045        */
2046       break;
2047   }
2048
2049   return ret;
2050 }
2051
2052 /* must be called with manifest_lock taken */
2053 static void
2054 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2055     gboolean start_preroll_streams)
2056 {
2057   GList *iter;
2058
2059   if (!gst_adaptive_demux_is_running (demux)) {
2060     GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2061     return;
2062   }
2063
2064   GST_INFO_OBJECT (demux, "Starting streams' tasks");
2065
2066   iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2067
2068   for (; iter; iter = g_list_next (iter)) {
2069     GstAdaptiveDemuxStream *stream = iter->data;
2070
2071     if (!start_preroll_streams) {
2072       g_mutex_lock (&stream->fragment_download_lock);
2073       stream->cancelled = FALSE;
2074       stream->replaced = FALSE;
2075       g_mutex_unlock (&stream->fragment_download_lock);
2076     }
2077
2078     stream->last_ret = GST_FLOW_OK;
2079     gst_task_start (stream->download_task);
2080   }
2081 }
2082
2083 /* must be called with manifest_lock taken */
2084 static void
2085 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2086 {
2087   gst_uri_downloader_cancel (demux->downloader);
2088
2089   gst_task_stop (demux->priv->updates_task);
2090
2091   g_mutex_lock (&demux->priv->updates_timed_lock);
2092   GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2093   demux->priv->stop_updates_task = TRUE;
2094   g_cond_signal (&demux->priv->updates_timed_cond);
2095   g_mutex_unlock (&demux->priv->updates_timed_lock);
2096 }
2097
2098 /* must be called with manifest_lock taken */
2099 static void
2100 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2101 {
2102   GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2103
2104   if (gst_adaptive_demux_is_live (demux)) {
2105     gst_uri_downloader_reset (demux->downloader);
2106     g_mutex_lock (&demux->priv->updates_timed_lock);
2107     demux->priv->stop_updates_task = FALSE;
2108     g_mutex_unlock (&demux->priv->updates_timed_lock);
2109     /* Task to periodically update the manifest */
2110     if (demux_class->requires_periodical_playlist_update (demux)) {
2111       GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2112       gst_task_start (demux->priv->updates_task);
2113     }
2114   }
2115 }
2116
2117 /* must be called with manifest_lock taken
2118  * This function will temporarily release manifest_lock in order to join the
2119  * download threads.
2120  * The api_lock will still protect it against other threads trying to modify
2121  * the demux element.
2122  */
2123 static void
2124 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2125 {
2126   int i;
2127   GList *iter;
2128   GList *list_to_process;
2129
2130   GST_LOG_OBJECT (demux, "Stopping tasks");
2131
2132   if (stop_updates)
2133     gst_adaptive_demux_stop_manifest_update_task (demux);
2134
2135   list_to_process = demux->streams;
2136   for (i = 0; i < 2; ++i) {
2137     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2138       GstAdaptiveDemuxStream *stream = iter->data;
2139
2140       g_mutex_lock (&stream->fragment_download_lock);
2141       stream->cancelled = TRUE;
2142       gst_task_stop (stream->download_task);
2143       g_cond_signal (&stream->fragment_download_cond);
2144       g_mutex_unlock (&stream->fragment_download_lock);
2145     }
2146     list_to_process = demux->prepared_streams;
2147   }
2148
2149   GST_MANIFEST_UNLOCK (demux);
2150   g_mutex_lock (&demux->priv->preroll_lock);
2151   g_cond_broadcast (&demux->priv->preroll_cond);
2152   g_mutex_unlock (&demux->priv->preroll_lock);
2153   GST_MANIFEST_LOCK (demux);
2154
2155   g_mutex_lock (&demux->priv->manifest_update_lock);
2156   g_cond_broadcast (&demux->priv->manifest_cond);
2157   g_mutex_unlock (&demux->priv->manifest_update_lock);
2158
2159   /* need to release manifest_lock before stopping the src element.
2160    * The streams were asked to cancel, so they will not make any writes to demux
2161    * object. Even if we temporarily release manifest_lock, the demux->streams
2162    * cannot change and iter cannot be invalidated.
2163    */
2164   list_to_process = demux->streams;
2165   for (i = 0; i < 2; ++i) {
2166     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2167       GstAdaptiveDemuxStream *stream = iter->data;
2168       GstElement *src = stream->src;
2169
2170       GST_MANIFEST_UNLOCK (demux);
2171
2172       if (src) {
2173         gst_element_set_locked_state (src, TRUE);
2174         gst_element_set_state (src, GST_STATE_READY);
2175       }
2176
2177       /* stream->download_task value never changes, so it is safe to read it
2178        * outside critical section
2179        */
2180       gst_task_join (stream->download_task);
2181
2182       GST_MANIFEST_LOCK (demux);
2183     }
2184     list_to_process = demux->prepared_streams;
2185   }
2186
2187   GST_MANIFEST_UNLOCK (demux);
2188   if (stop_updates)
2189     gst_task_join (demux->priv->updates_task);
2190
2191   GST_MANIFEST_LOCK (demux);
2192
2193   list_to_process = demux->streams;
2194   for (i = 0; i < 2; ++i) {
2195     for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2196       GstAdaptiveDemuxStream *stream = iter->data;
2197
2198       stream->download_error_count = 0;
2199       stream->need_header = TRUE;
2200       stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2201     }
2202     list_to_process = demux->prepared_streams;
2203   }
2204 }
2205
2206 /* must be called with manifest_lock taken */
2207 static gboolean
2208 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2209 {
2210   GList *iter;
2211   gboolean ret = TRUE;
2212
2213   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2214     GstAdaptiveDemuxStream *stream = iter->data;
2215     gst_event_ref (event);
2216     ret = ret & gst_pad_push_event (stream->pad, event);
2217   }
2218   gst_event_unref (event);
2219   return ret;
2220 }
2221
2222 /* must be called with manifest_lock taken */
2223 void
2224 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2225     GstCaps * caps)
2226 {
2227   GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2228       caps);
2229   gst_caps_replace (&stream->pending_caps, caps);
2230   gst_caps_unref (caps);
2231 }
2232
2233 /* must be called with manifest_lock taken */
2234 void
2235 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2236     GstTagList * tags)
2237 {
2238   GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2239       tags);
2240   if (stream->pending_tags) {
2241     gst_tag_list_unref (stream->pending_tags);
2242   }
2243   stream->pending_tags = tags;
2244 }
2245
2246 /* must be called with manifest_lock taken */
2247 void
2248 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2249     GstEvent * event)
2250 {
2251   stream->pending_events = g_list_append (stream->pending_events, event);
2252 }
2253
2254 /* must be called with manifest_lock taken */
2255 static guint64
2256 _update_average_bitrate (GstAdaptiveDemux * demux,
2257     GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2258 {
2259   gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2260
2261   stream->moving_bitrate -= stream->fragment_bitrates[index];
2262   stream->fragment_bitrates[index] = new_bitrate;
2263   stream->moving_bitrate += new_bitrate;
2264
2265   stream->moving_index += 1;
2266
2267   if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2268     return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2269   return stream->moving_bitrate / stream->moving_index;
2270 }
2271
2272 /* must be called with manifest_lock taken */
2273 static guint64
2274 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2275     GstAdaptiveDemuxStream * stream)
2276 {
2277   guint64 average_bitrate;
2278   guint64 fragment_bitrate;
2279
2280   if (demux->connection_speed) {
2281     GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2282         demux->connection_speed / 1000);
2283     stream->current_download_rate = demux->connection_speed;
2284     return demux->connection_speed;
2285   }
2286
2287   fragment_bitrate = stream->last_bitrate;
2288   GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2289       fragment_bitrate);
2290
2291   average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2292
2293   GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2294       fragment_bitrate);
2295   GST_INFO_OBJECT (stream,
2296       "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2297       NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2298
2299   /* Conservative approach, make sure we don't upgrade too fast */
2300   stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2301
2302   stream->current_download_rate *= demux->bitrate_limit;
2303   GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2304       G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2305
2306 #if 0
2307   /* Debugging code, modulate the bitrate every few fragments */
2308   {
2309     static guint ctr = 0;
2310     if (ctr % 3 == 0) {
2311       GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2312       stream->current_download_rate /= 2;
2313     }
2314     ctr++;
2315   }
2316 #endif
2317
2318   return stream->current_download_rate;
2319 }
2320
2321 /* must be called with manifest_lock taken */
2322 static GstFlowReturn
2323 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2324 {
2325   gboolean all_notlinked = TRUE;
2326   gboolean all_eos = TRUE;
2327   GList *iter;
2328
2329   for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2330     GstAdaptiveDemuxStream *stream = iter->data;
2331
2332     if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2333       all_notlinked = FALSE;
2334       if (stream->last_ret != GST_FLOW_EOS)
2335         all_eos = FALSE;
2336     }
2337
2338     if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2339         || stream->last_ret == GST_FLOW_FLUSHING) {
2340       return stream->last_ret;
2341     }
2342   }
2343   if (all_notlinked)
2344     return GST_FLOW_NOT_LINKED;
2345   else if (all_eos)
2346     return GST_FLOW_EOS;
2347   return GST_FLOW_OK;
2348 }
2349
2350 /* Called with preroll_lock */
2351 static void
2352 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2353     GstAdaptiveDemuxStream * stream)
2354 {
2355   demux->priv->preroll_pending--;
2356   if (demux->priv->preroll_pending == 0) {
2357     /* That was the last one, time to release all streams
2358      * and expose them */
2359     GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2360     gst_adaptive_demux_expose_streams (demux);
2361     g_cond_broadcast (&demux->priv->preroll_cond);
2362   }
2363 }
2364
2365 /* must be called with manifest_lock taken.
2366  * Temporarily releases manifest_lock
2367  */
2368 GstFlowReturn
2369 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2370     GstBuffer * buffer)
2371 {
2372   GstAdaptiveDemux *demux = stream->demux;
2373   GstFlowReturn ret = GST_FLOW_OK;
2374   gboolean discont = FALSE;
2375   /* Pending events */
2376   GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2377   GList *pending_events = NULL;
2378
2379   /* FIXME : 
2380    * This is duplicating *exactly* the same thing as what is done at the beginning
2381    * of _src_chain if starting_fragment is TRUE */
2382   if (stream->first_fragment_buffer) {
2383     GstClockTime offset =
2384         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2385     GstClockTime period_start =
2386         gst_adaptive_demux_get_period_start_time (demux);
2387
2388     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2389     if (demux->segment.rate < 0)
2390       /* Set DISCONT flag for every first buffer in reverse playback mode
2391        * as each fragment for its own has to be reversed */
2392       discont = TRUE;
2393
2394     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2395     if (GST_BUFFER_PTS_IS_VALID (buffer))
2396       GST_BUFFER_PTS (buffer) += offset;
2397
2398     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2399       stream->segment.position = GST_BUFFER_PTS (buffer);
2400
2401       /* Convert from position inside the stream's segment to the demuxer's
2402        * segment, they are not necessarily the same */
2403       if (stream->segment.position - offset + period_start >
2404           demux->segment.position)
2405         demux->segment.position =
2406             stream->segment.position - offset + period_start;
2407     }
2408     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2409
2410     GST_LOG_OBJECT (stream->pad,
2411         "Going to push buffer with PTS %" GST_TIME_FORMAT,
2412         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2413   } else {
2414     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2415   }
2416
2417   if (stream->discont) {
2418     discont = TRUE;
2419     stream->discont = FALSE;
2420   }
2421
2422   if (discont) {
2423     GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2424     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2425   } else {
2426     GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2427   }
2428
2429   stream->first_fragment_buffer = FALSE;
2430
2431   GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2432   GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2433   if (G_UNLIKELY (stream->pending_caps)) {
2434     pending_caps = gst_event_new_caps (stream->pending_caps);
2435     gst_caps_unref (stream->pending_caps);
2436     stream->pending_caps = NULL;
2437   }
2438
2439   if (stream->do_block) {
2440
2441     g_mutex_lock (&demux->priv->preroll_lock);
2442
2443     /* If we are preroll state, set caps in here */
2444     if (pending_caps) {
2445       gst_pad_push_event (stream->pad, pending_caps);
2446       pending_caps = NULL;
2447     }
2448
2449     gst_adaptive_demux_handle_preroll (demux, stream);
2450     GST_MANIFEST_UNLOCK (demux);
2451
2452     while (stream->do_block && !stream->cancelled) {
2453       GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2454       g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2455     }
2456     if (stream->cancelled) {
2457       GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2458       gst_buffer_unref (buffer);
2459       g_mutex_unlock (&demux->priv->preroll_lock);
2460       return GST_FLOW_FLUSHING;
2461     }
2462
2463     g_mutex_unlock (&demux->priv->preroll_lock);
2464     GST_MANIFEST_LOCK (demux);
2465   }
2466
2467   if (G_UNLIKELY (stream->pending_segment)) {
2468     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2469     pending_segment = stream->pending_segment;
2470     stream->pending_segment = NULL;
2471     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2472   }
2473   if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2474     GstTagList *tags = stream->pending_tags;
2475
2476     stream->pending_tags = NULL;
2477     stream->bitrate_changed = 0;
2478
2479     if (stream->fragment.bitrate != 0) {
2480       if (tags)
2481         tags = gst_tag_list_make_writable (tags);
2482       else
2483         tags = gst_tag_list_new_empty ();
2484
2485       gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2486           GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2487     }
2488     if (tags)
2489       pending_tags = gst_event_new_tag (tags);
2490   }
2491   if (G_UNLIKELY (stream->pending_events)) {
2492     pending_events = stream->pending_events;
2493     stream->pending_events = NULL;
2494   }
2495
2496   GST_MANIFEST_UNLOCK (demux);
2497
2498   /* Do not push events or buffers holding the manifest lock */
2499   if (G_UNLIKELY (pending_caps)) {
2500     GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2501         pending_caps);
2502     gst_pad_push_event (stream->pad, pending_caps);
2503   }
2504   if (G_UNLIKELY (pending_segment)) {
2505     GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2506         pending_segment);
2507     gst_pad_push_event (stream->pad, pending_segment);
2508   }
2509   if (G_UNLIKELY (pending_tags)) {
2510     GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2511         pending_tags);
2512     gst_pad_push_event (stream->pad, pending_tags);
2513   }
2514   while (pending_events != NULL) {
2515     GstEvent *event = pending_events->data;
2516
2517     if (!gst_pad_push_event (stream->pad, event))
2518       GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2519
2520     pending_events = g_list_delete_link (pending_events, pending_events);
2521   }
2522
2523   /* Wait for preroll if blocking */
2524   GST_DEBUG_OBJECT (stream->pad,
2525       "About to push buffer of size %" G_GSIZE_FORMAT,
2526       gst_buffer_get_size (buffer));
2527
2528   ret = gst_pad_push (stream->pad, buffer);
2529
2530   GST_MANIFEST_LOCK (demux);
2531
2532   g_mutex_lock (&stream->fragment_download_lock);
2533   if (G_UNLIKELY (stream->cancelled)) {
2534     GST_LOG_OBJECT (stream, "Stream was cancelled");
2535     ret = stream->last_ret = GST_FLOW_FLUSHING;
2536     g_mutex_unlock (&stream->fragment_download_lock);
2537     return ret;
2538   }
2539   g_mutex_unlock (&stream->fragment_download_lock);
2540
2541   GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2542       gst_flow_get_name (ret));
2543
2544   return ret;
2545 }
2546
2547 /* must be called with manifest_lock taken */
2548 static GstFlowReturn
2549 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2550     GstAdaptiveDemuxStream * stream)
2551 {
2552   /* No need to advance, this isn't a real fragment */
2553   if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2554     return GST_FLOW_OK;
2555
2556   return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2557       stream->fragment.duration);
2558 }
2559
2560 /* must be called with manifest_lock taken.
2561  * Can temporarily release manifest_lock
2562  */
2563 static GstFlowReturn
2564 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2565     GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2566 {
2567   return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2568 }
2569
2570 static gboolean
2571 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2572     * demux)
2573 {
2574   return TRUE;
2575 }
2576
2577 static GstFlowReturn
2578 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2579 {
2580   GstAdaptiveDemuxStream *stream;
2581   GstAdaptiveDemux *demux;
2582   GstAdaptiveDemuxClass *klass;
2583   GstFlowReturn ret = GST_FLOW_OK;
2584
2585   demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2586   stream = gst_pad_get_element_private (pad);
2587   klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2588
2589   GST_MANIFEST_LOCK (demux);
2590
2591   /* do not make any changes if the stream is cancelled */
2592   g_mutex_lock (&stream->fragment_download_lock);
2593   if (G_UNLIKELY (stream->cancelled)) {
2594     g_mutex_unlock (&stream->fragment_download_lock);
2595     gst_buffer_unref (buffer);
2596     ret = stream->last_ret = GST_FLOW_FLUSHING;
2597     GST_MANIFEST_UNLOCK (demux);
2598     return ret;
2599   }
2600   g_mutex_unlock (&stream->fragment_download_lock);
2601
2602   /* starting_fragment is set to TRUE at the beginning of
2603    * _stream_download_fragment()
2604    * /!\ If there is a header/index being downloaded, then this will
2605    * be TRUE for the first one ... but FALSE for the remaining ones,
2606    * including the *actual* fragment ! */
2607   if (stream->starting_fragment) {
2608     GstClockTime offset =
2609         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2610     GstClockTime period_start =
2611         gst_adaptive_demux_get_period_start_time (demux);
2612
2613     stream->starting_fragment = FALSE;
2614     if (klass->start_fragment) {
2615       if (!klass->start_fragment (demux, stream)) {
2616         ret = GST_FLOW_ERROR;
2617         goto error;
2618       }
2619     }
2620
2621     GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2622     if (GST_BUFFER_PTS_IS_VALID (buffer))
2623       GST_BUFFER_PTS (buffer) += offset;
2624
2625     GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2626         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2627
2628     if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2629       GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2630       stream->segment.position = GST_BUFFER_PTS (buffer);
2631
2632       /* Convert from position inside the stream's segment to the demuxer's
2633        * segment, they are not necessarily the same */
2634       if (stream->segment.position - offset + period_start >
2635           demux->segment.position)
2636         demux->segment.position =
2637             stream->segment.position - offset + period_start;
2638       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2639     }
2640
2641   } else {
2642     GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2643   }
2644
2645   /* downloading_first_buffer is set to TRUE in download_uri() just before
2646    * activating the source (i.e. requesting a given URI)
2647    *
2648    * The difference with starting_fragment is that this will be called
2649    * for *all* first buffers (of index, and header, and fragment)
2650    *
2651    * ... to then only do something useful (in this block) for actual
2652    * fragments... */
2653   if (stream->downloading_first_buffer) {
2654     gint64 chunk_size = 0;
2655
2656     stream->downloading_first_buffer = FALSE;
2657
2658     if (!stream->downloading_header && !stream->downloading_index) {
2659       /* If this is the first buffer of a fragment (not the headers or index)
2660        * and we don't have a birate from the sub-class, then see if we
2661        * can work it out from the fragment size and duration */
2662       if (stream->fragment.bitrate == 0 &&
2663           stream->fragment.duration != 0 &&
2664           gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2665               &chunk_size)) {
2666         guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2667                 8 * GST_SECOND, stream->fragment.duration));
2668         GST_LOG_OBJECT (demux,
2669             "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2670             " = bitrate %u", chunk_size,
2671             GST_TIME_ARGS (stream->fragment.duration), bitrate);
2672         stream->fragment.bitrate = bitrate;
2673       }
2674       if (stream->fragment.bitrate) {
2675         stream->bitrate_changed = TRUE;
2676       } else {
2677         GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2678       }
2679     }
2680   }
2681
2682   stream->download_total_bytes += gst_buffer_get_size (buffer);
2683
2684   GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2685       gst_buffer_get_size (buffer));
2686
2687   ret = klass->data_received (demux, stream, buffer);
2688
2689   if (ret == GST_FLOW_FLUSHING) {
2690     /* do not make any changes if the stream is cancelled */
2691     g_mutex_lock (&stream->fragment_download_lock);
2692     if (G_UNLIKELY (stream->cancelled)) {
2693       g_mutex_unlock (&stream->fragment_download_lock);
2694       GST_MANIFEST_UNLOCK (demux);
2695       return ret;
2696     }
2697     g_mutex_unlock (&stream->fragment_download_lock);
2698   }
2699
2700   if (ret != GST_FLOW_OK) {
2701     gboolean finished = FALSE;
2702
2703     if (ret < GST_FLOW_EOS) {
2704       GST_ELEMENT_FLOW_ERROR (demux, ret);
2705
2706       /* TODO push this on all pads */
2707       gst_pad_push_event (stream->pad, gst_event_new_eos ());
2708     } else {
2709       GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2710           gst_flow_get_name (ret));
2711     }
2712
2713     if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2714       ret = GST_FLOW_EOS;       /* return EOS to make the source stop */
2715     } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2716       /* Behaves like an EOS event from upstream */
2717       stream->fragment.finished = TRUE;
2718       ret = klass->finish_fragment (demux, stream);
2719       if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2720         ret = GST_FLOW_EOS;     /* return EOS to make the source stop */
2721       } else if (ret != GST_FLOW_OK) {
2722         goto error;
2723       }
2724       finished = TRUE;
2725     }
2726
2727     gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2728     if (finished)
2729       ret = GST_FLOW_EOS;
2730   }
2731
2732 error:
2733
2734   GST_MANIFEST_UNLOCK (demux);
2735
2736   return ret;
2737 }
2738
2739 /* must be called with manifest_lock taken */
2740 static void
2741 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2742     stream, GstFlowReturn ret, GError * err)
2743 {
2744   GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2745       gst_flow_get_name (ret), err);
2746
2747   /* if we have an error, only replace last_ret if it was OK before to avoid
2748    * overwriting the first error we got */
2749   if (stream->last_ret == GST_FLOW_OK) {
2750     stream->last_ret = ret;
2751     if (err) {
2752       g_clear_error (&stream->last_error);
2753       stream->last_error = g_error_copy (err);
2754     }
2755   }
2756   g_mutex_lock (&stream->fragment_download_lock);
2757   stream->download_finished = TRUE;
2758   g_cond_signal (&stream->fragment_download_cond);
2759   g_mutex_unlock (&stream->fragment_download_lock);
2760 }
2761
2762 static GstFlowReturn
2763 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2764 {
2765   GstFlowReturn ret = GST_FLOW_OK;
2766   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2767
2768   if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2769       || !klass->need_another_chunk (stream)
2770       || stream->fragment.chunk_size == 0) {
2771     stream->fragment.finished = TRUE;
2772     ret = klass->finish_fragment (stream->demux, stream);
2773   }
2774   gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2775
2776   return ret;
2777 }
2778
2779 static gboolean
2780 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2781 {
2782   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2783   GstAdaptiveDemux *demux = stream->demux;
2784
2785   switch (GST_EVENT_TYPE (event)) {
2786     case GST_EVENT_EOS:{
2787       GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2788       GST_MANIFEST_LOCK (demux);
2789
2790       gst_adaptive_demux_eos_handling (stream);
2791
2792       /* FIXME ?
2793        * _eos_handling() calls  fragment_download_finish() which does the
2794        * same thing as below.
2795        * Could this cause races ? */
2796       g_mutex_lock (&stream->fragment_download_lock);
2797       stream->download_finished = TRUE;
2798       g_cond_signal (&stream->fragment_download_cond);
2799       g_mutex_unlock (&stream->fragment_download_lock);
2800
2801       GST_MANIFEST_UNLOCK (demux);
2802       break;
2803     }
2804     default:
2805       break;
2806   }
2807
2808   gst_event_unref (event);
2809
2810   return TRUE;
2811 }
2812
2813 static gboolean
2814 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2815 {
2816   GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2817
2818   switch (GST_QUERY_TYPE (query)) {
2819     case GST_QUERY_ALLOCATION:
2820       return FALSE;
2821       break;
2822     default:
2823       break;
2824   }
2825
2826   return gst_pad_peer_query (stream->pad, query);
2827 }
2828
2829 static GstPadProbeReturn
2830 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2831     GstAdaptiveDemuxStream * stream)
2832 {
2833   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2834
2835   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2836     GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2837     if (stream->fragment_bytes_downloaded == 0) {
2838       stream->last_latency =
2839           gst_adaptive_demux_get_monotonic_time (stream->demux) -
2840           (stream->download_start_time * GST_USECOND);
2841       GST_DEBUG_OBJECT (pad,
2842           "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2843           GST_TIME_ARGS (stream->last_latency));
2844     }
2845     stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2846     GST_LOG_OBJECT (pad,
2847         "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2848         gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2849   } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2850       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2851     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2852     GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2853         GST_EVENT_TYPE_NAME (ev), ev);
2854     switch (GST_EVENT_TYPE (ev)) {
2855       case GST_EVENT_SEGMENT:
2856         stream->fragment_bytes_downloaded = 0;
2857         break;
2858       case GST_EVENT_EOS:
2859       {
2860         stream->last_download_time =
2861             gst_adaptive_demux_get_monotonic_time (stream->demux) -
2862             (stream->download_start_time * GST_USECOND);
2863         stream->last_bitrate =
2864             gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2865             8 * GST_SECOND, stream->last_download_time);
2866         GST_DEBUG_OBJECT (pad,
2867             "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2868             G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2869             stream->last_bitrate);
2870         /* Calculate bitrate since URI request */
2871       }
2872         break;
2873       default:
2874         break;
2875     }
2876   }
2877
2878   return ret;
2879 }
2880
2881 /* must be called with manifest_lock taken.
2882  * Can temporarily release manifest_lock
2883  */
2884 static gboolean
2885 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2886     GstAdaptiveDemuxStream * stream)
2887 {
2888   gboolean ret = TRUE;
2889
2890   /* Wait until we're cancelled or there's something for
2891    * us to download in the playlist or the playlist
2892    * became non-live */
2893   while (TRUE) {
2894     GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2895
2896     /* get the manifest_update_lock while still holding the manifest_lock.
2897      * This will prevent other threads to signal the condition (they will need
2898      * both manifest_lock and manifest_update_lock in order to signal).
2899      * It cannot deadlock because all threads always get the manifest_lock first
2900      * and manifest_update_lock second.
2901      */
2902     g_mutex_lock (&demux->priv->manifest_update_lock);
2903
2904     GST_MANIFEST_UNLOCK (demux);
2905
2906     g_cond_wait (&demux->priv->manifest_cond,
2907         &demux->priv->manifest_update_lock);
2908     g_mutex_unlock (&demux->priv->manifest_update_lock);
2909
2910     GST_MANIFEST_LOCK (demux);
2911
2912     /* check for cancelled every time we get the manifest_lock */
2913     g_mutex_lock (&stream->fragment_download_lock);
2914     if (G_UNLIKELY (stream->cancelled)) {
2915       ret = FALSE;
2916       stream->last_ret = GST_FLOW_FLUSHING;
2917       g_mutex_unlock (&stream->fragment_download_lock);
2918       break;
2919     }
2920     g_mutex_unlock (&stream->fragment_download_lock);
2921
2922     /* Got a new fragment or not live anymore? */
2923     if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2924         GST_FLOW_OK) {
2925       GST_DEBUG_OBJECT (demux, "new fragment available, "
2926           "not waiting for manifest update");
2927       ret = TRUE;
2928       break;
2929     }
2930
2931     if (!gst_adaptive_demux_is_live (demux)) {
2932       GST_DEBUG_OBJECT (demux, "Not live anymore, "
2933           "not waiting for manifest update");
2934       ret = FALSE;
2935       break;
2936     }
2937   }
2938   GST_DEBUG_OBJECT (demux, "Retrying now");
2939   return ret;
2940 }
2941
2942 /* must be called with manifest_lock taken */
2943 static gboolean
2944 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2945     const gchar * uri, const gchar * referer, gboolean refresh,
2946     gboolean allow_cache)
2947 {
2948   GstAdaptiveDemux *demux = stream->demux;
2949
2950   if (!gst_uri_is_valid (uri)) {
2951     GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2952     return FALSE;
2953   }
2954
2955   /* Try to re-use existing source element */
2956   if (stream->src != NULL) {
2957     gchar *old_protocol, *new_protocol;
2958     gchar *old_uri;
2959
2960     old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2961     old_protocol = gst_uri_get_protocol (old_uri);
2962     new_protocol = gst_uri_get_protocol (uri);
2963
2964     if (!g_str_equal (old_protocol, new_protocol)) {
2965       GstElement *src = stream->src;
2966
2967       stream->src = NULL;
2968       gst_object_unref (stream->src_srcpad);
2969       stream->src_srcpad = NULL;
2970       GST_MANIFEST_UNLOCK (demux);
2971       gst_element_set_locked_state (src, TRUE);
2972       gst_element_set_state (src, GST_STATE_NULL);
2973       gst_bin_remove (GST_BIN_CAST (demux), src);
2974       GST_MANIFEST_LOCK (demux);
2975       GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2976     } else {
2977       GError *err = NULL;
2978
2979       GST_DEBUG_OBJECT (demux, "Re-using old source element");
2980       if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2981               &err)) {
2982         GstElement *src = stream->src;
2983
2984         stream->src = NULL;
2985         GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2986             err ? err->message : "Unknown error");
2987         g_clear_error (&err);
2988         gst_object_unref (stream->src_srcpad);
2989         stream->src_srcpad = NULL;
2990         GST_MANIFEST_UNLOCK (demux);
2991         gst_element_set_locked_state (src, TRUE);
2992         gst_element_set_state (src, GST_STATE_NULL);
2993         gst_bin_remove (GST_BIN_CAST (demux), src);
2994         GST_MANIFEST_LOCK (demux);
2995       }
2996     }
2997     g_free (old_uri);
2998     g_free (old_protocol);
2999     g_free (new_protocol);
3000   }
3001
3002   if (stream->src == NULL) {
3003     GstPad *uri_handler_src;
3004     GstPad *queue_sink;
3005     GstPad *queue_src;
3006     GstElement *uri_handler;
3007     GstElement *queue;
3008     GstPadLinkReturn pad_link_ret;
3009     GObjectClass *gobject_class;
3010     gchar *internal_name, *bin_name;
3011
3012     /* Our src consists of a bin containing uri_handler -> queue . The
3013      * purpose of the queue is to allow the uri_handler to download an
3014      * entire fragment without blocking, so we can accurately measure the
3015      * download bitrate. */
3016
3017     queue = gst_element_factory_make ("queue", NULL);
3018     if (queue == NULL)
3019       return FALSE;
3020
3021     g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3022     g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3023     g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3024
3025     uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3026     if (uri_handler == NULL) {
3027       GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3028           ("Missing plugin to handle URI: '%s'", uri), (NULL));
3029       gst_object_unref (queue);
3030       return FALSE;
3031     }
3032
3033     gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3034
3035     if (g_object_class_find_property (gobject_class, "compress"))
3036       g_object_set (uri_handler, "compress", FALSE, NULL);
3037     if (g_object_class_find_property (gobject_class, "keep-alive"))
3038       g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3039     if (g_object_class_find_property (gobject_class, "extra-headers")) {
3040       if (referer || refresh || !allow_cache) {
3041         GstStructure *extra_headers = gst_structure_new_empty ("headers");
3042
3043         if (referer)
3044           gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3045               NULL);
3046
3047         if (!allow_cache)
3048           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3049               "no-cache", NULL);
3050         else if (refresh)
3051           gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3052               "max-age=0", NULL);
3053
3054         g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3055
3056         gst_structure_free (extra_headers);
3057       } else {
3058         g_object_set (uri_handler, "extra-headers", NULL, NULL);
3059       }
3060     }
3061
3062     /* Source bin creation */
3063     bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3064     stream->src = gst_bin_new (bin_name);
3065     g_free (bin_name);
3066     if (stream->src == NULL) {
3067       gst_object_unref (queue);
3068       gst_object_unref (uri_handler);
3069       return FALSE;
3070     }
3071
3072     gst_bin_add (GST_BIN_CAST (stream->src), queue);
3073     gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3074
3075     uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3076     queue_sink = gst_element_get_static_pad (queue, "sink");
3077
3078     pad_link_ret =
3079         gst_pad_link_full (uri_handler_src, queue_sink,
3080         GST_PAD_LINK_CHECK_NOTHING);
3081     if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3082       GST_WARNING_OBJECT (demux,
3083           "Could not link pads %s:%s to %s:%s for reason %d",
3084           GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3085           pad_link_ret);
3086       g_object_unref (queue_sink);
3087       g_object_unref (uri_handler_src);
3088       gst_object_unref (stream->src);
3089       stream->src = NULL;
3090       return FALSE;
3091     }
3092
3093     /* Add a downstream event and data probe */
3094     gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3095         (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3096
3097     g_object_unref (queue_sink);
3098     g_object_unref (uri_handler_src);
3099     queue_src = gst_element_get_static_pad (queue, "src");
3100     stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3101     g_object_unref (queue_src);
3102     gst_element_add_pad (stream->src, stream->src_srcpad);
3103
3104     gst_element_set_locked_state (stream->src, TRUE);
3105     gst_bin_add (GST_BIN_CAST (demux), stream->src);
3106     stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3107
3108     /* set up our internal floating pad to drop all events from
3109      * the http src we don't care about. On the chain function
3110      * we just push the buffer forward */
3111     internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3112     stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3113     g_free (internal_name);
3114     gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3115         GST_OBJECT_CAST (demux));
3116     GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3117     gst_pad_set_element_private (stream->internal_pad, stream);
3118     gst_pad_set_active (stream->internal_pad, TRUE);
3119     gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3120     gst_pad_set_event_function (stream->internal_pad, _src_event);
3121     gst_pad_set_query_function (stream->internal_pad, _src_query);
3122
3123     if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3124             GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3125       GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3126       return FALSE;
3127     }
3128
3129     stream->uri_handler = uri_handler;
3130     stream->queue = queue;
3131
3132     stream->last_status_code = 200;     /* default to OK */
3133   }
3134   return TRUE;
3135 }
3136
3137 static GstPadProbeReturn
3138 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3139     gpointer user_data)
3140 {
3141   GstAdaptiveDemuxStream *stream = user_data;
3142
3143   /* The source's src pad is IDLE so now set the state to READY */
3144   g_mutex_lock (&stream->fragment_download_lock);
3145   stream->src_at_ready = TRUE;
3146   g_cond_signal (&stream->fragment_download_cond);
3147   g_mutex_unlock (&stream->fragment_download_lock);
3148
3149   return GST_PAD_PROBE_REMOVE;
3150 }
3151
3152 #ifndef GST_DISABLE_GST_DEBUG
3153 static const char *
3154 uritype (GstAdaptiveDemuxStream * s)
3155 {
3156   if (s->downloading_header)
3157     return "header";
3158   if (s->downloading_index)
3159     return "index";
3160   return "fragment";
3161 }
3162 #endif
3163
3164 /* must be called with manifest_lock taken.
3165  * Can temporarily release manifest_lock
3166  *
3167  * Will return when URI is fully downloaded (or aborted/errored)
3168  */
3169 static GstFlowReturn
3170 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3171     GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3172     gint64 end, guint * http_status)
3173 {
3174   GstFlowReturn ret = GST_FLOW_OK;
3175   GST_DEBUG_OBJECT (stream->pad,
3176       "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3177       uritype (stream), uri, start, end);
3178
3179   if (http_status)
3180     *http_status = 200;         /* default to ok if no further information */
3181
3182   if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3183     ret = stream->last_ret = GST_FLOW_ERROR;
3184     return ret;
3185   }
3186
3187   gst_element_set_locked_state (stream->src, TRUE);
3188
3189   GST_MANIFEST_UNLOCK (demux);
3190   if (gst_element_set_state (stream->src,
3191           GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3192     /* If ranges are specified, seek to it */
3193     if (start != 0 || end != -1) {
3194       /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3195        * stop position */
3196       if (end != -1)
3197         end += 1;
3198       /* Send the seek event to the uri_handler, as the other pipeline elements
3199        * can't handle it when READY. */
3200       if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3201                   GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3202                   GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3203
3204         GST_MANIFEST_LOCK (demux);
3205         /* looks like the source can't handle seeks in READY */
3206         g_clear_error (&stream->last_error);
3207         stream->last_error = g_error_new (GST_CORE_ERROR,
3208             GST_CORE_ERROR_NOT_IMPLEMENTED,
3209             "Source element can't handle range requests");
3210         stream->last_ret = GST_FLOW_ERROR;
3211       } else {
3212         GST_MANIFEST_LOCK (demux);
3213       }
3214     } else {
3215       GST_MANIFEST_LOCK (demux);
3216     }
3217
3218     if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3219       stream->download_start_time =
3220           GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3221
3222       /* src element is in state READY. Before we start it, we reset
3223        * download_finished
3224        */
3225       g_mutex_lock (&stream->fragment_download_lock);
3226       stream->download_finished = FALSE;
3227       stream->downloading_first_buffer = TRUE;
3228       g_mutex_unlock (&stream->fragment_download_lock);
3229
3230       GST_MANIFEST_UNLOCK (demux);
3231
3232       if (!gst_element_sync_state_with_parent (stream->src)) {
3233         GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3234         GST_MANIFEST_LOCK (demux);
3235         ret = stream->last_ret = GST_FLOW_ERROR;
3236         return ret;
3237       }
3238
3239       /* wait for the fragment to be completely downloaded */
3240       GST_DEBUG_OBJECT (stream->pad,
3241           "Waiting for %s download to finish: %s", uritype (stream), uri);
3242
3243       g_mutex_lock (&stream->fragment_download_lock);
3244       stream->src_at_ready = FALSE;
3245       if (G_UNLIKELY (stream->cancelled)) {
3246         g_mutex_unlock (&stream->fragment_download_lock);
3247         GST_MANIFEST_LOCK (demux);
3248         ret = stream->last_ret = GST_FLOW_FLUSHING;
3249         return ret;
3250       }
3251       /* download_finished is only set:
3252        * * in ::fragment_download_finish()
3253        * * if EOS is received on the _src pad
3254        * */
3255       while (!stream->cancelled && !stream->download_finished) {
3256         g_cond_wait (&stream->fragment_download_cond,
3257             &stream->fragment_download_lock);
3258       }
3259       g_mutex_unlock (&stream->fragment_download_lock);
3260
3261       GST_DEBUG_OBJECT (stream->pad,
3262           "Finished Waiting for %s download: %s", uritype (stream), uri);
3263
3264       GST_MANIFEST_LOCK (demux);
3265       g_mutex_lock (&stream->fragment_download_lock);
3266       if (G_UNLIKELY (stream->cancelled)) {
3267         ret = stream->last_ret = GST_FLOW_FLUSHING;
3268         g_mutex_unlock (&stream->fragment_download_lock);
3269         return ret;
3270       }
3271       g_mutex_unlock (&stream->fragment_download_lock);
3272
3273       ret = stream->last_ret;
3274
3275       GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3276           uritype (stream), uri, stream->last_ret,
3277           gst_flow_get_name (stream->last_ret));
3278       if (stream->last_ret != GST_FLOW_OK && http_status) {
3279         *http_status = stream->last_status_code;
3280       }
3281     }
3282
3283     /* changing src element state might try to join the streaming thread, so
3284      * we must not hold the manifest lock.
3285      */
3286     GST_MANIFEST_UNLOCK (demux);
3287   } else {
3288     GST_MANIFEST_UNLOCK (demux);
3289     if (stream->last_ret == GST_FLOW_OK)
3290       stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3291     ret = GST_FLOW_CUSTOM_ERROR;
3292   }
3293
3294   stream->src_at_ready = FALSE;
3295
3296   gst_element_set_locked_state (stream->src, TRUE);
3297   gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3298       gst_ad_stream_src_to_ready_cb, stream, NULL);
3299
3300   g_mutex_lock (&stream->fragment_download_lock);
3301   while (!stream->src_at_ready) {
3302     g_cond_wait (&stream->fragment_download_cond,
3303         &stream->fragment_download_lock);
3304   }
3305   g_mutex_unlock (&stream->fragment_download_lock);
3306
3307   gst_element_set_state (stream->src, GST_STATE_READY);
3308
3309   /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3310   GST_MANIFEST_LOCK (demux);
3311   g_mutex_lock (&stream->fragment_download_lock);
3312   if (G_UNLIKELY (stream->cancelled)) {
3313     ret = stream->last_ret = GST_FLOW_FLUSHING;
3314     g_mutex_unlock (&stream->fragment_download_lock);
3315     return ret;
3316   }
3317   g_mutex_unlock (&stream->fragment_download_lock);
3318
3319   /* deactivate and reactivate our ghostpad to make it fresh for a new
3320    * stream */
3321   gst_pad_set_active (stream->internal_pad, FALSE);
3322   gst_pad_set_active (stream->internal_pad, TRUE);
3323
3324   return ret;
3325 }
3326
3327 /* must be called with manifest_lock taken.
3328  * Can temporarily release manifest_lock
3329  */
3330 static GstFlowReturn
3331 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3332     stream)
3333 {
3334   GstAdaptiveDemux *demux = stream->demux;
3335   GstFlowReturn ret = GST_FLOW_OK;
3336
3337   if (stream->fragment.header_uri != NULL) {
3338     GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3339         G_GINT64_FORMAT, stream->fragment.header_uri,
3340         stream->fragment.header_range_start, stream->fragment.header_range_end);
3341
3342     stream->downloading_header = TRUE;
3343     ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3344         stream->fragment.header_uri, stream->fragment.header_range_start,
3345         stream->fragment.header_range_end, NULL);
3346     stream->downloading_header = FALSE;
3347   }
3348
3349   /* check if we have an index */
3350   if (ret == GST_FLOW_OK) {     /* TODO check for other valid types */
3351
3352     if (stream->fragment.index_uri != NULL) {
3353       GST_DEBUG_OBJECT (demux,
3354           "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3355           stream->fragment.index_uri,
3356           stream->fragment.index_range_start, stream->fragment.index_range_end);
3357       stream->downloading_index = TRUE;
3358       ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3359           stream->fragment.index_uri, stream->fragment.index_range_start,
3360           stream->fragment.index_range_end, NULL);
3361       stream->downloading_index = FALSE;
3362     }
3363   }
3364
3365   return ret;
3366 }
3367
3368 /* must be called with manifest_lock taken.
3369  * Can temporarily release manifest_lock
3370  */
3371 static GstFlowReturn
3372 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3373 {
3374   GstAdaptiveDemux *demux = stream->demux;
3375   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3376   gchar *url = NULL;
3377   GstFlowReturn ret;
3378   gboolean retried_once = FALSE, live;
3379   guint http_status;
3380   guint last_status_code;
3381
3382   /* FIXME :  */
3383   /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3384   stream->starting_fragment = TRUE;
3385   stream->last_ret = GST_FLOW_OK;
3386   stream->first_fragment_buffer = TRUE;
3387
3388   GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3389       stream->fragment.uri ? "FRAGMENT " : "",
3390       stream->fragment.header_uri ? "HEADER " : "",
3391       stream->fragment.index_uri ? "INDEX" : "");
3392
3393   if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3394       stream->fragment.index_uri == NULL)
3395     goto no_url_error;
3396
3397   if (stream->need_header) {
3398     ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3399     if (ret != GST_FLOW_OK) {
3400       return ret;
3401     }
3402     stream->need_header = FALSE;
3403   }
3404
3405 again:
3406   ret = GST_FLOW_OK;
3407   url = stream->fragment.uri;
3408   GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3409   if (!url)
3410     return ret;
3411
3412   stream->last_ret = GST_FLOW_OK;
3413   http_status = 200;
3414
3415   /* Download the actual fragment, either in fragments or in one go */
3416   if (klass->need_another_chunk && klass->need_another_chunk (stream)
3417       && stream->fragment.chunk_size != 0) {
3418     /* Handle chunk downloading */
3419     gint64 range_start, range_end, chunk_start, chunk_end;
3420     guint64 download_total_bytes;
3421     gint chunk_size = stream->fragment.chunk_size;
3422
3423     range_start = chunk_start = stream->fragment.range_start;
3424     range_end = stream->fragment.range_end;
3425     /* HTTP ranges are inclusive for the end */
3426     if (chunk_size != -1)
3427       chunk_end = range_start + chunk_size - 1;
3428     else
3429       chunk_end = range_end;
3430
3431     if (range_end != -1)
3432       chunk_end = MIN (chunk_end, range_end);
3433
3434     while (!stream->fragment.finished && (chunk_start <= range_end
3435             || range_end == -1)) {
3436       download_total_bytes = stream->download_total_bytes;
3437
3438       ret =
3439           gst_adaptive_demux_stream_download_uri (demux, stream, url,
3440           chunk_start, chunk_end, &http_status);
3441
3442       GST_DEBUG_OBJECT (stream->pad,
3443           "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3444           http_status, gst_flow_get_name (stream->last_ret));
3445
3446       /* Don't retry for any chunks except the first. We would have sent
3447        * data downstream already otherwise and it's difficult to recover
3448        * from that in a meaningful way */
3449       if (chunk_start > range_start)
3450         retried_once = TRUE;
3451
3452       /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3453        * downloading up to -1. We don't know the full duration.
3454        * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3455       if (ret != GST_FLOW_OK && chunk_end == -1) {
3456         break;
3457       } else if (ret != GST_FLOW_OK) {
3458         chunk_end = -1;
3459         stream->last_ret = GST_FLOW_OK;
3460         continue;
3461       }
3462
3463       if (chunk_end == -1)
3464         break;
3465
3466       /* Short read, we're at the end now */
3467       if (stream->download_total_bytes - download_total_bytes <
3468           chunk_end + 1 - chunk_start)
3469         break;
3470
3471       if (!klass->need_another_chunk (stream))
3472         break;
3473
3474       /* HTTP ranges are inclusive for the end */
3475       chunk_start += chunk_size;
3476       chunk_size = stream->fragment.chunk_size;
3477       if (chunk_size != -1)
3478         chunk_end = chunk_start + chunk_size - 1;
3479       else
3480         chunk_end = range_end;
3481
3482       if (range_end != -1)
3483         chunk_end = MIN (chunk_end, range_end);
3484     }
3485   } else {
3486     ret =
3487         gst_adaptive_demux_stream_download_uri (demux, stream, url,
3488         stream->fragment.range_start, stream->fragment.range_end, &http_status);
3489     GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3490         stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3491   }
3492   if (ret == GST_FLOW_OK)
3493     goto beach;
3494
3495   g_mutex_lock (&stream->fragment_download_lock);
3496   if (G_UNLIKELY (stream->cancelled)) {
3497     g_mutex_unlock (&stream->fragment_download_lock);
3498     return ret;
3499   }
3500   g_mutex_unlock (&stream->fragment_download_lock);
3501
3502   /* TODO check if we are truly stopping */
3503   if (ret != GST_FLOW_CUSTOM_ERROR)
3504     goto beach;
3505
3506   last_status_code = stream->last_status_code;
3507   GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3508       last_status_code, stream->download_error_count);
3509
3510   live = gst_adaptive_demux_is_live (demux);
3511   if (!retried_once && ((last_status_code / 100 == 4 && live)
3512           || last_status_code / 100 == 5)) {
3513     /* 4xx/5xx */
3514     /* if current position is before available start, switch to next */
3515     if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3516       goto flushing;
3517
3518     if (live) {
3519       gint64 range_start, range_stop;
3520
3521       if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3522               &range_stop))
3523         goto flushing;
3524
3525       if (demux->segment.position < range_start) {
3526         GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3527         stream->last_ret = GST_FLOW_OK;
3528         ret = gst_adaptive_demux_eos_handling (stream);
3529         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3530             gst_flow_get_name (ret));
3531         GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3532         ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3533         GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3534             gst_flow_get_name (ret));
3535         if (ret == GST_FLOW_OK) {
3536           retried_once = TRUE;
3537           goto again;
3538         }
3539       } else if (demux->segment.position > range_stop) {
3540         /* wait a bit to be in range, we don't have any locks at that point */
3541         gint64 wait_time =
3542             gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3543         if (wait_time > 0) {
3544           gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3545
3546           GST_DEBUG_OBJECT (stream->pad,
3547               "Download waiting for %" GST_TIME_FORMAT,
3548               GST_TIME_ARGS (wait_time));
3549
3550           GST_MANIFEST_UNLOCK (demux);
3551           g_mutex_lock (&stream->fragment_download_lock);
3552           if (G_UNLIKELY (stream->cancelled)) {
3553             g_mutex_unlock (&stream->fragment_download_lock);
3554             GST_MANIFEST_LOCK (demux);
3555             stream->last_ret = GST_FLOW_FLUSHING;
3556             goto flushing;
3557           }
3558           do {
3559             g_cond_wait_until (&stream->fragment_download_cond,
3560                 &stream->fragment_download_lock, end_time);
3561             if (G_UNLIKELY (stream->cancelled)) {
3562               g_mutex_unlock (&stream->fragment_download_lock);
3563               GST_MANIFEST_LOCK (demux);
3564               stream->last_ret = GST_FLOW_FLUSHING;
3565               goto flushing;
3566             }
3567           } while (!stream->download_finished);
3568           g_mutex_unlock (&stream->fragment_download_lock);
3569
3570           GST_MANIFEST_LOCK (demux);
3571         }
3572       }
3573     }
3574
3575   flushing:
3576     if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3577       /* looks like there is no way of knowing when a live stream has ended
3578        * Have to assume we are falling behind and cause a manifest reload */
3579       GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3580       return GST_FLOW_EOS;
3581     }
3582   } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3583     /* If this is the last fragment, consider failures EOS and not actual
3584      * errors. Due to rounding errors in the durations, the last fragment
3585      * might not actually exist */
3586     GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3587     return GST_FLOW_EOS;
3588   } else {
3589     /* retry once (same segment) for 5xx (server errors) */
3590     if (!retried_once) {
3591       retried_once = TRUE;
3592       /* wait a short time in case the server needs a bit to recover, we don't
3593        * care if we get woken up before end time. We can use sleep here since
3594        * we're already blocking and just want to wait some time. */
3595       g_usleep (100000);        /* a tenth of a second */
3596       goto again;
3597     }
3598   }
3599
3600 beach:
3601   return ret;
3602
3603 no_url_error:
3604   {
3605     GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3606         (_("Failed to get fragment URL.")),
3607         ("An error happened when getting fragment URL"));
3608     gst_task_stop (stream->download_task);
3609     return GST_FLOW_ERROR;
3610   }
3611 }
3612
3613 /* this function will take the manifest_lock and will keep it until the end.
3614  * It will release it temporarily only when going to sleep.
3615  * Every time it takes the manifest_lock, it will check for cancelled condition
3616  */
3617 static void
3618 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3619 {
3620   GstAdaptiveDemux *demux = stream->demux;
3621   GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3622   GstFlowReturn ret;
3623   gboolean live;
3624
3625   GST_LOG_OBJECT (stream->pad, "download loop start");
3626
3627   GST_MANIFEST_LOCK (demux);
3628
3629   g_mutex_lock (&stream->fragment_download_lock);
3630   if (G_UNLIKELY (stream->cancelled)) {
3631     stream->last_ret = GST_FLOW_FLUSHING;
3632     g_mutex_unlock (&stream->fragment_download_lock);
3633     goto cancelled;
3634   }
3635   g_mutex_unlock (&stream->fragment_download_lock);
3636
3637   /* Check if we're done with our segment */
3638   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3639   if (demux->segment.rate > 0) {
3640     if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3641         && stream->segment.position >= stream->segment.stop) {
3642       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3643       ret = GST_FLOW_EOS;
3644       gst_task_stop (stream->download_task);
3645       goto end_of_manifest;
3646     }
3647   } else {
3648     if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3649         && stream->segment.position <= stream->segment.start) {
3650       GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3651       ret = GST_FLOW_EOS;
3652       gst_task_stop (stream->download_task);
3653       goto end_of_manifest;
3654     }
3655   }
3656   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3657
3658   /* Cleanup old streams if any */
3659   if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3660     GList *old_streams = demux->priv->old_streams;
3661     demux->priv->old_streams = NULL;
3662
3663     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3664     g_list_free_full (old_streams,
3665         (GDestroyNotify) gst_adaptive_demux_stream_free);
3666     GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3667
3668     /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3669      * Recheck the cancelled flag.
3670      */
3671     g_mutex_lock (&stream->fragment_download_lock);
3672     if (G_UNLIKELY (stream->cancelled)) {
3673       stream->last_ret = GST_FLOW_FLUSHING;
3674       g_mutex_unlock (&stream->fragment_download_lock);
3675       goto cancelled;
3676     }
3677     g_mutex_unlock (&stream->fragment_download_lock);
3678   }
3679
3680   /* Restarting download, figure out new position
3681    * FIXME : Move this to a separate function ? */
3682   if (G_UNLIKELY (stream->restart_download)) {
3683     GstEvent *seg_event;
3684     GstClockTime cur, ts = 0;
3685     gint64 pos;
3686
3687     GST_DEBUG_OBJECT (stream->pad,
3688         "Activating stream due to reconfigure event");
3689
3690     if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3691       ts = (GstClockTime) pos;
3692       GST_DEBUG_OBJECT (demux, "Downstream position: %"
3693           GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3694     } else {
3695       /* query other pads as some faulty element in the pad's branch might
3696        * reject position queries. This should be better than using the
3697        * demux segment position that can be much ahead */
3698       GList *iter;
3699
3700       for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3701         GstAdaptiveDemuxStream *cur_stream =
3702             (GstAdaptiveDemuxStream *) iter->data;
3703
3704         if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3705                 &pos)) {
3706           ts = (GstClockTime) pos;
3707           GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3708               GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3709           break;
3710         }
3711       }
3712     }
3713
3714     GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3715     cur =
3716         gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3717         stream->segment.position);
3718
3719     /* we might have already pushed this data */
3720     ts = MAX (ts, cur);
3721
3722     GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3723         "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3724
3725     if (GST_CLOCK_TIME_IS_VALID (ts)) {
3726       GstClockTime offset, period_start;
3727
3728       offset =
3729           gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3730       period_start = gst_adaptive_demux_get_period_start_time (demux);
3731
3732       /* TODO check return */
3733       gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3734           0, ts, &ts);
3735
3736       stream->segment.position = ts - period_start + offset;
3737     }
3738
3739     /* The stream's segment is still correct except for
3740      * the position, so let's send a new one with the
3741      * updated position */
3742     seg_event = gst_event_new_segment (&stream->segment);
3743     gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3744     GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3745
3746     GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3747         GST_PTR_FORMAT, seg_event);
3748     gst_pad_push_event (stream->pad, seg_event);
3749
3750     stream->discont = TRUE;
3751     stream->restart_download = FALSE;
3752   }
3753
3754   live = gst_adaptive_demux_is_live (demux);
3755
3756   /* Get information about the fragment to download */
3757   GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3758   ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3759   GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3760       ret, gst_flow_get_name (ret));
3761   if (ret == GST_FLOW_OK) {
3762
3763     /* wait for live fragments to be available */
3764     if (live) {
3765       gint64 wait_time =
3766           gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3767       if (wait_time > 0) {
3768         GstClockTime end_time =
3769             gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3770
3771         GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3772             GST_TIME_ARGS (wait_time));
3773
3774         GST_MANIFEST_UNLOCK (demux);
3775
3776         g_mutex_lock (&stream->fragment_download_lock);
3777         if (G_UNLIKELY (stream->cancelled)) {
3778           g_mutex_unlock (&stream->fragment_download_lock);
3779           GST_MANIFEST_LOCK (demux);
3780           stream->last_ret = GST_FLOW_FLUSHING;
3781           goto cancelled;
3782         }
3783         gst_adaptive_demux_wait_until (demux->realtime_clock,
3784             &stream->fragment_download_cond, &stream->fragment_download_lock,
3785             end_time);
3786         g_mutex_unlock (&stream->fragment_download_lock);
3787
3788         GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3789
3790         GST_MANIFEST_LOCK (demux);
3791
3792         g_mutex_lock (&stream->fragment_download_lock);
3793         if (G_UNLIKELY (stream->cancelled)) {
3794           stream->last_ret = GST_FLOW_FLUSHING;
3795           g_mutex_unlock (&stream->fragment_download_lock);
3796           goto cancelled;
3797         }
3798         g_mutex_unlock (&stream->fragment_download_lock);
3799       }
3800     }
3801
3802     stream->last_ret = GST_FLOW_OK;
3803
3804     next_download = gst_adaptive_demux_get_monotonic_time (demux);
3805     ret = gst_adaptive_demux_stream_download_fragment (stream);
3806
3807     if (ret == GST_FLOW_FLUSHING) {
3808       g_mutex_lock (&stream->fragment_download_lock);
3809       if (G_UNLIKELY (stream->cancelled)) {
3810         stream->last_ret = GST_FLOW_FLUSHING;
3811         g_mutex_unlock (&stream->fragment_download_lock);
3812         goto cancelled;
3813       }
3814       g_mutex_unlock (&stream->fragment_download_lock);
3815     }
3816
3817   } else {
3818     stream->last_ret = ret;
3819   }
3820
3821   switch (ret) {
3822     case GST_FLOW_OK:
3823       break;                    /* all is good, let's go */
3824     case GST_FLOW_EOS:
3825       GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3826
3827       /* we push the EOS after releasing the object lock */
3828       if (gst_adaptive_demux_is_live (demux)
3829           && (demux->segment.rate == 1.0
3830               || gst_adaptive_demux_stream_in_live_seek_range (demux,
3831                   stream))) {
3832         GstAdaptiveDemuxClass *demux_class =
3833             GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3834
3835         /* this might be a fragment download error, refresh the manifest, just in case */
3836         if (!demux_class->requires_periodical_playlist_update (demux)) {
3837           ret = gst_adaptive_demux_update_manifest (demux);
3838           break;
3839           /* Wait only if we can ensure current manifest has been expired.
3840            * The meaning "we have next period" *WITH* EOS is that, current
3841            * period has been ended but we can continue to the next period */
3842         } else if (!gst_adaptive_demux_has_next_period (demux) &&
3843             gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3844           goto end;
3845         }
3846         gst_task_stop (stream->download_task);
3847         if (stream->replaced) {
3848           goto end;
3849         }
3850       } else {
3851         gst_task_stop (stream->download_task);
3852       }
3853
3854       if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3855         if (gst_adaptive_demux_has_next_period (demux)) {
3856           GST_DEBUG_OBJECT (stream->pad,
3857               "Next period available, not sending EOS");
3858           gst_adaptive_demux_advance_period (demux);
3859           ret = GST_FLOW_OK;
3860         }
3861       }
3862       break;
3863
3864     case GST_FLOW_NOT_LINKED:
3865     {
3866       GstFlowReturn ret;
3867       gst_task_stop (stream->download_task);
3868
3869       ret = gst_adaptive_demux_combine_flows (demux);
3870       if (ret == GST_FLOW_NOT_LINKED) {
3871         GST_ELEMENT_FLOW_ERROR (demux, ret);
3872       }
3873     }
3874       break;
3875
3876     case GST_FLOW_FLUSHING:{
3877       GList *iter;
3878
3879       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3880         GstAdaptiveDemuxStream *other;
3881
3882         other = iter->data;
3883         gst_task_stop (other->download_task);
3884       }
3885     }
3886       break;
3887
3888     default:
3889       if (ret <= GST_FLOW_ERROR) {
3890         gboolean is_live = gst_adaptive_demux_is_live (demux);
3891         GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3892         if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3893           goto download_error;
3894         }
3895
3896         g_clear_error (&stream->last_error);
3897
3898         /* First try to update the playlist for non-live playlists
3899          * in case the URIs have changed in the meantime. But only
3900          * try it the first time, after that we're going to wait a
3901          * a bit to not flood the server */
3902         if (stream->download_error_count == 1 && !is_live) {
3903           /* TODO hlsdemux had more options to this function (boolean and err) */
3904
3905           if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3906             /* Retry immediately, the playlist actually has changed */
3907             GST_DEBUG_OBJECT (demux, "Updated the playlist");
3908             goto end;
3909           }
3910         }
3911
3912         /* Wait half the fragment duration before retrying */
3913         next_download += stream->fragment.duration / 2;
3914
3915         GST_MANIFEST_UNLOCK (demux);
3916
3917         g_mutex_lock (&stream->fragment_download_lock);
3918         if (G_UNLIKELY (stream->cancelled)) {
3919           g_mutex_unlock (&stream->fragment_download_lock);
3920           GST_MANIFEST_LOCK (demux);
3921           stream->last_ret = GST_FLOW_FLUSHING;
3922           goto cancelled;
3923         }
3924         gst_adaptive_demux_wait_until (demux->realtime_clock,
3925             &stream->fragment_download_cond, &stream->fragment_download_lock,
3926             next_download);
3927         g_mutex_unlock (&stream->fragment_download_lock);
3928
3929         GST_DEBUG_OBJECT (demux, "Retrying now");
3930
3931         GST_MANIFEST_LOCK (demux);
3932
3933         g_mutex_lock (&stream->fragment_download_lock);
3934         if (G_UNLIKELY (stream->cancelled)) {
3935           stream->last_ret = GST_FLOW_FLUSHING;
3936           g_mutex_unlock (&stream->fragment_download_lock);
3937           goto cancelled;
3938         }
3939         g_mutex_unlock (&stream->fragment_download_lock);
3940
3941         /* Refetch the playlist now after we waited */
3942         if (!is_live
3943             && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3944           GST_DEBUG_OBJECT (demux, "Updated the playlist");
3945         }
3946         goto end;
3947       }
3948       break;
3949   }
3950
3951 end_of_manifest:
3952   if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3953     if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3954       if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3955         GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3956         gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3957       } else {
3958         GST_DEBUG_OBJECT (stream->src,
3959             "Stream is EOS, but we're switching fragments. Not sending.");
3960       }
3961     } else {
3962       GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3963       goto download_error;
3964     }
3965   }
3966
3967 end:
3968   GST_MANIFEST_UNLOCK (demux);
3969   GST_LOG_OBJECT (stream->pad, "download loop end");
3970   return;
3971
3972 cancelled:
3973   {
3974     GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3975     goto end;
3976   }
3977 download_error:
3978   {
3979     GstMessage *msg;
3980
3981     if (stream->last_error) {
3982       gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3983           GST_DEBUG_PAD_NAME (stream->pad));
3984       msg =
3985           gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3986           debug);
3987       GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3988           stream->last_error->message);
3989       g_free (debug);
3990     } else {
3991       GError *err =
3992           g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3993           _("Couldn't download fragments"));
3994       msg =
3995           gst_message_new_error (GST_OBJECT_CAST (demux), err,
3996           "Fragment downloading has failed consecutive times");
3997       g_error_free (err);
3998       GST_ERROR_OBJECT (stream->pad,
3999           "Download error: Couldn't download fragments, too many failures");
4000     }
4001
4002     gst_task_stop (stream->download_task);
4003     if (stream->src) {
4004       GstElement *src = stream->src;
4005
4006       stream->src = NULL;
4007       GST_MANIFEST_UNLOCK (demux);
4008       gst_element_set_locked_state (src, TRUE);
4009       gst_element_set_state (src, GST_STATE_NULL);
4010       gst_bin_remove (GST_BIN_CAST (demux), src);
4011       GST_MANIFEST_LOCK (demux);
4012     }
4013
4014     gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4015
4016     goto end;
4017   }
4018 }
4019
4020 static void
4021 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4022 {
4023   GstClockTime next_update;
4024   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4025
4026   /* Loop for updating of the playlist. This periodically checks if
4027    * the playlist is updated and does so, then signals the streaming
4028    * thread in case it can continue downloading now. */
4029
4030   /* block until the next scheduled update or the signal to quit this thread */
4031   GST_DEBUG_OBJECT (demux, "Started updates task");
4032
4033   GST_MANIFEST_LOCK (demux);
4034
4035   next_update =
4036       gst_adaptive_demux_get_monotonic_time (demux) +
4037       klass->get_manifest_update_interval (demux) * GST_USECOND;
4038
4039   /* Updating playlist only needed for live playlists */
4040   while (gst_adaptive_demux_is_live (demux)) {
4041     GstFlowReturn ret = GST_FLOW_OK;
4042
4043     /* Wait here until we should do the next update or we're cancelled */
4044     GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4045
4046     GST_MANIFEST_UNLOCK (demux);
4047
4048     g_mutex_lock (&demux->priv->updates_timed_lock);
4049     if (demux->priv->stop_updates_task) {
4050       g_mutex_unlock (&demux->priv->updates_timed_lock);
4051       goto quit;
4052     }
4053     gst_adaptive_demux_wait_until (demux->realtime_clock,
4054         &demux->priv->updates_timed_cond,
4055         &demux->priv->updates_timed_lock, next_update);
4056     g_mutex_unlock (&demux->priv->updates_timed_lock);
4057
4058     g_mutex_lock (&demux->priv->updates_timed_lock);
4059     if (demux->priv->stop_updates_task) {
4060       g_mutex_unlock (&demux->priv->updates_timed_lock);
4061       goto quit;
4062     }
4063     g_mutex_unlock (&demux->priv->updates_timed_lock);
4064
4065     GST_MANIFEST_LOCK (demux);
4066
4067     GST_DEBUG_OBJECT (demux, "Updating playlist");
4068
4069     ret = gst_adaptive_demux_update_manifest (demux);
4070
4071     if (ret == GST_FLOW_EOS) {
4072     } else if (ret != GST_FLOW_OK) {
4073       /* update_failed_count is used only here, no need to protect it */
4074       demux->priv->update_failed_count++;
4075       if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4076         GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4077             gst_flow_get_name (ret));
4078         next_update = gst_adaptive_demux_get_monotonic_time (demux)
4079             + klass->get_manifest_update_interval (demux) * GST_USECOND;
4080       } else {
4081         GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4082             (_("Internal data stream error.")), ("Could not update playlist"));
4083         GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4084         gst_task_stop (demux->priv->updates_task);
4085         GST_MANIFEST_UNLOCK (demux);
4086         goto end;
4087       }
4088     } else {
4089       GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4090       demux->priv->update_failed_count = 0;
4091       next_update =
4092           gst_adaptive_demux_get_monotonic_time (demux) +
4093           klass->get_manifest_update_interval (demux) * GST_USECOND;
4094
4095       /* Wake up download tasks */
4096       g_mutex_lock (&demux->priv->manifest_update_lock);
4097       g_cond_broadcast (&demux->priv->manifest_cond);
4098       g_mutex_unlock (&demux->priv->manifest_update_lock);
4099     }
4100   }
4101
4102   GST_MANIFEST_UNLOCK (demux);
4103
4104 quit:
4105   {
4106     GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4107   }
4108
4109 end:
4110   {
4111     return;
4112   }
4113 }
4114
4115 /* must be called with manifest_lock taken */
4116 static gboolean
4117 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4118     GstEvent * event)
4119 {
4120   gboolean ret;
4121   GstPad *pad;
4122   GstAdaptiveDemux *demux = stream->demux;
4123
4124   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4125     stream->eos = TRUE;
4126   }
4127
4128   pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4129
4130   /* Can't push events holding the manifest lock */
4131   GST_MANIFEST_UNLOCK (demux);
4132
4133   GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4134       "Pushing event %" GST_PTR_FORMAT, event);
4135
4136   ret = gst_pad_push_event (pad, event);
4137
4138   gst_object_unref (pad);
4139
4140   GST_MANIFEST_LOCK (demux);
4141
4142   return ret;
4143 }
4144
4145 /* must be called with manifest_lock taken */
4146 static gboolean
4147 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4148 {
4149   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4150
4151   if (klass->is_live)
4152     return klass->is_live (demux);
4153   return FALSE;
4154 }
4155
4156 /* must be called with manifest_lock taken */
4157 static GstFlowReturn
4158 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4159     GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4160     GstClockTime ts, GstClockTime * final_ts)
4161 {
4162   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4163
4164   if (klass->stream_seek)
4165     return klass->stream_seek (stream, forward, flags, ts, final_ts);
4166   return GST_FLOW_ERROR;
4167 }
4168
4169 /* must be called with manifest_lock taken */
4170 static gboolean
4171 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4172     GstAdaptiveDemuxStream * stream)
4173 {
4174   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4175   gboolean ret = TRUE;
4176
4177   if (klass->stream_has_next_fragment)
4178     ret = klass->stream_has_next_fragment (stream);
4179
4180   return ret;
4181 }
4182
4183 /* must be called with manifest_lock taken */
4184 /* Called from:
4185  *  the ::finish_fragment() handlers when an *actual* fragment is done
4186  *   */
4187 GstFlowReturn
4188 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4189     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4190 {
4191   GstFlowReturn ret;
4192
4193   if (stream->last_ret == GST_FLOW_OK) {
4194     stream->last_ret =
4195         gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4196         duration);
4197   }
4198   ret = stream->last_ret;
4199
4200   return ret;
4201 }
4202
4203 /* must be called with manifest_lock taken */
4204 GstFlowReturn
4205 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4206     GstAdaptiveDemuxStream * stream, GstClockTime duration)
4207 {
4208   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4209   GstFlowReturn ret;
4210
4211   g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4212
4213   GST_LOG_OBJECT (stream->pad,
4214       "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4215       GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4216
4217   stream->download_error_count = 0;
4218   g_clear_error (&stream->last_error);
4219
4220   /* FIXME - url has no indication of byte ranges for subsegments */
4221   /* FIXME : All those time statistics are biased, since they are calculated
4222    * *AFTER* the queue2, which might be blocking. They should ideally be
4223    * calculated *before* queue2 in the uri_handler_probe */
4224   gst_element_post_message (GST_ELEMENT_CAST (demux),
4225       gst_message_new_element (GST_OBJECT_CAST (demux),
4226           gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4227               "manifest-uri", G_TYPE_STRING,
4228               demux->manifest_uri, "uri", G_TYPE_STRING,
4229               stream->fragment.uri, "fragment-start-time",
4230               GST_TYPE_CLOCK_TIME, stream->download_start_time,
4231               "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4232               gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4233               stream->download_total_bytes, "fragment-download-time",
4234               GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4235
4236   /* Don't update to the end of the segment if in reverse playback */
4237   GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4238   if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4239     GstClockTime offset =
4240         gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4241     GstClockTime period_start =
4242         gst_adaptive_demux_get_period_start_time (demux);
4243
4244     stream->segment.position += duration;
4245
4246     /* Convert from position inside the stream's segment to the demuxer's
4247      * segment, they are not necessarily the same */
4248     if (stream->segment.position - offset + period_start >
4249         demux->segment.position)
4250       demux->segment.position =
4251           stream->segment.position - offset + period_start;
4252   }
4253   GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4254
4255   /* When advancing with a non 1.0 rate on live streams, we need to check
4256    * the live seeking range again to make sure we can still advance to
4257    * that position */
4258   if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4259     if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4260       ret = GST_FLOW_EOS;
4261     else
4262       ret = klass->stream_advance_fragment (stream);
4263   } else if (gst_adaptive_demux_is_live (demux)
4264       || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4265     ret = klass->stream_advance_fragment (stream);
4266   } else {
4267     ret = GST_FLOW_EOS;
4268   }
4269
4270   stream->download_start_time =
4271       GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4272
4273   if (ret == GST_FLOW_OK) {
4274     if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4275             gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4276       stream->need_header = TRUE;
4277       ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4278     }
4279
4280     /* the subclass might want to switch pads */
4281     if (G_UNLIKELY (demux->next_streams)) {
4282       GList *iter;
4283       gboolean can_expose = TRUE;
4284
4285       gst_task_stop (stream->download_task);
4286
4287       ret = GST_FLOW_EOS;
4288
4289       for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4290         /* Only expose if all streams are now cancelled or finished downloading */
4291         GstAdaptiveDemuxStream *other = iter->data;
4292         if (other != stream) {
4293           g_mutex_lock (&other->fragment_download_lock);
4294           can_expose &= (other->cancelled == TRUE
4295               || other->download_finished == TRUE);
4296           g_mutex_unlock (&other->fragment_download_lock);
4297         }
4298       }
4299
4300       if (can_expose) {
4301         GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4302             "to do bitrate switching");
4303         gst_adaptive_demux_prepare_streams (demux, FALSE);
4304         gst_adaptive_demux_start_tasks (demux, TRUE);
4305       } else {
4306         GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4307       }
4308     }
4309   }
4310
4311   return ret;
4312 }
4313
4314 /* must be called with manifest_lock taken */
4315 static gboolean
4316 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4317     demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4318 {
4319   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4320
4321   if (klass->stream_select_bitrate)
4322     return klass->stream_select_bitrate (stream, bitrate);
4323   return FALSE;
4324 }
4325
4326 /* must be called with manifest_lock taken */
4327 static GstFlowReturn
4328 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4329     GstAdaptiveDemuxStream * stream)
4330 {
4331   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4332   GstFlowReturn ret;
4333
4334   g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4335       GST_FLOW_ERROR);
4336
4337   /* Make sure the sub-class will update bitrate, or else
4338    * we will later */
4339   stream->fragment.bitrate = 0;
4340   stream->fragment.finished = FALSE;
4341
4342   GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4343       GST_TIME_ARGS (stream->segment.position));
4344
4345   ret = klass->stream_update_fragment_info (stream);
4346
4347   GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4348       stream->fragment.uri);
4349   if (ret == GST_FLOW_OK) {
4350     GST_LOG_OBJECT (stream->pad,
4351         "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4352         GST_TIME_ARGS (stream->fragment.timestamp),
4353         GST_TIME_ARGS (stream->fragment.duration));
4354     GST_LOG_OBJECT (stream->pad,
4355         "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4356         stream->fragment.range_start, stream->fragment.range_end);
4357   }
4358
4359   return ret;
4360 }
4361
4362 /* must be called with manifest_lock taken */
4363 static gint64
4364 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4365     demux, GstAdaptiveDemuxStream * stream)
4366 {
4367   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4368
4369   if (klass->stream_get_fragment_waiting_time)
4370     return klass->stream_get_fragment_waiting_time (stream);
4371   return 0;
4372 }
4373
4374 /* must be called with manifest_lock taken */
4375 static GstFlowReturn
4376 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4377 {
4378   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4379   GstFragment *download;
4380   GstBuffer *buffer;
4381   GstFlowReturn ret;
4382   GError *error = NULL;
4383
4384   download = gst_uri_downloader_fetch_uri (demux->downloader,
4385       demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4386   if (download) {
4387     g_free (demux->manifest_uri);
4388     g_free (demux->manifest_base_uri);
4389     if (download->redirect_permanent && download->redirect_uri) {
4390       demux->manifest_uri = g_strdup (download->redirect_uri);
4391       demux->manifest_base_uri = NULL;
4392     } else {
4393       demux->manifest_uri = g_strdup (download->uri);
4394       demux->manifest_base_uri = g_strdup (download->redirect_uri);
4395     }
4396
4397     buffer = gst_fragment_get_buffer (download);
4398     g_object_unref (download);
4399     ret = klass->update_manifest_data (demux, buffer);
4400     gst_buffer_unref (buffer);
4401     /* FIXME: Should the manifest uri vars be reverted to original
4402      * values if updating fails? */
4403   } else {
4404     GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4405         error->message);
4406     ret = GST_FLOW_NOT_LINKED;
4407   }
4408   g_clear_error (&error);
4409
4410   return ret;
4411 }
4412
4413 /* must be called with manifest_lock taken */
4414 static GstFlowReturn
4415 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4416 {
4417   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4418   GstFlowReturn ret;
4419
4420   ret = klass->update_manifest (demux);
4421
4422   if (ret == GST_FLOW_OK) {
4423     GstClockTime duration;
4424     /* Send an updated duration message */
4425     duration = klass->get_duration (demux);
4426     if (duration != GST_CLOCK_TIME_NONE) {
4427       GST_DEBUG_OBJECT (demux,
4428           "Sending duration message : %" GST_TIME_FORMAT,
4429           GST_TIME_ARGS (duration));
4430       gst_element_post_message (GST_ELEMENT (demux),
4431           gst_message_new_duration_changed (GST_OBJECT (demux)));
4432     } else {
4433       GST_DEBUG_OBJECT (demux,
4434           "Duration unknown, can not send the duration message");
4435     }
4436
4437     /* If a manifest changes it's liveness or periodic updateness, we need
4438      * to start/stop the manifest update task appropriately */
4439     /* Keep this condition in sync with the one in
4440      * gst_adaptive_demux_start_manifest_update_task()
4441      */
4442     if (gst_adaptive_demux_is_live (demux) &&
4443         klass->requires_periodical_playlist_update (demux)) {
4444       gst_adaptive_demux_start_manifest_update_task (demux);
4445     } else {
4446       gst_adaptive_demux_stop_manifest_update_task (demux);
4447     }
4448   }
4449
4450   return ret;
4451 }
4452
4453 void
4454 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4455 {
4456   g_free (f->uri);
4457   f->uri = NULL;
4458   f->range_start = 0;
4459   f->range_end = -1;
4460
4461   g_free (f->header_uri);
4462   f->header_uri = NULL;
4463   f->header_range_start = 0;
4464   f->header_range_end = -1;
4465
4466   g_free (f->index_uri);
4467   f->index_uri = NULL;
4468   f->index_range_start = 0;
4469   f->index_range_end = -1;
4470
4471   f->finished = FALSE;
4472 }
4473
4474 /* must be called with manifest_lock taken */
4475 static gboolean
4476 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4477 {
4478   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4479   gboolean ret = FALSE;
4480
4481   if (klass->has_next_period)
4482     ret = klass->has_next_period (demux);
4483   GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4484   return ret;
4485 }
4486
4487 /* must be called with manifest_lock taken */
4488 static void
4489 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4490 {
4491   GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4492
4493   g_return_if_fail (klass->advance_period != NULL);
4494
4495   GST_DEBUG_OBJECT (demux, "Advancing to next period");
4496   klass->advance_period (demux);
4497   gst_adaptive_demux_prepare_streams (demux, FALSE);
4498   gst_adaptive_demux_start_tasks (demux, TRUE);
4499 }
4500
4501 /**
4502  * gst_adaptive_demux_get_monotonic_time:
4503  * Returns: a monotonically increasing time, using the system realtime clock
4504  */
4505 GstClockTime
4506 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4507 {
4508   g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4509   return gst_clock_get_time (demux->realtime_clock);
4510 }
4511
4512 /**
4513  * gst_adaptive_demux_get_client_now_utc:
4514  * @demux: #GstAdaptiveDemux
4515  * Returns: the client's estimate of UTC
4516  *
4517  * Used to find the client's estimate of UTC, using the system realtime clock.
4518  */
4519 GDateTime *
4520 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4521 {
4522   GstClockTime rtc_now;
4523   GDateTime *unix_datetime;
4524   GDateTime *result_datetime;
4525   gint64 utc_now_in_us;
4526
4527   rtc_now = gst_clock_get_time (demux->realtime_clock);
4528   utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4529   unix_datetime =
4530       g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
4531   result_datetime =
4532       g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
4533   g_date_time_unref (unix_datetime);
4534   return result_datetime;
4535 }
4536
4537 /**
4538  * gst_adaptive_demux_is_running
4539  * @demux: #GstAdaptiveDemux
4540  * Returns: whether the demuxer is processing data
4541  *
4542  * Returns FALSE if shutdown has started (transitioning down from
4543  * PAUSED), otherwise TRUE.
4544  */
4545 gboolean
4546 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4547 {
4548   return g_atomic_int_get (&demux->running);
4549 }
4550
4551 static GstAdaptiveDemuxTimer *
4552 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4553 {
4554   GstAdaptiveDemuxTimer *timer;
4555
4556   timer = g_slice_new (GstAdaptiveDemuxTimer);
4557   timer->fired = FALSE;
4558   timer->cond = cond;
4559   timer->mutex = mutex;
4560   timer->ref_count = 1;
4561   return timer;
4562 }
4563
4564 static GstAdaptiveDemuxTimer *
4565 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4566 {
4567   g_return_val_if_fail (timer != NULL, NULL);
4568   g_atomic_int_inc (&timer->ref_count);
4569   return timer;
4570 }
4571
4572 static void
4573 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4574 {
4575   g_return_if_fail (timer != NULL);
4576   if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4577     g_slice_free (GstAdaptiveDemuxTimer, timer);
4578   }
4579 }
4580
4581 /* gst_adaptive_demux_wait_until:
4582  * A replacement for g_cond_wait_until that uses the clock rather
4583  * than system time to control the duration of the sleep. Typically
4584  * clock is actually a #GstSystemClock, in which case this function
4585  * behaves exactly like g_cond_wait_until. Inside unit tests,
4586  * the clock is typically a #GstTestClock, which allows tests to run
4587  * in non-realtime.
4588  * This function must be called with mutex held.
4589  */
4590 static gboolean
4591 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4592     GstClockTime end_time)
4593 {
4594   GstAdaptiveDemuxTimer *timer;
4595   gboolean fired;
4596   GstClockReturn res;
4597
4598   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4599     /* for an invalid time, gst_clock_id_wait_async will try to call
4600      * gst_adaptive_demux_clock_callback from the current thread.
4601      * It still holds the mutex while doing that, so it will deadlock.
4602      * g_cond_wait_until would return immediately with false, so we'll do the same.
4603      */
4604     return FALSE;
4605   }
4606   timer = gst_adaptive_demux_timer_new (cond, mutex);
4607   timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4608   res =
4609       gst_clock_id_wait_async (timer->clock_id,
4610       gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4611       (GDestroyNotify) gst_adaptive_demux_timer_unref);
4612   /* clock does not support asynchronously wait. Assert and return */
4613   if (res == GST_CLOCK_UNSUPPORTED) {
4614     gst_clock_id_unref (timer->clock_id);
4615     gst_adaptive_demux_timer_unref (timer);
4616     g_return_val_if_reached (TRUE);
4617   }
4618   g_assert (!timer->fired);
4619   /* the gst_adaptive_demux_clock_callback() will signal the
4620    * cond when the clock's single shot timer fires, or the cond will be
4621    * signalled by another thread that wants to cause this wait to finish
4622    * early (e.g. to terminate the waiting thread).
4623    * There is no need for a while loop here, because that logic is
4624    * implemented by the function calling gst_adaptive_demux_wait_until() */
4625   g_cond_wait (cond, mutex);
4626   fired = timer->fired;
4627   if (!fired)
4628     gst_clock_id_unschedule (timer->clock_id);
4629   gst_clock_id_unref (timer->clock_id);
4630   gst_adaptive_demux_timer_unref (timer);
4631   return !fired;
4632 }
4633
4634 static gboolean
4635 gst_adaptive_demux_clock_callback (GstClock * clock,
4636     GstClockTime time, GstClockID id, gpointer user_data)
4637 {
4638   GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4639   g_return_val_if_fail (timer != NULL, FALSE);
4640   g_mutex_lock (timer->mutex);
4641   timer->fired = TRUE;
4642   g_cond_signal (timer->cond);
4643   g_mutex_unlock (timer->mutex);
4644   return TRUE;
4645 }