3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
26 * What is an adaptive demuxer?
27 * Adaptive demuxers are special demuxers in the sense that they don't
28 * actually demux data received from upstream but download the data
31 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32 * a set of fragments. The manifest describes the available media and
33 * the sequence of fragments to use. Each fragment contains a small
34 * part of the media (typically only a few seconds). It is possible for
35 * the manifest to have the same media available in different configurations
36 * (bitrates for example) so that the client can select the one that
37 * best suits its scenario (network fluctuation, hardware requirements...).
38 * It is possible to switch from one representation of the media to another
39 * during playback. That's why it is called 'adaptive', because it can be
40 * adapted to the client's needs.
42 * Architectural overview:
43 * The manifest is received by the demuxer in its sink pad and, upon receiving
44 * EOS, it parses the manifest and exposes the streams available in it. For
45 * each stream a source element will be created and will download the list
46 * of fragments one by one. Once a fragment is finished downloading, the next
47 * URI is set to the source element and it starts fetching it and pushing
48 * through the stream's pad. This implies that each stream is independent from
49 * each other as it runs on a separate thread.
51 * After downloading each fragment, the download rate of it is calculated and
52 * the demuxer has a chance to switch to a different bitrate if needed. The
53 * switch can be done by simply pushing a new caps before the next fragment
54 * when codecs are the same, or by exposing a new pad group if it needs
58 * - Not linked streams: Streams that are not-linked have their download threads
59 * interrupted to save network bandwidth. When they are
60 * relinked a reconfigure event is received and the
61 * stream is restarted.
64 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65 * about the intrinsics of the subclass formats, so the subclasses are
66 * responsible for maintaining the manifest data structures and stream
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses need to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
78 Adaptive demux API can be called from several threads. Moreover, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
/* Debug category for this file; GST_CAT_DEFAULT points at it. */
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
/* Tunables. SRC_QUEUE_MAX_BYTES is parenthesized so that it still expands to a
 * single value when used inside a larger expression
 * (e.g. x / SRC_QUEUE_MAX_BYTES). */
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES (20 * 1024 * 1024) /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
/* Lock helpers, all taking the demuxer as (d):
 *  - GST_MANIFEST_LOCK/UNLOCK: recursive priv->manifest_lock
 *    (g_rec_mutex_lock/unlock), with a GST_TRACE of the calling thread.
 *  - GST_API_LOCK/UNLOCK: plain priv->api_lock (g_mutex_lock/unlock).
 *    NOTE: these two expansions already end in ';'.
 *  - GST_ADAPTIVE_DEMUX_SEGMENT_LOCK/UNLOCK: plain priv->segment_lock. */
135 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
136 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
137 GST_TRACE("Locking from thread %p", g_thread_self()); \
138 g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
139 GST_TRACE("Locked from thread %p", g_thread_self()); \
142 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
143 GST_TRACE("Unlocking from thread %p", g_thread_self()); \
144 g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
147 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
148 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
149 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
151 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
158 PROP_CONNECTION_SPEED,
163 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
164 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
/* Private per-instance state of GstAdaptiveDemux. The comment next to each
 * member names the lock that protects it, or marks it as MT safe. */
166 struct _GstAdaptiveDemuxPrivate
168 GstAdapter *input_adapter; /* protected by manifest_lock */
169 gint have_manifest; /* MT safe */
171 GList *old_streams; /* protected by manifest_lock */
173 GstTask *updates_task; /* MT safe */
174 GRecMutex updates_lock;
175 GMutex updates_timed_lock;
176 GCond updates_timed_cond; /* protected by updates_timed_lock */
177 gboolean stop_updates_task; /* protected by updates_timed_lock */
179 /* used only from updates_task, no need to protect it */
180 gint update_failed_count;
182 guint32 segment_seqnum; /* protected by manifest_lock */
184 /* main lock used to protect adaptive demux and all its streams.
185 * It serializes the adaptive demux public API.
187 GRecMutex manifest_lock;
189 /* condition to wait for manifest updates on a live stream.
190 * In order to signal the manifest_cond, the caller needs to hold both
191 * manifest_lock and manifest_update_lock (taken in this order)
194 GMutex manifest_update_lock;
196 /* Lock and condition for prerolling streams before exposing */
199 gint preroll_pending;
203 /* Protects demux and stream segment information
204 * Needed because seeks can update segment information
205 * without needing to stop tasks when they just want to
206 * update the segment boundaries */
/* Timer helper object that carries its own reference count (ref_count). */
210 typedef struct _GstAdaptiveDemuxTimer
212 volatile gint ref_count;
217 } GstAdaptiveDemuxTimer;
/* parent_class is assigned in gst_adaptive_demux_class_init; private_offset is
 * adjusted there and read by gst_adaptive_demux_get_instance_private. */
219 static GstBinClass *parent_class = NULL;
220 static gint private_offset = 0;
/* Forward declarations for the functions local to this file. */
222 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
223 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
224 GstAdaptiveDemuxClass * klass);
225 static void gst_adaptive_demux_finalize (GObject * object);
226 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
227 element, GstStateChange transition);
229 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
231 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
233 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
234 GstObject * parent, GstBuffer * buffer);
235 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
237 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
241 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
243 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
244 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
246 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
247 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
248 gboolean first_and_live);
249 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
250 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
251 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
252 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
253 GstClockTime ts, GstClockTime * final_ts);
254 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
255 demux, GstAdaptiveDemuxStream * stream);
256 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
257 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
259 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
260 GstAdaptiveDemuxStream * stream);
262 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
263 GstAdaptiveDemuxStream * stream);
264 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
267 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
268 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
269 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
271 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
273 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
276 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
278 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
281 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
282 gboolean start_preroll_streams);
283 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
284 gboolean stop_updates);
285 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
288 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
289 stream, GstFlowReturn ret, GError * err);
291 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
292 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
294 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
295 GstAdaptiveDemuxStream * stream);
297 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
298 GstAdaptiveDemuxStream * stream, GstClockTime duration);
300 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
301 GstClockTime end_time);
302 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
303 GstClockTime time, GstClockID id, gpointer user_data);
305 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
/* Type registration for GstAdaptiveDemux. Guarded by g_once_init_enter /
 * g_once_init_leave, it registers an abstract type derived from GST_TYPE_BIN
 * (class init: gst_adaptive_demux_class_init, instance init:
 * gst_adaptive_demux_init) and requests instance-private storage of
 * sizeof (GstAdaptiveDemuxPrivate). */
308 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
309 * method to get to the padtemplates */
311 gst_adaptive_demux_get_type (void)
313 static volatile gsize type = 0;
315 if (g_once_init_enter (&type)) {
317 static const GTypeInfo info = {
318 sizeof (GstAdaptiveDemuxClass),
321 (GClassInitFunc) gst_adaptive_demux_class_init,
324 sizeof (GstAdaptiveDemux),
326 (GInstanceInitFunc) gst_adaptive_demux_init,
329 _type = g_type_register_static (GST_TYPE_BIN,
330 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
333 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
335 g_once_init_leave (&type, _type);
/* Returns the instance-private area of @self, located at private_offset
 * relative to the instance pointer. */
340 static inline GstAdaptiveDemuxPrivate *
341 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
343 return (G_STRUCT_MEMBER_P (self, private_offset));
/* GObject set_property handler. Takes api_lock and then manifest_lock, stores
 * the new value and releases the locks in reverse order.
 * connection-speed is stored as the property value multiplied by 1000;
 * bitrate-limit is stored as given. Unknown ids produce the standard
 * invalid-property warning. */
347 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
348 const GValue * value, GParamSpec * pspec)
350 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 GST_API_LOCK (demux);
353 GST_MANIFEST_LOCK (demux);
356 case PROP_CONNECTION_SPEED:
357 demux->connection_speed = g_value_get_uint (value) * 1000;
358 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
359 demux->connection_speed);
361 case PROP_BITRATE_LIMIT:
362 demux->bitrate_limit = g_value_get_float (value);
365 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
369 GST_MANIFEST_UNLOCK (demux);
370 GST_API_UNLOCK (demux);
/* GObject get_property handler. Only manifest_lock is taken here (api_lock is
 * not). connection-speed is reported as the stored value divided by 1000,
 * mirroring the multiplication done in set_property. */
374 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
375 GValue * value, GParamSpec * pspec)
377 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
379 GST_MANIFEST_LOCK (demux);
382 case PROP_CONNECTION_SPEED:
383 g_value_set_uint (value, demux->connection_speed / 1000);
385 case PROP_BITRATE_LIMIT:
386 g_value_set_float (value, demux->bitrate_limit);
389 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
393 GST_MANIFEST_UNLOCK (demux);
/* Class initializer. Sets up the "adaptivedemux" debug category, records the
 * parent class, adjusts the private offset, installs the connection-speed and
 * bitrate-limit properties, and wires the GObject (set/get property,
 * finalize), GstElement (change_state) and GstBin (handle_message) vfuncs.
 * It also installs the default implementations of data_received,
 * finish_fragment, update_manifest and requires_periodical_playlist_update. */
397 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
399 GObjectClass *gobject_class;
400 GstElementClass *gstelement_class;
401 GstBinClass *gstbin_class;
403 gobject_class = G_OBJECT_CLASS (klass);
404 gstelement_class = GST_ELEMENT_CLASS (klass);
405 gstbin_class = GST_BIN_CLASS (klass);
407 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
408 "Base Adaptive Demux");
410 parent_class = g_type_class_peek_parent (klass);
412 if (private_offset != 0)
413 g_type_class_adjust_private_offset (klass, &private_offset);
415 gobject_class->set_property = gst_adaptive_demux_set_property;
416 gobject_class->get_property = gst_adaptive_demux_get_property;
417 gobject_class->finalize = gst_adaptive_demux_finalize;
419 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
420 g_param_spec_uint ("connection-speed", "Connection Speed",
421 "Network connection speed in kbps (0 = calculate from downloaded"
422 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
423 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
426 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
427 g_param_spec_float ("bitrate-limit",
428 "Bitrate limit in %",
429 "Limit of the available bitrate to use when switching to alternates.",
430 0, 1, DEFAULT_BITRATE_LIMIT,
431 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 gstelement_class->change_state = gst_adaptive_demux_change_state;
435 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
437 klass->data_received = gst_adaptive_demux_stream_data_received_default;
438 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
439 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
440 klass->requires_periodical_playlist_update =
441 gst_adaptive_demux_requires_periodical_playlist_update_default;
/* Instance initializer. Binds the private area, creates the input adapter and
 * the URI downloader, initializes the segment, group id and segment seqnum,
 * obtains the system clock and derives clock_offset from it, creates the
 * manifest updates task and every mutex/cond used by the element, and finally
 * builds the "sink" pad from the class pad template (event and chain functions
 * installed) and adds it to the element. The defaults for bitrate_limit and
 * connection_speed are applied here as well. */
446 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
447 GstAdaptiveDemuxClass * klass)
449 GstPadTemplate *pad_template;
450 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
451 GObjectClass *gobject_class;
453 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
455 demux->priv = gst_adaptive_demux_get_instance_private (demux);
456 demux->priv->input_adapter = gst_adapter_new ();
457 demux->downloader = gst_uri_downloader_new ();
458 gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
459 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
460 demux->priv->segment_seqnum = gst_util_seqnum_next ();
461 demux->have_group_id = FALSE;
462 demux->group_id = G_MAXUINT;
464 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
466 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
467 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
469 demux->realtime_clock = gst_system_clock_obtain ();
470 g_assert (demux->realtime_clock != NULL);
471 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
472 if (g_object_class_find_property (gobject_class, "clock-type")) {
473 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
475 GST_WARNING_OBJECT (demux,
476 "System clock does not have clock-type property");
478 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
479 demux->clock_offset = 0;
482 GstClockTime rtc_now;
/* clock_offset = current UTC time minus the system clock reading, both
 * expressed in microseconds. */
485 utc_now = g_date_time_new_now_utc ();
486 rtc_now = gst_clock_get_time (demux->realtime_clock);
487 g_date_time_to_timeval (utc_now, &gtv);
488 demux->clock_offset =
489 gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
490 GST_TIME_AS_USECONDS (rtc_now);
491 g_date_time_unref (utc_now);
493 g_rec_mutex_init (&demux->priv->updates_lock);
494 demux->priv->updates_task =
495 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
497 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
499 g_mutex_init (&demux->priv->updates_timed_lock);
500 g_cond_init (&demux->priv->updates_timed_cond);
502 g_cond_init (&demux->priv->manifest_cond);
503 g_mutex_init (&demux->priv->manifest_update_lock);
505 g_rec_mutex_init (&demux->priv->manifest_lock);
506 g_mutex_init (&demux->priv->api_lock);
507 g_mutex_init (&demux->priv->segment_lock);
509 g_cond_init (&demux->priv->preroll_cond);
510 g_mutex_init (&demux->priv->preroll_lock);
513 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
514 g_return_if_fail (pad_template != NULL);
516 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
517 gst_pad_set_event_function (demux->sinkpad,
518 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
519 gst_pad_set_chain_function (demux->sinkpad,
520 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
523 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
524 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
526 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
/* GObject finalize handler. Drops the references on the input adapter, the
 * downloader, the updates task and the realtime clock, clears the mutexes and
 * conditions set up in gst_adaptive_demux_init, then chains up to the parent
 * class finalize. */
530 gst_adaptive_demux_finalize (GObject * object)
532 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
533 GstAdaptiveDemuxPrivate *priv = demux->priv;
535 GST_DEBUG_OBJECT (object, "finalize");
537 g_object_unref (priv->input_adapter);
538 g_object_unref (demux->downloader);
540 g_mutex_clear (&priv->updates_timed_lock);
541 g_cond_clear (&priv->updates_timed_cond);
542 g_mutex_clear (&demux->priv->manifest_update_lock);
543 g_cond_clear (&demux->priv->manifest_cond);
544 g_object_unref (priv->updates_task);
545 g_rec_mutex_clear (&priv->updates_lock);
546 g_rec_mutex_clear (&demux->priv->manifest_lock);
547 g_mutex_clear (&demux->priv->api_lock);
548 g_mutex_clear (&demux->priv->segment_lock);
549 if (demux->realtime_clock) {
550 gst_object_unref (demux->realtime_clock);
551 demux->realtime_clock = NULL;
554 g_cond_clear (&demux->priv->preroll_cond);
555 g_mutex_clear (&demux->priv->preroll_lock);
557 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GstElement change_state handler.
 * PAUSED_TO_READY: atomically clears the running flag, cancels the
 * downloader, then resets the demuxer with api_lock and manifest_lock held.
 * READY_TO_PAUSED: resets the demuxer and the downloader under both locks,
 * restarts the manifest update task when have_manifest is set, and then
 * atomically sets the running flag.
 * The parent class change_state is invoked with neither lock held. */
560 static GstStateChangeReturn
561 gst_adaptive_demux_change_state (GstElement * element,
562 GstStateChange transition)
564 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
565 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
567 switch (transition) {
568 case GST_STATE_CHANGE_PAUSED_TO_READY:
569 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
570 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
571 gst_uri_downloader_cancel (demux->downloader);
573 GST_API_LOCK (demux);
574 GST_MANIFEST_LOCK (demux);
575 gst_adaptive_demux_reset (demux);
576 GST_MANIFEST_UNLOCK (demux);
577 GST_API_UNLOCK (demux);
579 case GST_STATE_CHANGE_READY_TO_PAUSED:
580 GST_API_LOCK (demux);
581 GST_MANIFEST_LOCK (demux);
582 gst_adaptive_demux_reset (demux);
583 /* Clear "cancelled" flag in uridownloader since subclass might want to
584 * use uridownloader to fetch another manifest */
585 gst_uri_downloader_reset (demux->downloader);
586 if (g_atomic_int_get (&demux->priv->have_manifest))
587 gst_adaptive_demux_start_manifest_update_task (demux);
588 GST_MANIFEST_UNLOCK (demux);
589 GST_API_UNLOCK (demux);
590 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
591 GST_DEBUG_OBJECT (demux, "demuxer has started running");
597 /* this must be run without MANIFEST_LOCK taken.
598 * For PLAYING to PLAYING state changes, it will want to take a lock in
599 * src element and that lock is held while the streaming thread is running.
600 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
602 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Event handler of the sink pad.
 * FLUSH_STOP: resets the demuxer and forwards the event, with api_lock and
 * manifest_lock held.
 * EOS (the manifest has been fully received): with both locks held, forwards
 * the event and returns early when the input adapter is empty. Otherwise it
 * queries upstream for the URI (a permanent redirect replaces manifest_uri, a
 * non-permanent one becomes manifest_base_uri), hands the accumulated buffer
 * to the subclass process_manifest, posts an error on failure, posts the
 * statistics message and, for non-live streams, the duration message. It then
 * prepares the streams and starts the download and manifest update tasks, or
 * posts an error when the manifest produced no streams.
 * SEGMENT: dropped, since the demuxer pushes its own segments.
 * Anything else goes to gst_pad_event_default. */
608 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
611 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
614 switch (event->type) {
615 case GST_EVENT_FLUSH_STOP:{
616 GST_API_LOCK (demux);
617 GST_MANIFEST_LOCK (demux);
619 gst_adaptive_demux_reset (demux);
621 ret = gst_pad_event_default (pad, parent, event);
623 GST_MANIFEST_UNLOCK (demux);
624 GST_API_UNLOCK (demux);
629 GstAdaptiveDemuxClass *demux_class;
634 GstBuffer *manifest_buffer;
636 GST_API_LOCK (demux);
637 GST_MANIFEST_LOCK (demux);
639 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
641 available = gst_adapter_available (demux->priv->input_adapter);
643 if (available == 0) {
644 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
645 ret = gst_pad_event_default (pad, parent, event);
647 GST_MANIFEST_UNLOCK (demux);
648 GST_API_UNLOCK (demux);
653 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
655 /* Need to get the URI to use it as a base to generate the fragment's
657 query = gst_query_new_uri ();
658 query_res = gst_pad_peer_query (pad, query);
660 gchar *uri, *redirect_uri;
663 gst_query_parse_uri (query, &uri);
664 gst_query_parse_uri_redirection (query, &redirect_uri);
665 gst_query_parse_uri_redirection_permanent (query, &permanent);
667 if (permanent && redirect_uri) {
668 demux->manifest_uri = redirect_uri;
669 demux->manifest_base_uri = NULL;
672 demux->manifest_uri = uri;
673 demux->manifest_base_uri = redirect_uri;
676 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
677 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
679 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
681 gst_query_unref (query);
683 /* Let the subclass parse the manifest */
685 gst_adapter_take_buffer (demux->priv->input_adapter, available);
686 if (!demux_class->process_manifest (demux, manifest_buffer)) {
687 /* In most cases, this will happen if we set a wrong url in the
688 * source element and we have received the 404 HTML response instead of
690 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
694 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
696 gst_buffer_unref (manifest_buffer);
698 gst_element_post_message (GST_ELEMENT_CAST (demux),
699 gst_message_new_element (GST_OBJECT_CAST (demux),
700 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
701 "manifest-uri", G_TYPE_STRING,
702 demux->manifest_uri, "uri", G_TYPE_STRING,
704 "manifest-download-start", GST_TYPE_CLOCK_TIME,
706 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
707 gst_util_get_timestamp (), NULL)));
710 /* Send duration message */
711 if (!gst_adaptive_demux_is_live (demux)) {
712 GstClockTime duration = demux_class->get_duration (demux);
714 if (duration != GST_CLOCK_TIME_NONE) {
715 GST_DEBUG_OBJECT (demux,
716 "Sending duration message : %" GST_TIME_FORMAT,
717 GST_TIME_ARGS (duration));
718 gst_element_post_message (GST_ELEMENT (demux),
719 gst_message_new_duration_changed (GST_OBJECT (demux)));
721 GST_DEBUG_OBJECT (demux,
722 "media duration unknown, can not send the duration message");
726 if (demux->next_streams) {
727 gst_adaptive_demux_prepare_streams (demux,
728 gst_adaptive_demux_is_live (demux));
729 gst_adaptive_demux_start_tasks (demux, TRUE);
730 gst_adaptive_demux_start_manifest_update_task (demux);
733 GST_WARNING_OBJECT (demux, "No streams created from manifest");
734 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
735 (_("This file contains no playable streams.")),
736 ("No known stream formats found at the Manifest"));
741 GST_MANIFEST_UNLOCK (demux);
742 GST_API_UNLOCK (demux);
744 gst_event_unref (event);
747 case GST_EVENT_SEGMENT:
748 /* Swallow newsegments, we'll push our own */
749 gst_event_unref (event);
755 return gst_pad_event_default (pad, parent, event);
/* Chain function of the sink pad. Each incoming buffer is a piece of the
 * manifest: it is appended to input_adapter under manifest_lock and the
 * accumulated size is logged. */
759 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
762 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
764 GST_MANIFEST_LOCK (demux);
766 gst_adapter_push (demux->priv->input_adapter, buffer);
768 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
769 (gint) gst_adapter_available (demux->priv->input_adapter));
771 GST_MANIFEST_UNLOCK (demux);
/* Returns the demuxer to its initial state. Stops the tasks, calls the
 * subclass reset vfunc, pushes EOS on every exposed stream pad before
 * deactivating, removing and freeing it, and frees the prepared, next and old
 * stream lists. It then frees the manifest URIs, clears the input adapter and
 * the have_manifest flag, and reinitializes the segment, the group id and the
 * segment seqnum. */
775 /* must be called with manifest_lock taken */
777 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
779 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
784 /* take ownership of old_streams before releasing the manifest_lock in
785 * gst_adaptive_demux_stop_tasks
787 old_streams = demux->priv->old_streams;
788 demux->priv->old_streams = NULL;
790 gst_adaptive_demux_stop_tasks (demux, TRUE);
793 klass->reset (demux);
795 eos = gst_event_new_eos ();
796 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
797 GstAdaptiveDemuxStream *stream = iter->data;
799 gst_pad_push_event (stream->pad, gst_event_ref (eos));
800 gst_pad_set_active (stream->pad, FALSE);
802 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
804 gst_adaptive_demux_stream_free (stream);
806 gst_event_unref (eos);
807 g_list_free (demux->streams);
808 demux->streams = NULL;
809 if (demux->prepared_streams) {
810 g_list_free_full (demux->prepared_streams,
811 (GDestroyNotify) gst_adaptive_demux_stream_free);
812 demux->prepared_streams = NULL;
814 if (demux->next_streams) {
815 g_list_free_full (demux->next_streams,
816 (GDestroyNotify) gst_adaptive_demux_stream_free);
817 demux->next_streams = NULL;
821 g_list_free_full (old_streams,
822 (GDestroyNotify) gst_adaptive_demux_stream_free);
825 if (demux->priv->old_streams) {
826 g_list_free_full (demux->priv->old_streams,
827 (GDestroyNotify) gst_adaptive_demux_stream_free);
828 demux->priv->old_streams = NULL;
831 g_free (demux->manifest_uri);
832 g_free (demux->manifest_base_uri);
833 demux->manifest_uri = NULL;
834 demux->manifest_base_uri = NULL;
836 gst_adapter_clear (demux->priv->input_adapter);
837 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
839 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
841 demux->have_group_id = FALSE;
842 demux->group_id = G_MAXUINT;
843 demux->priv->segment_seqnum = gst_util_seqnum_next ();
/* GstBin handle_message override. For ERROR messages it works under
 * manifest_lock: it looks up the stream whose src element is an ancestor of
 * the message source (first in streams, then in prepared_streams) and warns
 * when none is found. For a matched stream it logs the error, rewrites
 * err->message to include the debug string, stores the "http-status-code"
 * detail in stream->last_status_code, and reports the failure through
 * gst_adaptive_demux_stream_fragment_download_finish with
 * GST_FLOW_CUSTOM_ERROR so that the download can be retried.
 * The parent class handle_message is also invoked from this function. */
847 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
849 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
851 switch (GST_MESSAGE_TYPE (msg)) {
852 case GST_MESSAGE_ERROR:{
854 GstAdaptiveDemuxStream *stream = NULL;
857 gchar *new_error = NULL;
858 const GstStructure *details = NULL;
860 GST_MANIFEST_LOCK (demux);
862 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
863 GstAdaptiveDemuxStream *cur = iter->data;
864 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
865 GST_OBJECT_CAST (cur->src))) {
870 if (stream == NULL) {
871 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
872 GstAdaptiveDemuxStream *cur = iter->data;
873 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
874 GST_OBJECT_CAST (cur->src))) {
879 if (stream == NULL) {
880 GST_WARNING_OBJECT (demux,
881 "Failed to locate stream for errored element");
886 gst_message_parse_error (msg, &err, &debug);
888 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
889 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
890 err->message, debug);
893 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
895 g_free (err->message);
896 err->message = new_error;
899 gst_message_parse_error_details (msg, &details);
901 gst_structure_get_uint (details, "http-status-code",
902 &stream->last_status_code);
905 /* error, but ask to retry */
906 gst_adaptive_demux_stream_fragment_download_finish (stream,
907 GST_FLOW_CUSTOM_ERROR, err);
912 GST_MANIFEST_UNLOCK (demux);
914 gst_message_unref (msg);
923 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
/* Stores struct_size in demux->stream_struct_size, with api_lock and
 * manifest_lock held (taken in that order). The instance initializer sets the
 * default to sizeof (GstAdaptiveDemuxStream). */
927 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
930 GST_API_LOCK (demux);
931 GST_MANIFEST_LOCK (demux);
932 demux->stream_struct_size = struct_size;
933 GST_MANIFEST_UNLOCK (demux);
934 GST_API_UNLOCK (demux);
/* Prepares the source pad of @stream: activates it, marks the stream as
 * needing a header and as discontinuous, and pushes a stream-start event.
 * The group id comes from the sink pad's sticky stream-start event when one
 * is present; otherwise a new id is generated if the demuxer has none yet. */
937 /* must be called with manifest_lock taken */
939 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
940 GstAdaptiveDemuxStream * stream)
942 GstPad *pad = stream->pad;
943 gchar *name = gst_pad_get_name (pad);
947 gst_pad_set_active (pad, TRUE);
948 stream->need_header = TRUE;
950 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
953 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
954 GST_EVENT_STREAM_START, 0);
956 if (gst_event_parse_group_id (event, &demux->group_id))
957 demux->have_group_id = TRUE;
959 demux->have_group_id = FALSE;
960 gst_event_unref (event);
961 } else if (!demux->have_group_id) {
962 demux->have_group_id = TRUE;
963 demux->group_id = gst_util_group_id_next ();
965 event = gst_event_new_stream_start (stream_id);
966 if (demux->have_group_id)
967 gst_event_set_group_id (event, demux->group_id);
969 gst_pad_push_event (pad, event);
973 GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
975 stream->discont = TRUE;
/* Exposes the source pad of @stream on the element. Pending caps, if any, are
 * set on the pad and cleared from the stream; otherwise the pad's current
 * caps are fetched for the log message. The pad is then added to the element
 * with manifest_lock temporarily released; the lock is re-taken afterwards. */
981 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
982 GstAdaptiveDemuxStream * stream)
985 GstPad *pad = stream->pad;
988 if (stream->pending_caps) {
989 gst_pad_set_caps (pad, stream->pending_caps);
990 caps = stream->pending_caps;
991 stream->pending_caps = NULL;
993 caps = gst_pad_get_current_caps (pad);
996 GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
997 GST_DEBUG_PAD_NAME (pad), caps);
999 gst_caps_unref (caps);
1001 gst_object_ref (pad);
1003 /* Don't hold the manifest lock while exposing a pad */
1004 GST_MANIFEST_UNLOCK (demux);
1005 ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1006 GST_MANIFEST_LOCK (demux);
1011 /* must be called with manifest_lock taken.
 *
 * Asks the subclass for the presentation offset of @stream through its
 * get_presentation_offset vfunc. The vfunc is optional: a NULL pointer is
 * checked for before the call. */
1013 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1014 GstAdaptiveDemuxStream * stream)
1016 GstAdaptiveDemuxClass *klass;
1018 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1020 if (klass->get_presentation_offset == NULL)
1023 return klass->get_presentation_offset (demux, stream);
1026 /* must be called with manifest_lock taken.
 *
 * Asks the subclass for the start time of the current period through its
 * get_period_start_time vfunc. The vfunc is optional: a NULL pointer is
 * checked for before the call. */
1028 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1030 GstAdaptiveDemuxClass *klass;
1032 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1034 if (klass->get_period_start_time == NULL)
1037 return klass->get_period_start_time (demux);
1040 /* must be called with manifest_lock taken.
 *
 * Moves demux->next_streams over to demux->prepared_streams, prepares each
 * stream's pad and builds every stream's initial segment together with its
 * pending SEGMENT event.
 *
 * When @first_and_live is TRUE, the fragment info of every stream is updated
 * first and the demuxer segment is seeked to the smallest fragment timestamp
 * found, plus the period start time. */
1042 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1043 gboolean first_and_live)
1046 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1048 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1049 if (demux->prepared_streams != NULL) {
1050 /* Old streams that were never exposed, due to a seek or so */
1051 GST_FIXME_OBJECT (demux,
1052 "Preparing new streams without cleaning up old ones!");
/* The pending streams become the prepared ones */
1056 demux->prepared_streams = demux->next_streams;
1057 demux->next_streams = NULL;
1059 if (!gst_adaptive_demux_is_running (demux)) {
1060 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1064 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1065 GstAdaptiveDemuxStream *stream = iter->data;
1067 stream->do_block = TRUE;
1069 if (!gst_adaptive_demux_prepare_stream (demux,
1070 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1071 /* TODO act on error */
1072 GST_FIXME_OBJECT (stream->pad,
1073 "Do something on failure to expose stream");
1076 if (first_and_live) {
1077 /* TODO we only need the first timestamp, maybe create a simple function to
1078 * get the current PTS of a fragment ? */
1079 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1080 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1082 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1083 min_pts = MIN (min_pts, stream->fragment.timestamp);
1085 min_pts = stream->fragment.timestamp;
1090 period_start = gst_adaptive_demux_get_period_start_time (demux);
1092 /* For live streams, the subclass is supposed to seek to the current
1093 * fragment and then tell us its timestamp in stream->fragment.timestamp.
1094 * We now also have to seek our demuxer segment to reflect this.
1096 * FIXME: This needs some refactoring at some point.
1098 if (first_and_live) {
1099 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1100 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1101 GST_SEEK_TYPE_NONE, -1, NULL);
1104 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1105 GstAdaptiveDemuxStream *stream = iter->data;
1106 GstClockTime offset;
1108 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1109 stream->segment = demux->segment;
1111 /* The demuxer segment is just built from seek events, but for each stream
1112 * we have to adjust segments according to the current period and the
1113 * stream specific presentation time offset.
1115 * For each period, buffer timestamps start again from 0. Additionally the
1116 * buffer timestamps are shifted by the stream specific presentation time
1117 * offset, so the first buffer timestamp of a period is 0 + presentation
1118 * time offset. If the stream contains timestamps itself, this is also
1119 * supposed to be the presentation time stored inside the stream.
1121 * The stream time over periods is supposed to be continuous, that is the
1122 * buffer timestamp 0 + presentation time offset should map to the start
1123 * time of the current period.
1126 * The adjustment of the stream segments as such works the following.
1128 * If the demuxer segment start is bigger than the period start, this
1129 * means that we have to drop some media at the beginning of the current
1130 * period, e.g. because a seek into the middle of the period has
1131 * happened. The amount of media to drop is the difference between the
1132 * period start and the demuxer segment start, and as each period starts
1133 * again from 0, this difference is going to be the actual stream's
1134 * segment start. As all timestamps of the stream are shifted by the
1135 * presentation time offset, we will also have to move the segment start
1138 * Likewise, the demuxer segment stop value is adjusted in the same
1141 * Now the running time and stream time at the stream's segment start has
1142 * to be the one that is stored inside the demuxer's segment, which means
1143 * that segment.base and segment.time have to be copied over (done just
1147 * If the demuxer segment start is smaller than the period start time,
1148 * this means that the whole period is inside the segment. As each period
1149 * starts timestamps from 0, and additionally timestamps are shifted by
1150 * the presentation time offset, the stream's first timestamp (and as such
1151 * the stream's segment start) has to be the presentation time offset.
1152 * The stream time at the segment start is supposed to be the stream time
1153 * of the period start according to the demuxer segment, so the stream
1154 * segment's time would be set to that. The same goes for the stream
1155 * segment's base, which is supposed to be the running time of the period
1156 * start according to the demuxer's segment.
1158 * The same logic applies for negative rates with the segment stop and
1159 * the period stop time (which gets clamped).
1162 * For the first case where not the complete period is inside the segment,
1163 * the segment time and base as calculated by the second case would be
1166 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1168 GST_DEBUG_OBJECT (demux,
1169 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1170 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1171 /* note for readers:
1172 * Since stream->segment is initially a copy of demux->segment,
1173 * only the values that need updating are modified below. */
1174 if (first_and_live) {
1175 /* If first and live, demuxer did seek to the current position already */
1176 stream->segment.start = demux->segment.start - period_start + offset;
1177 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1178 stream->segment.stop = demux->segment.stop - period_start + offset;
1179 /* FIXME : Do we need to handle negative rates for this ? */
1180 stream->segment.position = stream->segment.start;
1181 } else if (demux->segment.start > period_start) {
1182 /* seek within a period */
1183 stream->segment.start = demux->segment.start - period_start + offset;
1184 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1185 stream->segment.stop = demux->segment.stop - period_start + offset;
1186 if (stream->segment.rate >= 0)
1187 stream->segment.position = offset;
1189 stream->segment.position = stream->segment.stop;
1191 stream->segment.start = offset;
1192 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1193 stream->segment.stop = demux->segment.stop - period_start + offset;
1194 if (stream->segment.rate >= 0)
1195 stream->segment.position = offset;
1197 stream->segment.position = stream->segment.stop;
1198 stream->segment.time =
1199 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1201 stream->segment.base =
1202 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
/* Queue the SEGMENT event for this stream, tagged with the seqnum of the
 * demuxer's current segment, and reset the QoS state */
1206 stream->pending_segment = gst_event_new_segment (&stream->segment);
1207 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1208 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1210 GST_DEBUG_OBJECT (demux,
1211 "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1212 &stream->segment, stream);
/* Makes demux->prepared_streams the active demux->streams and exposes their
 * pads, then retires the previously active streams: each old pad gets an EOS,
 * is deactivated and removed from the element, and its download task is asked
 * to stop. The old streams are appended to demux->priv->old_streams rather
 * than freed here. Finally the new streams have do_block cleared.
 *
 * The manifest lock is released and re-taken several times inside, so the
 * caller is expected to hold it on entry. */
1219 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1224 g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
/* Swap in the prepared streams; the previous set is kept for teardown */
1226 old_streams = demux->streams;
1227 demux->streams = demux->prepared_streams;
1228 demux->prepared_streams = NULL;
1230 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1231 GstAdaptiveDemuxStream *stream = iter->data;
1233 if (!gst_adaptive_demux_expose_stream (demux,
1234 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1235 /* TODO act on error */
1238 demux->priv->preroll_pending = 0;
1240 GST_MANIFEST_UNLOCK (demux);
1241 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1242 GST_MANIFEST_LOCK (demux);
1245 GstEvent *eos = gst_event_new_eos ();
1247 /* before we put streams in the demux->priv->old_streams list,
1248 * we ask the download task to stop. In this way, it will no longer be
1249 * allowed to change the demux object.
1251 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1252 GstAdaptiveDemuxStream *stream = iter->data;
1253 GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1255 GST_MANIFEST_UNLOCK (demux);
1257 GST_DEBUG_OBJECT (pad, "Pushing EOS");
1258 gst_pad_push_event (pad, gst_event_ref (eos));
1259 gst_pad_set_active (pad, FALSE);
1261 GST_LOG_OBJECT (pad, "Removing stream");
1262 gst_element_remove_pad (GST_ELEMENT (demux), pad);
1263 GST_MANIFEST_LOCK (demux);
1265 gst_object_unref (GST_OBJECT (pad));
1267 /* ask the download task to stop.
1268 * We will not join it now, because our thread can be one of these tasks.
1269 * We will do the joining later, from another stream download task or
1270 * from gst_adaptive_demux_stop_tasks.
1271 * We also cannot change the state of the stream->src element, because
1272 * that will wait on the streaming thread (which could be this thread)
1274 * Because we sent an EOS to the downstream element, the stream->src
1275 * element should detect this in its streaming task and stop.
1276 * Even if it doesn't do that, we will change its state later in
1277 * gst_adaptive_demux_stop_tasks.
1279 GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1280 gst_task_stop (stream->download_task);
1281 g_mutex_lock (&stream->fragment_download_lock);
1282 stream->cancelled = TRUE;
1283 stream->replaced = TRUE;
1284 g_cond_signal (&stream->fragment_download_cond);
1285 g_mutex_unlock (&stream->fragment_download_lock);
1287 gst_event_unref (eos);
1289 /* The list should be freed from another thread as we can't properly
1290 * cleanup a GstTask from itself */
1291 demux->priv->old_streams =
1292 g_list_concat (demux->priv->old_streams, old_streams);
1295 /* Unblock after removing oldstreams */
1296 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1297 GstAdaptiveDemuxStream *stream = iter->data;
1298 stream->do_block = FALSE;
1301 GST_DEBUG_OBJECT (demux, "All streams are exposed");
1306 /* must be called with manifest_lock taken.
 *
 * Allocates a zero-filled stream of demux->stream_struct_size bytes for @pad,
 * creates its download task (running
 * gst_adaptive_demux_stream_download_loop under stream->download_lock),
 * installs the source pad query/event handlers and appends the stream to
 * demux->next_streams. The new stream starts with do_block set and is
 * counted in demux->priv->preroll_pending. */
1307 GstAdaptiveDemuxStream *
1308 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1310 GstAdaptiveDemuxStream *stream;
1312 stream = g_malloc0 (demux->stream_struct_size);
1314 /* Downloading task */
1315 g_rec_mutex_init (&stream->download_lock);
1316 stream->download_task =
1317 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1319 gst_task_set_lock (stream->download_task, &stream->download_lock);
1322 stream->demux = demux;
1323 stream->fragment_bitrates =
1324 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
/* Lets pad callbacks get back to the stream from the pad */
1325 gst_pad_set_element_private (pad, stream);
1326 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
/* The preroll bookkeeping is guarded by its own lock */
1328 g_mutex_lock (&demux->priv->preroll_lock);
1329 stream->do_block = TRUE;
1330 demux->priv->preroll_pending++;
1331 g_mutex_unlock (&demux->priv->preroll_lock);
1333 gst_pad_set_query_function (pad,
1334 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1335 gst_pad_set_event_function (pad,
1336 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1338 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1339 g_cond_init (&stream->fragment_download_cond);
1340 g_mutex_init (&stream->fragment_download_lock);
1342 demux->next_streams = g_list_append (demux->next_streams, stream);
/* Walks demux->streams looking for the stream whose pad is @pad. Only the
 * currently active list is searched, not next_streams or prepared_streams. */
1347 GstAdaptiveDemuxStream *
1348 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1352 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1353 GstAdaptiveDemuxStream *stream = iter->data;
1354 if (stream->pad == pad) {
1362 /* Tears down @stream: calls the subclass stream_free hook (when set), stops
 * and joins the download task, clears the current fragment, drops pending
 * segment/events/caps/tags and removes the stream's source element from the
 * demux bin after setting it to the NULL state.
 *
 * must be called with manifest_lock taken.
1363 * It will temporarily drop the manifest_lock in order to join the task.
1364 * It will join only the old_streams (the demux->streams are joined by
1365 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1369 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1371 GstAdaptiveDemux *demux = stream->demux;
1372 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1374 if (klass->stream_free)
1375 klass->stream_free (stream);
1377 g_clear_error (&stream->last_error);
1378 if (stream->download_task) {
1379 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1380 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1381 GST_DEBUG_PAD_NAME (stream->pad));
1383 gst_task_stop (stream->download_task);
1385 g_mutex_lock (&stream->fragment_download_lock);
1386 stream->cancelled = TRUE;
1387 g_cond_signal (&stream->fragment_download_cond);
1388 g_mutex_unlock (&stream->fragment_download_lock);
1390 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1392 /* temporarily drop the manifest lock to join the task */
1393 GST_MANIFEST_UNLOCK (demux);
1395 gst_task_join (stream->download_task);
1397 GST_MANIFEST_LOCK (demux);
1399 GST_LOG_OBJECT (demux, "Finished");
1400 gst_object_unref (stream->download_task);
1401 g_rec_mutex_clear (&stream->download_lock);
1402 stream->download_task = NULL;
1405 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1407 if (stream->pending_segment) {
1408 gst_event_unref (stream->pending_segment);
1409 stream->pending_segment = NULL;
1412 if (stream->pending_events) {
1413 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1414 stream->pending_events = NULL;
1417 if (stream->internal_pad) {
1418 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1421 if (stream->src_srcpad) {
1422 gst_object_unref (stream->src_srcpad);
1423 stream->src_srcpad = NULL;
1427 GstElement *src = stream->src;
/* The manifest lock is not held while the source element is shut down and
 * removed from the bin */
1431 GST_MANIFEST_UNLOCK (demux);
1432 gst_element_set_locked_state (src, TRUE);
1433 gst_element_set_state (src, GST_STATE_NULL);
1434 gst_bin_remove (GST_BIN_CAST (demux), src);
1435 GST_MANIFEST_LOCK (demux);
1438 g_cond_clear (&stream->fragment_download_cond);
1439 g_mutex_clear (&stream->fragment_download_lock);
1440 g_free (stream->fragment_bitrates);
1443 gst_object_unref (stream->pad);
1446 if (stream->pending_caps)
1447 gst_caps_unref (stream->pending_caps);
1449 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1454 /* must be called with manifest_lock taken.
 *
 * Forwards to the subclass's get_live_seek_range vfunc, which fills
 * @range_start and @range_stop. The vfunc must be implemented: a NULL
 * pointer trips the g_return_val_if_fail() guard and yields FALSE. */
1456 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1457 gint64 * range_start, gint64 * range_stop)
1459 GstAdaptiveDemuxClass *klass;
1461 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1463 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1465 return klass->get_live_seek_range (demux, range_start, range_stop);
1468 /* must be called with manifest_lock taken.
 *
 * When the live seek range can be obtained, reports whether the stream's
 * current segment position lies inside it (both ends inclusive). */
1470 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1471 GstAdaptiveDemuxStream * stream)
1473 gint64 range_start, range_stop;
1474 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1475 GST_LOG_OBJECT (stream->pad,
1476 "stream position %" GST_TIME_FORMAT " live seek range %"
1477 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1478 GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1479 GST_STIME_ARGS (range_stop));
1480 return (stream->segment.position >= range_start
1481 && stream->segment.position <= range_stop);
1487 /* must be called with manifest_lock taken.
 *
 * Reports whether seeking is possible: a live stream needs the subclass to
 * implement get_live_seek_range, any other stream needs it to implement
 * seek. */
1489 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1491 GstAdaptiveDemuxClass *klass;
1493 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1494 if (gst_adaptive_demux_is_live (demux)) {
1495 return klass->get_live_seek_range != NULL;
1498 return klass->seek != NULL;
/* Rebuilds the segment of every stream in @streams from demux->segment after
 * a seek: start (and stop, when valid) are shifted by the stream's
 * presentation offset minus @period_start, the position is reset according
 * to the playback direction and to which seek boundary was given, and a new
 * SEGMENT event replaces stream->pending_segment. Each stream is also marked
 * discont and has its QoS state reset. */
1502 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1503 GList * streams, gint64 period_start, GstSeekType start_type,
1504 GstSeekType stop_type)
1507 for (iter = streams; iter; iter = g_list_next (iter)) {
1508 GstAdaptiveDemuxStream *stream = iter->data;
1510 GstClockTime offset;
1512 /* See the segment adjustment comment in
1513 * gst_adaptive_demux_prepare_streams() for an explanation of the segment modifications */
1514 stream->segment = demux->segment;
1515 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1516 stream->segment.start += offset - period_start;
1517 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1518 stream->segment.stop += offset - period_start;
1519 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1520 stream->segment.position = stream->segment.start;
1521 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1522 stream->segment.position = stream->segment.stop;
1523 seg_evt = gst_event_new_segment (&stream->segment);
1524 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1525 gst_event_replace (&stream->pending_segment, seg_evt);
1526 GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1527 stream->pending_segment);
1528 gst_event_unref (seg_evt);
1529 /* Make sure the first buffer after a seek has the discont flag */
1530 stream->discont = TRUE;
1531 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
/* Seek-flag helpers. Both take a GstSeekFlags value.
 *
 * IS_SNAP_SEEK(f) is non-zero when @f carries any snapping or key-unit flag;
 * REMOVE_SNAP_FLAGS(f) yields @f with the three SNAP_* flags cleared.
 *
 * The argument is parenthesized so that a compound expression such as
 * (a | b) is masked as a whole; without the parentheses '&' would bind to
 * the last operand only. */
#define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE |          \
                                GST_SEEK_FLAG_SNAP_AFTER |           \
                                GST_SEEK_FLAG_SNAP_NEAREST |         \
                                GST_SEEK_FLAG_TRICKMODE_KEY_UNITS |  \
                                GST_SEEK_FLAG_KEY_UNIT))
#define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE |    \
                                      GST_SEEK_FLAG_SNAP_AFTER |     \
                                      GST_SEEK_FLAG_SNAP_NEAREST))
/* Handles a SEEK event that arrived on source pad @pad.
 *
 * Runs with the API lock and the manifest lock held. The seek is refused
 * when the demuxer cannot seek, when the format is not TIME, or when a
 * segment seek is requested. For live streams, END-relative positions are
 * resolved against the live range, start/stop are clamped to it where
 * allowed, and a seek that still falls outside is refused.
 *
 * An accepted seek optionally pushes FLUSH_START, stops the download tasks,
 * runs a snap seek on the stream that owns @pad when the subclass provides
 * stream_seek, applies the seek to demux->segment and calls the subclass
 * seek vfunc. A copy of the previous segment is kept as a backup in case the
 * seek fails. Afterwards FLUSH_STOP is pushed for flushing seeks, stream
 * segments are refreshed (or newly generated streams prepared) and the
 * download tasks are restarted.
 *
 * @event is unreffed on every exit path visible here. */
1545 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1548 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1552 GstSeekType start_type, stop_type;
1557 GstSegment oldsegment;
1558 GstAdaptiveDemuxStream *stream = NULL;
1560 GST_INFO_OBJECT (demux, "Received seek event");
/* Lock order: API lock first, then manifest lock */
1562 GST_API_LOCK (demux);
1563 GST_MANIFEST_LOCK (demux);
1565 if (!gst_adaptive_demux_can_seek (demux)) {
1566 GST_MANIFEST_UNLOCK (demux);
1567 GST_API_UNLOCK (demux);
1568 gst_event_unref (event);
1572 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1575 if (format != GST_FORMAT_TIME) {
1576 GST_MANIFEST_UNLOCK (demux);
1577 GST_API_UNLOCK (demux);
1578 GST_WARNING_OBJECT (demux,
1579 "Adaptive demuxers only support TIME-based seeking");
1580 gst_event_unref (event);
1584 if (flags & GST_SEEK_FLAG_SEGMENT) {
1585 GST_FIXME_OBJECT (demux, "Handle segment seeks");
1586 GST_MANIFEST_UNLOCK (demux);
1587 GST_API_UNLOCK (demux);
1588 gst_event_unref (event);
1592 seqnum = gst_event_get_seqnum (event);
1594 if (gst_adaptive_demux_is_live (demux)) {
1595 gint64 range_start, range_stop;
1596 gboolean changed = FALSE;
1597 gboolean start_valid = TRUE, stop_valid = TRUE;
1599 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1601 GST_MANIFEST_UNLOCK (demux);
1602 GST_API_UNLOCK (demux);
1603 gst_event_unref (event);
1604 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1608 GST_DEBUG_OBJECT (demux,
1609 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1610 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1612 /* Handle relative positioning for live streams (relative to the range_stop) */
1613 if (start_type == GST_SEEK_TYPE_END) {
1614 start = range_stop + start;
1615 start_type = GST_SEEK_TYPE_SET;
1618 if (stop_type == GST_SEEK_TYPE_END) {
1619 stop = range_stop + stop;
1620 stop_type = GST_SEEK_TYPE_SET;
1624 /* Adjust the requested start/stop position if it falls beyond the live
1626 * The only case where we don't adjust is for the starting point of
1627 * an accurate seek (start if forward and stop if backwards)
1629 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1630 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1631 GST_DEBUG_OBJECT (demux,
1632 "seek before live stream start, setting to range start: %"
1633 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1634 start = range_start;
1637 /* truncate stop position also if set */
1638 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1639 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1640 GST_DEBUG_OBJECT (demux,
1641 "seek ending after live start, adjusting to: %"
1642 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1647 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1648 (start < range_start || start > range_stop)) {
1649 GST_WARNING_OBJECT (demux,
1650 "Seek to invalid position start:%" GST_STIME_FORMAT
1651 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1652 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1653 GST_STIME_ARGS (range_stop));
1654 start_valid = FALSE;
1656 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1657 (stop < range_start || stop > range_stop)) {
1658 GST_WARNING_OBJECT (demux,
1659 "Seek to invalid position stop:%" GST_STIME_FORMAT
1660 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1661 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1662 GST_STIME_ARGS (range_stop));
1666 /* If the seek position is still outside of the seekable range, refuse the seek */
1667 if (!start_valid || !stop_valid) {
1668 GST_MANIFEST_UNLOCK (demux);
1669 GST_API_UNLOCK (demux);
1670 gst_event_unref (event);
1674 /* Re-create seek event with changed/updated values */
1676 gst_event_unref (event);
1678 gst_event_new_seek (rate, format, flags,
1679 start_type, start, stop_type, stop);
1680 gst_event_set_seqnum (event, seqnum);
1684 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1686 /* have a backup in case seek fails */
1687 gst_segment_copy_into (&demux->segment, &oldsegment);
1689 if (flags & GST_SEEK_FLAG_FLUSH) {
1692 GST_DEBUG_OBJECT (demux, "sending flush start");
1693 fevent = gst_event_new_flush_start ();
1694 gst_event_set_seqnum (fevent, seqnum);
1695 GST_MANIFEST_UNLOCK (demux);
1696 gst_adaptive_demux_push_src_event (demux, fevent);
1697 GST_MANIFEST_LOCK (demux);
1699 gst_adaptive_demux_stop_tasks (demux, FALSE);
1700 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1701 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1703 gst_adaptive_demux_stop_tasks (demux, FALSE);
1706 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1709 * Handle snap seeks as follows:
1710 * 1) do the snap seeking on the stream that received
1712 * 2) use the final position on this stream to seek
1713 * on the other streams to the same position
1715 * We can't snap at all streams at the same time as
1716 * they might end in different positions, so just
1717 * use the one that received the event as the 'leading'
1718 * one to do the snap seek.
1720 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1721 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1723 GstSeekFlags stream_seek_flags = flags;
1725 /* snap-seek on the stream that received the event and then
1726 * use the resulting position to seek on all streams */
1729 if (start_type != GST_SEEK_TYPE_NONE)
1732 ts = stream->segment.position;
1733 start_type = GST_SEEK_TYPE_SET;
1736 if (stop_type != GST_SEEK_TYPE_NONE)
1739 stop_type = GST_SEEK_TYPE_SET;
1740 ts = stream->segment.position;
1745 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1748 /* replace event with a new one without snapping to seek on all streams */
1749 gst_event_unref (event);
1756 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1757 start_type, start, stop_type, stop);
1758 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1762 gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1763 start, stop_type, stop, &update);
1765 /* FIXME - this seems unnatural, do_seek() is updating base when we
1766 * only want the start/stop position to change, maybe do_seek() needs
1768 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1769 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1770 && stop_type == GST_SEEK_TYPE_NONE))) {
1771 demux->segment.base = oldsegment.base;
1774 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1776 ret = demux_class->seek (demux, event);
1779 /* Is there anything else we can do if it fails? */
1780 gst_segment_copy_into (&oldsegment, &demux->segment);
1782 demux->priv->segment_seqnum = seqnum;
1784 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1786 if (flags & GST_SEEK_FLAG_FLUSH) {
1789 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1790 fevent = gst_event_new_flush_stop (TRUE);
1791 gst_event_set_seqnum (fevent, seqnum);
1792 gst_adaptive_demux_push_src_event (demux, fevent);
1795 if (demux->next_streams) {
1796 /* If the seek generated new streams, get them
1798 gst_adaptive_demux_prepare_streams (demux, FALSE);
1799 gst_adaptive_demux_start_tasks (demux, TRUE);
1801 GstClockTime period_start =
1802 gst_adaptive_demux_get_period_start_time (demux);
1804 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1805 gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1806 period_start, start_type, stop_type);
1807 gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1808 period_start, start_type, stop_type);
1809 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1811 /* Restart the demux */
1812 gst_adaptive_demux_start_tasks (demux, FALSE);
1815 GST_MANIFEST_UNLOCK (demux);
1816 GST_API_UNLOCK (demux);
1817 gst_event_unref (event);
/* Event handler installed on every source pad.
 *
 * - SEEK: dropped when its seqnum equals the seqnum of the seek already
 *   applied (demux->priv->segment_seqnum), otherwise handed to
 *   gst_adaptive_demux_handle_seek_event().
 * - RECONFIGURE: when the pad's stream had stopped with
 *   GST_FLOW_NOT_LINKED, is not cancelled and the demuxer is running, the
 *   download task is restarted with a fresh header and a discont.
 * - LATENCY: consumed here; see the comment in that case.
 * - QOS: stores the earliest useful time in stream->qos_earliest_time under
 *   the manifest lock.
 *
 * Events not handled above go to gst_pad_event_default(). */
1823 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1826 GstAdaptiveDemux *demux;
1828 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1830 /* FIXME handle events received on pads that are to be removed */
1832 switch (event->type) {
1833 case GST_EVENT_SEEK:
1835 guint32 seqnum = gst_event_get_seqnum (event);
1836 if (seqnum == demux->priv->segment_seqnum) {
1837 GST_LOG_OBJECT (pad,
1838 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1839 gst_event_unref (event);
1842 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1844 case GST_EVENT_RECONFIGURE:{
1845 GstAdaptiveDemuxStream *stream;
1847 GST_MANIFEST_LOCK (demux);
1848 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1851 if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1852 stream->last_ret == GST_FLOW_NOT_LINKED) {
1853 stream->last_ret = GST_FLOW_OK;
1854 stream->restart_download = TRUE;
1855 stream->need_header = TRUE;
1856 stream->discont = TRUE;
1857 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1858 gst_task_start (stream->download_task);
1860 gst_event_unref (event);
1861 GST_MANIFEST_UNLOCK (demux);
1864 GST_MANIFEST_UNLOCK (demux);
1867 case GST_EVENT_LATENCY:{
1868 /* Upstream and our internal source are irrelevant
1869 * for latency, and we should not fail here to
1870 * configure the latency */
1871 gst_event_unref (event);
1874 case GST_EVENT_QOS:{
1875 GstAdaptiveDemuxStream *stream;
1877 GST_MANIFEST_LOCK (demux);
1878 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1881 GstClockTimeDiff diff;
1882 GstClockTime timestamp;
/* Only the jitter (diff) and the timestamp of the QoS event are needed */
1884 gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
1885 /* Only take into account lateness if late */
1887 stream->qos_earliest_time = timestamp + 2 * diff;
1889 stream->qos_earliest_time = timestamp;
1890 GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1891 GST_TIME_ARGS (stream->qos_earliest_time));
1893 GST_MANIFEST_UNLOCK (demux);
1900 return gst_pad_event_default (pad, parent, event);
/* Query handler installed on every source pad.
 *
 * - DURATION: answered in TIME format from the subclass get_duration vfunc
 *   once the manifest is available and the duration is valid and non-zero.
 * - LATENCY: answered as not live, with minimum 0 and no maximum.
 * - SEEKING: needs the manifest; for TIME format the range comes from the
 *   live seek range for live streams, otherwise from the duration.
 * - The manifest URI, when one is set, is returned through
 *   gst_query_set_uri().
 *
 * As the closing comment says, queries are not forwarded upstream. */
1904 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1907 GstAdaptiveDemux *demux;
1908 GstAdaptiveDemuxClass *demux_class;
1909 gboolean ret = FALSE;
1914 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1915 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1917 switch (query->type) {
1918 case GST_QUERY_DURATION:{
1919 GstClockTime duration = -1;
1922 gst_query_parse_duration (query, &fmt, NULL);
1924 if (fmt == GST_FORMAT_TIME
1925 && g_atomic_int_get (&demux->priv->have_manifest)) {
1926 duration = demux_class->get_duration (demux);
1928 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1929 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1934 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1935 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1938 case GST_QUERY_LATENCY:{
1939 gst_query_set_latency (query, FALSE, 0, -1);
1943 case GST_QUERY_SEEKING:{
1948 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1949 GST_INFO_OBJECT (demux,
1950 "Don't have manifest yet, can't answer seeking query");
1951 return FALSE; /* can't answer without manifest */
1954 GST_MANIFEST_LOCK (demux);
1956 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1957 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
1958 if (fmt == GST_FORMAT_TIME) {
1959 GstClockTime duration;
1960 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
1964 if (gst_adaptive_demux_is_live (demux)) {
1965 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
1967 GST_MANIFEST_UNLOCK (demux);
1968 GST_INFO_OBJECT (demux, "can't answer seeking query");
1972 duration = demux_class->get_duration (demux);
1973 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
1977 gst_query_set_seeking (query, fmt, can_seek, start, stop);
1978 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
1979 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
1980 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
1982 GST_MANIFEST_UNLOCK (demux);
1987 GST_MANIFEST_LOCK (demux);
1989 /* TODO HLS can answer this differently it seems */
1990 if (demux->manifest_uri) {
1991 /* FIXME: (hls) Do we answer with the variant playlist, with the current
1992 * playlist or the uri of the last downloaded fragment? */
1993 gst_query_set_uri (query, demux->manifest_uri);
1997 GST_MANIFEST_UNLOCK (demux);
2000 /* Don't forward queries upstream because of the special nature of this
2001 * "demuxer", which relies on the upstream element only to be fed
2010 /* must be called with manifest_lock taken.
 *
 * Starts the download task of every stream in demux->prepared_streams when
 * @start_preroll_streams is TRUE, or in demux->streams otherwise. In the
 * latter case the cancelled and replaced flags are cleared first, under the
 * stream's fragment_download_lock. Nothing is started when the demuxer is
 * not running. */
2012 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2013 gboolean start_preroll_streams)
2017 if (!gst_adaptive_demux_is_running (demux)) {
2018 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2022 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2024 iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2026 for (; iter; iter = g_list_next (iter)) {
2027 GstAdaptiveDemuxStream *stream = iter->data;
2029 if (!start_preroll_streams) {
2030 g_mutex_lock (&stream->fragment_download_lock);
2031 stream->cancelled = FALSE;
2032 stream->replaced = FALSE;
2033 g_mutex_unlock (&stream->fragment_download_lock);
2036 stream->last_ret = GST_FLOW_OK;
2037 gst_task_start (stream->download_task);
2041 /* must be called with manifest_lock taken.
 *
 * Cancels the demuxer's URI downloader, asks the manifest update task to
 * stop and, under updates_timed_lock, sets stop_updates_task and signals
 * updates_timed_cond. The task is not joined here. */
2043 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2045 gst_uri_downloader_cancel (demux->downloader);
2047 gst_task_stop (demux->priv->updates_task);
2049 g_mutex_lock (&demux->priv->updates_timed_lock);
2050 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2051 demux->priv->stop_updates_task = TRUE;
2052 g_cond_signal (&demux->priv->updates_timed_cond);
2053 g_mutex_unlock (&demux->priv->updates_timed_lock);
/* Starts the manifest update task for live streams.
 *
 * @demux: the adaptive demuxer
 *
 * Nothing in the visible code runs unless gst_adaptive_demux_is_live()
 * reports TRUE. In that case the downloader is reset, stop_updates_task is
 * cleared under updates_timed_lock, and updates_task is started only if the
 * subclass' requires_periodical_playlist_update() hook returns TRUE.
 */
2056 /* must be called with manifest_lock taken */
2058 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2060 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2062 if (gst_adaptive_demux_is_live (demux)) {
2063 gst_uri_downloader_reset (demux->downloader);
/* Clear the stop flag set by the matching stop function */
2064 g_mutex_lock (&demux->priv->updates_timed_lock);
2065 demux->priv->stop_updates_task = FALSE;
2066 g_mutex_unlock (&demux->priv->updates_timed_lock);
2067 /* Task to periodically update the manifest */
2068 if (demux_class->requires_periodical_playlist_update (demux)) {
2069 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2070 gst_task_start (demux->priv->updates_task);
/* Stops and joins the download task of every stream.
 *
 * @demux: the adaptive demuxer
 * @stop_updates: how this flag is consulted is not visible in this listing;
 *   presumably it controls whether the manifest update task is stopped and
 *   joined as well - TODO confirm
 *
 * Sequence visible below; every per-stream pass runs over demux->streams and
 * then over demux->prepared_streams:
 *  1. gst_adaptive_demux_stop_manifest_update_task() is called.
 *  2. Each stream is marked cancelled, its download_task is asked to stop and
 *     fragment_download_cond is signalled, all under fragment_download_lock.
 *  3. preroll_cond is broadcast under preroll_lock (manifest_lock is released
 *     around that) and manifest_cond is broadcast under manifest_update_lock.
 *  4. For each stream, with manifest_lock released, the stream's src element
 *     is state-locked and set to GST_STATE_READY and download_task is joined.
 *  5. updates_task is joined with manifest_lock released.
 *  6. download_error_count, need_header and qos_earliest_time of each stream
 *     are reset.
 */
2075 /* must be called with manifest_lock taken
2076 * This function will temporarily release manifest_lock in order to join the
2078 * The api_lock will still protect it against other threads trying to modify
2079 * the demux element.
2082 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2086 GList *list_to_process;
2088 GST_LOG_OBJECT (demux, "Stopping tasks");
2091 gst_adaptive_demux_stop_manifest_update_task (demux);
2093 list_to_process = demux->streams;
2094 for (i = 0; i < 2; ++i) {
2095 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2096 GstAdaptiveDemuxStream *stream = iter->data;
2098 g_mutex_lock (&stream->fragment_download_lock);
2099 stream->cancelled = TRUE;
2100 gst_task_stop (stream->download_task);
2101 g_cond_signal (&stream->fragment_download_cond);
2102 g_mutex_unlock (&stream->fragment_download_lock);
2104 list_to_process = demux->prepared_streams;
2107 GST_MANIFEST_UNLOCK (demux);
2108 g_mutex_lock (&demux->priv->preroll_lock);
2109 g_cond_broadcast (&demux->priv->preroll_cond);
2110 g_mutex_unlock (&demux->priv->preroll_lock);
2111 GST_MANIFEST_LOCK (demux);
2113 g_mutex_lock (&demux->priv->manifest_update_lock);
2114 g_cond_broadcast (&demux->priv->manifest_cond);
2115 g_mutex_unlock (&demux->priv->manifest_update_lock);
2117 /* need to release manifest_lock before stopping the src element.
2118 * The streams were asked to cancel, so they will not make any writes to demux
2119 * object. Even if we temporarily release manifest_lock, the demux->streams
2120 * cannot change and iter cannot be invalidated.
2122 list_to_process = demux->streams;
2123 for (i = 0; i < 2; ++i) {
2124 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2125 GstAdaptiveDemuxStream *stream = iter->data;
2126 GstElement *src = stream->src;
2128 GST_MANIFEST_UNLOCK (demux);
2131 gst_element_set_locked_state (src, TRUE);
2132 gst_element_set_state (src, GST_STATE_READY);
2135 /* stream->download_task value never changes, so it is safe to read it
2136 * outside critical section
2138 gst_task_join (stream->download_task);
2140 GST_MANIFEST_LOCK (demux);
2142 list_to_process = demux->prepared_streams;
2145 GST_MANIFEST_UNLOCK (demux);
2147 gst_task_join (demux->priv->updates_task);
2149 GST_MANIFEST_LOCK (demux);
2151 list_to_process = demux->streams;
2152 for (i = 0; i < 2; ++i) {
2153 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2154 GstAdaptiveDemuxStream *stream = iter->data;
2156 stream->download_error_count = 0;
2157 stream->need_header = TRUE;
2158 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2160 list_to_process = demux->prepared_streams;
/* Pushes @event on the pad of every stream in demux->streams.
 *
 * @demux: the adaptive demuxer
 * @event: the event to push
 *
 * A reference is taken before each push and gst_event_unref() is called on
 * @event itself, so the caller gives up its reference. The result of every
 * push is AND-ed into a flag that starts out TRUE (the return statement is
 * not visible in this listing; presumably that flag is returned).
 */
2164 /* must be called with manifest_lock taken */
2166 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2169 gboolean ret = TRUE;
2171 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2172 GstAdaptiveDemuxStream *stream = iter->data;
/* gst_pad_push_event() gets its own reference for each pad */
2173 gst_event_ref (event);
2174 ret = ret & gst_pad_push_event (stream->pad, event);
/* Drop the reference that was passed in */
2176 gst_event_unref (event);
/* Stores new caps on @stream as pending caps.
 *
 * @stream: the stream to update
 * caps (parameter line not visible in this listing): the new caps
 *
 * The caps are put into stream->pending_caps with gst_caps_replace() and the
 * passed reference is then released with gst_caps_unref(), so the caller
 * gives up its reference.
 */
2180 /* must be called with manifest_lock taken */
2182 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2185 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2187 gst_caps_replace (&stream->pending_caps, caps);
2188 gst_caps_unref (caps);
/* Stores a new tag list on @stream as pending tags.
 *
 * @stream: the stream to update
 * tags (parameter line not visible in this listing): the new tag list
 *
 * Any tag list already pending is unreffed first. The new pointer is stored
 * as-is, without taking an additional reference in the visible code, so the
 * stream takes over the caller's reference.
 */
2191 /* must be called with manifest_lock taken */
2193 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2196 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
/* Release the tag list that is being replaced */
2198 if (stream->pending_tags) {
2199 gst_tag_list_unref (stream->pending_tags);
2201 stream->pending_tags = tags;
/* Queues an event on @stream by appending it to stream->pending_events.
 *
 * @stream: the stream to queue the event on
 * event (parameter line not visible in this listing): the event to queue; it
 *   is stored without taking an additional reference in the visible code
 */
2204 /* must be called with manifest_lock taken */
2206 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2209 stream->pending_events = g_list_append (stream->pending_events, event);
/* Records @new_bitrate in the stream's window of the last
 * NUM_LOOKBACK_FRAGMENTS bitrates and returns the window average.
 *
 * @demux: the adaptive demuxer (not used in the visible code)
 * @stream: the stream whose fragment_bitrates window is updated
 * @new_bitrate: the bitrate to record
 *
 * stream->moving_bitrate holds the running sum of the window: the value in
 * the slot being overwritten is subtracted and the new value is added.
 * Until more than NUM_LOOKBACK_FRAGMENTS values have been recorded the sum is
 * divided by the number of values recorded so far.
 */
2212 /* must be called with manifest_lock taken */
2214 _update_average_bitrate (GstAdaptiveDemux * demux,
2215 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
/* Slot to overwrite; the window is used as a ring */
2217 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2219 stream->moving_bitrate -= stream->fragment_bitrates[index];
2220 stream->fragment_bitrates[index] = new_bitrate;
2221 stream->moving_bitrate += new_bitrate;
2223 stream->moving_index += 1;
2225 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2226 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2227 return stream->moving_bitrate / stream->moving_index;
/* Updates stream->current_download_rate and returns it.
 *
 * @demux: the adaptive demuxer
 * @stream: the stream to update
 *
 * If demux->connection_speed is non-zero it is used directly. Otherwise
 * stream->last_bitrate is fed into _update_average_bitrate() and the lower of
 * that average and the last bitrate is kept, then multiplied by
 * demux->bitrate_limit.
 *
 * NOTE(review): the guard around the "Halving reported bitrate for debugging"
 * code is not visible in this listing - confirm that it is compiled out in
 * normal builds.
 */
2230 /* must be called with manifest_lock taken */
2232 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2233 GstAdaptiveDemuxStream * stream)
2235 guint64 average_bitrate;
2236 guint64 fragment_bitrate;
/* A configured connection speed overrides any measurement */
2238 if (demux->connection_speed) {
2239 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2240 demux->connection_speed / 1000);
2241 stream->current_download_rate = demux->connection_speed;
2242 return demux->connection_speed;
2245 fragment_bitrate = stream->last_bitrate;
2246 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2249 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2251 GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2253 GST_INFO_OBJECT (stream,
2254 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2255 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2257 /* Conservative approach, make sure we don't upgrade too fast */
2258 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
/* Scale the estimate by the configured bitrate limit */
2260 stream->current_download_rate *= demux->bitrate_limit;
2261 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2262 G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2265 /* Debugging code, modulate the bitrate every few fragments */
2267 static guint ctr = 0;
2269 GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2270 stream->current_download_rate /= 2;
2276 return stream->current_download_rate;
/* Combines the last flow return of every stream in demux->streams into one
 * flow return for the demuxer.
 *
 * @demux: the adaptive demuxer
 *
 * A stream whose last_ret is GST_FLOW_FLUSHING, or is less than or equal to
 * GST_FLOW_NOT_NEGOTIATED, makes the function return that value immediately.
 * The all_notlinked / all_eos flags track whether every stream reported
 * GST_FLOW_NOT_LINKED or GST_FLOW_EOS; the conditions guarding the final
 * returns are not visible in this listing (presumably these two flags).
 */
2279 /* must be called with manifest_lock taken */
2280 static GstFlowReturn
2281 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2283 gboolean all_notlinked = TRUE;
2284 gboolean all_eos = TRUE;
2287 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2288 GstAdaptiveDemuxStream *stream = iter->data;
2290 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2291 all_notlinked = FALSE;
2292 if (stream->last_ret != GST_FLOW_EOS)
/* These flow returns are reported as soon as one stream has them */
2296 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2297 || stream->last_ret == GST_FLOW_FLUSHING) {
2298 return stream->last_ret;
2302 return GST_FLOW_NOT_LINKED;
2304 return GST_FLOW_EOS;
/* Accounts for one more prerolled stream.
 *
 * @demux: the adaptive demuxer
 * @stream: the stream that prerolled (not used in the visible code)
 *
 * Decrements demux->priv->preroll_pending. When the counter reaches zero the
 * streams are exposed with gst_adaptive_demux_expose_streams() and
 * preroll_cond is broadcast.
 */
2308 /* Called with preroll_lock */
2310 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2311 GstAdaptiveDemuxStream * stream)
2313 demux->priv->preroll_pending--;
2314 if (demux->priv->preroll_pending == 0) {
2315 /* That was the last one, time to release all streams
2316 * and expose them */
2317 GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2318 gst_adaptive_demux_expose_streams (demux);
2319 g_cond_broadcast (&demux->priv->preroll_cond);
/* Pushes @buffer (parameter line not visible in this listing) downstream on
 * stream->pad, sending any pending caps, segment, tag and queued events
 * first.
 *
 * @stream: the stream the buffer belongs to
 *
 * Steps visible below:
 *  - For the first buffer of a fragment the PTS is set from
 *    stream->fragment.timestamp plus the presentation offset and the stream
 *    and demuxer segment positions are advanced; other buffers get
 *    GST_CLOCK_TIME_NONE as PTS. Duration and DTS are always cleared.
 *  - The DISCONT flag is set or cleared on the buffer.
 *  - While stream->do_block is set the function waits on preroll_cond with
 *    manifest_lock released.
 *  - Pending caps/segment/tags/events are collected under manifest_lock and
 *    pushed, together with the buffer, after the lock has been released. The
 *    lock is taken again after gst_pad_push().
 *  - If the stream was cancelled meanwhile, ret and stream->last_ret become
 *    GST_FLOW_FLUSHING.
 *
 * NOTE(review): in the preroll wait, the cancelled path unrefs the buffer and
 * returns GST_FLOW_FLUSHING after GST_MANIFEST_UNLOCK; no matching
 * GST_MANIFEST_LOCK is visible on that path - confirm that callers cope with
 * the lock being released on return.
 */
2323 /* must be called with manifest_lock taken.
2324 * Temporarily releases manifest_lock
2327 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2330 GstAdaptiveDemux *demux = stream->demux;
2331 GstFlowReturn ret = GST_FLOW_OK;
2332 gboolean discont = FALSE;
2333 /* Pending events */
2334 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2335 GList *pending_events = NULL;
2338 * This is duplicating *exactly* the same thing as what is done at the beginning
2339 * of _src_chain if starting_fragment is TRUE */
2340 if (stream->first_fragment_buffer) {
2341 GstClockTime offset =
2342 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2343 GstClockTime period_start =
2344 gst_adaptive_demux_get_period_start_time (demux);
2346 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2347 if (demux->segment.rate < 0)
2348 /* Set DISCONT flag for every first buffer in reverse playback mode
2349 * as each fragment for its own has to be reversed */
2352 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2353 if (GST_BUFFER_PTS_IS_VALID (buffer))
2354 GST_BUFFER_PTS (buffer) += offset;
2356 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2357 stream->segment.position = GST_BUFFER_PTS (buffer);
2359 /* Convert from position inside the stream's segment to the demuxer's
2360 * segment, they are not necessarily the same */
2361 if (stream->segment.position - offset + period_start >
2362 demux->segment.position)
2363 demux->segment.position =
2364 stream->segment.position - offset + period_start;
2366 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2368 GST_LOG_OBJECT (stream->pad,
2369 "Going to push buffer with PTS %" GST_TIME_FORMAT,
2370 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2372 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2375 if (stream->discont) {
2377 stream->discont = FALSE;
2381 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2382 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2384 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2387 stream->first_fragment_buffer = FALSE;
2389 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2390 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
/* Turn pending caps into an event and drop the stream's reference to them */
2391 if (G_UNLIKELY (stream->pending_caps)) {
2392 pending_caps = gst_event_new_caps (stream->pending_caps);
2393 gst_caps_unref (stream->pending_caps);
2394 stream->pending_caps = NULL;
2397 if (stream->do_block) {
2399 g_mutex_lock (&demux->priv->preroll_lock);
2401 /* If we are preroll state, set caps in here */
2403 gst_pad_push_event (stream->pad, pending_caps);
2404 pending_caps = NULL;
2407 gst_adaptive_demux_handle_preroll (demux, stream);
2408 GST_MANIFEST_UNLOCK (demux);
2410 while (stream->do_block && !stream->cancelled) {
2411 GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2412 g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2414 if (stream->cancelled) {
2415 GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2416 gst_buffer_unref (buffer);
2417 g_mutex_unlock (&demux->priv->preroll_lock);
2418 return GST_FLOW_FLUSHING;
2421 g_mutex_unlock (&demux->priv->preroll_lock);
2422 GST_MANIFEST_LOCK (demux);
/* Take the pending segment event away from the stream */
2425 if (G_UNLIKELY (stream->pending_segment)) {
2426 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2427 pending_segment = stream->pending_segment;
2428 stream->pending_segment = NULL;
2429 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2431 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2432 GstTagList *tags = stream->pending_tags;
2434 stream->pending_tags = NULL;
2435 stream->bitrate_changed = 0;
/* A known fragment bitrate is added as the nominal-bitrate tag */
2437 if (stream->fragment.bitrate != 0) {
2439 tags = gst_tag_list_make_writable (tags);
2441 tags = gst_tag_list_new_empty ();
2443 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2444 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2447 pending_tags = gst_event_new_tag (tags);
2449 if (G_UNLIKELY (stream->pending_events)) {
2450 pending_events = stream->pending_events;
2451 stream->pending_events = NULL;
2454 GST_MANIFEST_UNLOCK (demux);
2456 /* Do not push events or buffers holding the manifest lock */
2457 if (G_UNLIKELY (pending_caps)) {
2458 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2460 gst_pad_push_event (stream->pad, pending_caps);
2462 if (G_UNLIKELY (pending_segment)) {
2463 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2465 gst_pad_push_event (stream->pad, pending_segment);
2467 if (G_UNLIKELY (pending_tags)) {
2468 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2470 gst_pad_push_event (stream->pad, pending_tags);
/* Send the queued events in order, freeing each list node as it is used */
2472 while (pending_events != NULL) {
2473 GstEvent *event = pending_events->data;
2475 if (!gst_pad_push_event (stream->pad, event))
2476 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2478 pending_events = g_list_delete_link (pending_events, pending_events);
2481 /* Wait for preroll if blocking */
2482 GST_DEBUG_OBJECT (stream->pad,
2483 "About to push buffer of size %" G_GSIZE_FORMAT,
2484 gst_buffer_get_size (buffer));
2486 ret = gst_pad_push (stream->pad, buffer);
2488 GST_MANIFEST_LOCK (demux);
/* A cancellation that happened during the push overrides the push result */
2490 g_mutex_lock (&stream->fragment_download_lock);
2491 if (G_UNLIKELY (stream->cancelled)) {
2492 GST_LOG_OBJECT (stream, "Stream was cancelled");
2493 ret = stream->last_ret = GST_FLOW_FLUSHING;
2494 g_mutex_unlock (&stream->fragment_download_lock);
2497 g_mutex_unlock (&stream->fragment_download_lock);
2499 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2500 gst_flow_get_name (ret));
/* Default handling for a finished fragment download.
 *
 * @demux: the adaptive demuxer
 * @stream: the stream whose download finished
 *
 * Header and index downloads are not real fragments and do not advance the
 * stream (the statement executed in that case is not visible in this
 * listing). For real fragments the result of
 * gst_adaptive_demux_stream_advance_fragment(), called with the fragment's
 * duration, is returned.
 */
2505 /* must be called with manifest_lock taken */
2506 static GstFlowReturn
2507 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2508 GstAdaptiveDemuxStream * stream)
2510 /* No need to advance, this isn't a real fragment */
2511 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2514 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2515 stream->fragment.duration);
/* Default handling for received fragment data: hands @buffer straight to
 * gst_adaptive_demux_stream_push_buffer() and returns its flow return.
 *
 * @demux: the adaptive demuxer (not used in the visible code)
 * @stream: the stream the data belongs to
 * @buffer: the received data
 */
2518 /* must be called with manifest_lock taken.
2519 * Can temporarily release manifest_lock
2521 static GstFlowReturn
2522 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2523 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2525 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
/* Judging by its name, the default for the
 * requires_periodical_playlist_update class hook. Only the first line of the
 * signature is visible in this listing, so the value it returns cannot be
 * stated here - TODO confirm against the full source. */
2529 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
/* Chain function for data arriving from a stream's source element.
 *
 * @pad: the pad the buffer arrives on; its element-private data is the
 *   GstAdaptiveDemuxStream
 * @parent: the adaptive demuxer
 * @buffer: the downloaded data
 *
 * Runs with manifest_lock taken for its whole visible duration. Steps visible
 * below:
 *  - A cancelled stream drops the buffer and sets ret and stream->last_ret to
 *    GST_FLOW_FLUSHING.
 *  - On the first buffer after starting_fragment was set, the subclass'
 *    start_fragment() hook (if any) is called, the PTS is set from
 *    stream->fragment.timestamp plus the presentation offset and the segment
 *    positions are advanced.
 *  - On the first buffer of an actual fragment (not header or index) with no
 *    bitrate set, a bitrate is derived from a byte-duration query on the
 *    uri_handler and the fragment duration.
 *  - The buffer size is added to download_total_bytes and the buffer is
 *    handed to the subclass' data_received() hook.
 *  - A non-OK result is mapped (FLOW_SWITCH becomes EOS, END_OF_FRAGMENT
 *    triggers finish_fragment()) and reported through
 *    gst_adaptive_demux_stream_fragment_download_finish().
 */
2535 static GstFlowReturn
2536 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2538 GstAdaptiveDemuxStream *stream;
2539 GstAdaptiveDemux *demux;
2540 GstAdaptiveDemuxClass *klass;
2541 GstFlowReturn ret = GST_FLOW_OK;
2543 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2544 stream = gst_pad_get_element_private (pad);
2545 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2547 GST_MANIFEST_LOCK (demux);
2549 /* do not make any changes if the stream is cancelled */
2550 g_mutex_lock (&stream->fragment_download_lock);
2551 if (G_UNLIKELY (stream->cancelled)) {
2552 g_mutex_unlock (&stream->fragment_download_lock);
2553 gst_buffer_unref (buffer);
2554 ret = stream->last_ret = GST_FLOW_FLUSHING;
2555 GST_MANIFEST_UNLOCK (demux);
2558 g_mutex_unlock (&stream->fragment_download_lock);
2560 /* starting_fragment is set to TRUE at the beginning of
2561 * _stream_download_fragment()
2562 * /!\ If there is a header/index being downloaded, then this will
2563 * be TRUE for the first one ... but FALSE for the remaining ones,
2564 * including the *actual* fragment ! */
2565 if (stream->starting_fragment) {
2566 GstClockTime offset =
2567 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2568 GstClockTime period_start =
2569 gst_adaptive_demux_get_period_start_time (demux);
2571 stream->starting_fragment = FALSE;
/* Give the subclass a chance to veto or prepare the fragment */
2572 if (klass->start_fragment) {
2573 if (!klass->start_fragment (demux, stream)) {
2574 ret = GST_FLOW_ERROR;
2579 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2580 if (GST_BUFFER_PTS_IS_VALID (buffer))
2581 GST_BUFFER_PTS (buffer) += offset;
2583 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2584 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2586 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2587 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2588 stream->segment.position = GST_BUFFER_PTS (buffer);
2590 /* Convert from position inside the stream's segment to the demuxer's
2591 * segment, they are not necessarily the same */
2592 if (stream->segment.position - offset + period_start >
2593 demux->segment.position)
2594 demux->segment.position =
2595 stream->segment.position - offset + period_start;
2596 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2600 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2603 /* downloading_first_buffer is set to TRUE in download_uri() just before
2604 * activating the source (i.e. requesting a given URI)
2606 * The difference with starting_fragment is that this will be called
2607 * for *all* first buffers (of index, and header, and fragment)
2609 * ... to then only do something useful (in this block) for actual
2611 if (stream->downloading_first_buffer) {
2612 gint64 chunk_size = 0;
2614 stream->downloading_first_buffer = FALSE;
2616 if (!stream->downloading_header && !stream->downloading_index) {
2617 /* If this is the first buffer of a fragment (not the headers or index)
2618 * and we don't have a bitrate from the sub-class, then see if we
2619 * can work it out from the fragment size and duration */
2620 if (stream->fragment.bitrate == 0 &&
2621 stream->fragment.duration != 0 &&
2622 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2624 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2625 8 * GST_SECOND, stream->fragment.duration));
2626 GST_LOG_OBJECT (demux,
2627 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2628 " = bitrate %u", chunk_size,
2629 GST_TIME_ARGS (stream->fragment.duration), bitrate);
2630 stream->fragment.bitrate = bitrate;
2632 if (stream->fragment.bitrate) {
2633 stream->bitrate_changed = TRUE;
2635 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2640 stream->download_total_bytes += gst_buffer_get_size (buffer);
2642 GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2643 gst_buffer_get_size (buffer));
/* Hand the data to the subclass; the hook returns the flow result */
2645 ret = klass->data_received (demux, stream, buffer);
2647 if (ret == GST_FLOW_FLUSHING) {
2648 /* do not make any changes if the stream is cancelled */
2649 g_mutex_lock (&stream->fragment_download_lock);
2650 if (G_UNLIKELY (stream->cancelled)) {
2651 g_mutex_unlock (&stream->fragment_download_lock);
2652 GST_MANIFEST_UNLOCK (demux);
2655 g_mutex_unlock (&stream->fragment_download_lock);
2658 if (ret != GST_FLOW_OK) {
2659 gboolean finished = FALSE;
2661 if (ret < GST_FLOW_EOS) {
2662 GST_ELEMENT_FLOW_ERROR (demux, ret);
2664 /* TODO push this on all pads */
2665 gst_pad_push_event (stream->pad, gst_event_new_eos ());
2667 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2668 gst_flow_get_name (ret));
2671 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2672 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2673 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2674 /* Behaves like an EOS event from upstream */
2675 stream->fragment.finished = TRUE;
2676 ret = klass->finish_fragment (demux, stream);
2677 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2678 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2679 } else if (ret != GST_FLOW_OK) {
/* Report the result to whoever waits for the download to finish */
2685 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2692 GST_MANIFEST_UNLOCK (demux);
/* Records the result of a download on @stream and signals that it finished.
 *
 * @stream: the stream whose download finished
 * @ret: the flow return of the download
 * @err: an error to copy into stream->last_error (the check that guards the
 *   copy is not visible in this listing)
 *
 * stream->last_ret is only overwritten while it is still GST_FLOW_OK.
 * Finally download_finished is set and fragment_download_cond is signalled
 * under fragment_download_lock.
 */
2697 /* must be called with manifest_lock taken */
2699 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2700 stream, GstFlowReturn ret, GError * err)
2702 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2703 gst_flow_get_name (ret), err);
2705 /* if we have an error, only replace last_ret if it was OK before to avoid
2706 * overwriting the first error we got */
2707 if (stream->last_ret == GST_FLOW_OK) {
2708 stream->last_ret = ret;
2710 g_clear_error (&stream->last_error);
2711 stream->last_error = g_error_copy (err);
/* Flag the download as finished and signal the condition under the lock */
2714 g_mutex_lock (&stream->fragment_download_lock);
2715 stream->download_finished = TRUE;
2716 g_cond_signal (&stream->fragment_download_cond);
2717 g_mutex_unlock (&stream->fragment_download_lock);
/* Handles the end of a download on @stream.
 *
 * @stream: the stream whose source reached EOS
 *
 * The fragment is marked finished and the subclass' finish_fragment() hook is
 * called when any of these holds: the subclass has no need_another_chunk()
 * hook, fragment.chunk_size is -1, need_another_chunk() returns FALSE, or
 * fragment.chunk_size is 0. The result is passed on to
 * gst_adaptive_demux_stream_fragment_download_finish().
 */
2720 static GstFlowReturn
2721 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2723 GstFlowReturn ret = GST_FLOW_OK;
2724 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2726 if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2727 || !klass->need_another_chunk (stream)
2728 || stream->fragment.chunk_size == 0) {
2729 stream->fragment.finished = TRUE;
2730 ret = klass->finish_fragment (stream->demux, stream);
2732 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
/* Event function of a stream's internal pad.
 *
 * @pad: the internal pad; its element-private data is the stream
 * @parent: the pad's parent (not used in the visible code)
 * @event: the incoming event; gst_event_unref() is called on it below
 *
 * For GST_EVENT_EOS, with manifest_lock taken,
 * gst_adaptive_demux_eos_handling() is run and then download_finished is set
 * and fragment_download_cond signalled once more under
 * fragment_download_lock. Handling of other event types and the return value
 * are not visible in this listing.
 */
2738 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2740 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2741 GstAdaptiveDemux *demux = stream->demux;
2743 switch (GST_EVENT_TYPE (event)) {
2744 case GST_EVENT_EOS:{
2745 GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2746 GST_MANIFEST_LOCK (demux);
2748 gst_adaptive_demux_eos_handling (stream);
2751 * _eos_handling() calls fragment_download_finish() which does the
2752 * same thing as below.
2753 * Could this cause races ? */
2754 g_mutex_lock (&stream->fragment_download_lock);
2755 stream->download_finished = TRUE;
2756 g_cond_signal (&stream->fragment_download_cond);
2757 g_mutex_unlock (&stream->fragment_download_lock);
2759 GST_MANIFEST_UNLOCK (demux);
2766 gst_event_unref (event);
/* Query function of a stream's internal pad.
 *
 * @pad: the internal pad; its element-private data is the stream
 * @parent: the pad's parent (not used in the visible code)
 * @query: the incoming query
 *
 * GST_QUERY_ALLOCATION has its own case whose body is not visible in this
 * listing. The visible return forwards the query to the peer of stream->pad
 * with gst_pad_peer_query().
 */
2772 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2774 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2776 switch (GST_QUERY_TYPE (query)) {
2777 case GST_QUERY_ALLOCATION:
2784 return gst_pad_peer_query (stream->pad, query);
/* Pad probe on the uri handler's source pad that gathers download
 * statistics for @stream.
 *
 * @pad: the probed pad
 * @info: the probe info (a buffer or a downstream event)
 * @stream: the stream the statistics are stored on
 *
 * Buffers: when no bytes were counted yet, stream->last_latency is set to
 * the monotonic time elapsed since download_start_time; the buffer size is
 * then added to fragment_bytes_downloaded.
 * Downstream events: GST_EVENT_SEGMENT resets the byte counter. A second
 * case, whose label is not visible in this listing (the log message says
 * EOS), stores the elapsed time in last_download_time and derives
 * last_bitrate from the byte counter.
 *
 * download_start_time is multiplied by GST_USECOND before being subtracted
 * from the monotonic time - presumably it is kept in microseconds, TODO
 * confirm.
 * NOTE(review): last_download_time is the denominator passed to
 * gst_util_uint64_scale() - confirm it cannot be zero here.
 */
2787 static GstPadProbeReturn
2788 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2789 GstAdaptiveDemuxStream * stream)
2791 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2793 if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2794 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
/* First data of this download: record how long it took to arrive */
2795 if (stream->fragment_bytes_downloaded == 0) {
2796 stream->last_latency =
2797 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2798 (stream->download_start_time * GST_USECOND);
2799 GST_DEBUG_OBJECT (pad,
2800 "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2801 GST_TIME_ARGS (stream->last_latency));
2803 stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2804 GST_LOG_OBJECT (pad,
2805 "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2806 gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2807 } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2808 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2809 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2810 GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2811 GST_EVENT_TYPE_NAME (ev), ev);
2812 switch (GST_EVENT_TYPE (ev)) {
2813 case GST_EVENT_SEGMENT:
2814 stream->fragment_bytes_downloaded = 0;
/* Bits downloaded, scaled by GST_SECOND over the elapsed download time */
2818 stream->last_download_time =
2819 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2820 (stream->download_start_time * GST_USECOND);
2821 stream->last_bitrate =
2822 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2823 8 * GST_SECOND, stream->last_download_time);
2824 GST_DEBUG_OBJECT (pad,
2825 "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2826 G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2827 stream->last_bitrate);
2828 /* Calculate bitrate since URI request */
/* Blocks a stream that ran out of fragments on a live playlist until the
 * manifest is updated.
 *
 * @demux: the adaptive demuxer
 * @stream: the waiting stream
 *
 * manifest_update_lock is taken before manifest_lock is released, then the
 * function waits on manifest_cond. After re-taking manifest_lock it checks,
 * in this order: whether the stream was cancelled (last_ret becomes
 * GST_FLOW_FLUSHING), whether
 * gst_adaptive_demux_stream_update_fragment_info() now reports a fragment,
 * and whether the demuxer is still live. The loop construct and the values
 * assigned to the result are not visible in this listing.
 */
2839 /* must be called with manifest_lock taken.
2840 * Can temporarily release manifest_lock
2843 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2844 GstAdaptiveDemuxStream * stream)
2846 gboolean ret = TRUE;
2848 /* Wait until we're cancelled or there's something for
2849 * us to download in the playlist or the playlist
2850 * became non-live */
2852 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2854 /* get the manifest_update_lock while still holding the manifest_lock.
2855 * This will prevent other threads to signal the condition (they will need
2856 * both manifest_lock and manifest_update_lock in order to signal).
2857 * It cannot deadlock because all threads always get the manifest_lock first
2858 * and manifest_update_lock second.
2860 g_mutex_lock (&demux->priv->manifest_update_lock);
2862 GST_MANIFEST_UNLOCK (demux);
2864 g_cond_wait (&demux->priv->manifest_cond,
2865 &demux->priv->manifest_update_lock);
2866 g_mutex_unlock (&demux->priv->manifest_update_lock);
2868 GST_MANIFEST_LOCK (demux);
2870 /* check for cancelled every time we get the manifest_lock */
2871 g_mutex_lock (&stream->fragment_download_lock);
2872 if (G_UNLIKELY (stream->cancelled)) {
2874 stream->last_ret = GST_FLOW_FLUSHING;
2875 g_mutex_unlock (&stream->fragment_download_lock);
2878 g_mutex_unlock (&stream->fragment_download_lock);
2880 /* Got a new fragment or not live anymore? */
2881 if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2883 GST_DEBUG_OBJECT (demux, "new fragment available, "
2884 "not waiting for manifest update");
2889 if (!gst_adaptive_demux_is_live (demux)) {
2890 GST_DEBUG_OBJECT (demux, "Not live anymore, "
2891 "not waiting for manifest update");
2896 GST_DEBUG_OBJECT (demux, "Retrying now");
/* Makes sure @stream has a source element able to download @uri.
 *
 * @stream: the stream to set up
 * @uri: the URI to download; rejected with a warning if gst_uri_is_valid()
 *   fails
 * @referer: optional value for a "Referer" extra header
 * @refresh: takes part, together with @allow_cache, in deciding whether
 *   "Cache-Control" extra headers are set (the values are not visible in
 *   this listing)
 * @allow_cache: see @refresh
 *
 * The visible call site tests the result with '!', so a false value signals
 * failure; the return statements themselves are not visible in this listing.
 *
 * An existing source is re-used when the protocol of its current URI equals
 * the protocol of @uri and gst_uri_handler_set_uri() accepts the new URI.
 * Otherwise it is set to GST_STATE_NULL and removed from the demuxer, with
 * manifest_lock released around those calls. When there is no source, a bin
 * holding "uri handler -> queue" is built, a data probe is installed on the
 * uri handler's source pad, the bin is added to the demuxer and its ghost
 * pad is linked to a new internal sink pad using _src_chain, _src_event and
 * _src_query.
 */
2900 /* must be called with manifest_lock taken */
2902 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2903 const gchar * uri, const gchar * referer, gboolean refresh,
2904 gboolean allow_cache)
2906 GstAdaptiveDemux *demux = stream->demux;
2908 if (!gst_uri_is_valid (uri)) {
2909 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2913 /* Try to re-use existing source element */
2914 if (stream->src != NULL) {
2915 gchar *old_protocol, *new_protocol;
2918 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2919 old_protocol = gst_uri_get_protocol (old_uri);
2920 new_protocol = gst_uri_get_protocol (uri);
/* A different protocol means the old source is torn down */
2922 if (!g_str_equal (old_protocol, new_protocol)) {
2923 GstElement *src = stream->src;
2926 gst_object_unref (stream->src_srcpad);
2927 stream->src_srcpad = NULL;
2928 GST_MANIFEST_UNLOCK (demux);
2929 gst_element_set_locked_state (src, TRUE);
2930 gst_element_set_state (src, GST_STATE_NULL);
2931 gst_bin_remove (GST_BIN_CAST (demux), src);
2932 GST_MANIFEST_LOCK (demux);
2933 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2937 GST_DEBUG_OBJECT (demux, "Re-using old source element");
2938 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2940 GstElement *src = stream->src;
2943 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2944 err ? err->message : "Unknown error");
2945 g_clear_error (&err);
2946 gst_object_unref (stream->src_srcpad);
2947 stream->src_srcpad = NULL;
2948 GST_MANIFEST_UNLOCK (demux);
2949 gst_element_set_locked_state (src, TRUE);
2950 gst_element_set_state (src, GST_STATE_NULL);
2951 gst_bin_remove (GST_BIN_CAST (demux), src);
2952 GST_MANIFEST_LOCK (demux);
2956 g_free (old_protocol);
2957 g_free (new_protocol);
2960 if (stream->src == NULL) {
2961 GstPad *uri_handler_src;
2964 GstElement *uri_handler;
2966 GstPadLinkReturn pad_link_ret;
2967 GObjectClass *gobject_class;
2968 gchar *internal_name, *bin_name;
2970 /* Our src consists of a bin containing uri_handler -> queue . The
2971 * purpose of the queue is to allow the uri_handler to download an
2972 * entire fragment without blocking, so we can accurately measure the
2973 * download bitrate. */
2975 queue = gst_element_factory_make ("queue", NULL);
/* Limit the queue by bytes only; the buffer and time limits are set to 0 */
2979 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
2980 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
2981 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
2983 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
2984 if (uri_handler == NULL) {
2985 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
2986 ("Missing plugin to handle URI: '%s'", uri), (NULL));
2987 gst_object_unref (queue);
/* Properties are only set when the created element actually has them */
2991 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
2993 if (g_object_class_find_property (gobject_class, "compress"))
2994 g_object_set (uri_handler, "compress", FALSE, NULL);
2995 if (g_object_class_find_property (gobject_class, "keep-alive"))
2996 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
2997 if (g_object_class_find_property (gobject_class, "extra-headers")) {
2998 if (referer || refresh || !allow_cache) {
2999 GstStructure *extra_headers = gst_structure_new_empty ("headers");
3002 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3006 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3009 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3012 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3014 gst_structure_free (extra_headers);
3016 g_object_set (uri_handler, "extra-headers", NULL, NULL);
3020 /* Source bin creation */
3021 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3022 stream->src = gst_bin_new (bin_name);
3024 if (stream->src == NULL) {
3025 gst_object_unref (queue);
3026 gst_object_unref (uri_handler);
3030 gst_bin_add (GST_BIN_CAST (stream->src), queue);
3031 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3033 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3034 queue_sink = gst_element_get_static_pad (queue, "sink");
3037 gst_pad_link_full (uri_handler_src, queue_sink,
3038 GST_PAD_LINK_CHECK_NOTHING);
3039 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3040 GST_WARNING_OBJECT (demux,
3041 "Could not link pads %s:%s to %s:%s for reason %d",
3042 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3044 g_object_unref (queue_sink);
3045 g_object_unref (uri_handler_src);
3046 gst_object_unref (stream->src);
3051 /* Add a downstream event and data probe */
3052 gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3053 (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3055 g_object_unref (queue_sink);
3056 g_object_unref (uri_handler_src);
/* Expose the queue's source pad as the bin's "src" ghost pad */
3057 queue_src = gst_element_get_static_pad (queue, "src");
3058 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3059 g_object_unref (queue_src);
3060 gst_element_add_pad (stream->src, stream->src_srcpad);
3062 gst_element_set_locked_state (stream->src, TRUE);
3063 gst_bin_add (GST_BIN_CAST (demux), stream->src);
3064 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3066 /* set up our internal floating pad to drop all events from
3067 * the http src we don't care about. On the chain function
3068 * we just push the buffer forward */
3069 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3070 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3071 g_free (internal_name);
3072 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3073 GST_OBJECT_CAST (demux));
3074 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3075 gst_pad_set_element_private (stream->internal_pad, stream);
3076 gst_pad_set_active (stream->internal_pad, TRUE);
3077 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3078 gst_pad_set_event_function (stream->internal_pad, _src_event);
3079 gst_pad_set_query_function (stream->internal_pad, _src_query);
3081 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3082 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3083 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
/* Remember the new elements on the stream */
3087 stream->uri_handler = uri_handler;
3088 stream->queue = queue;
3090 stream->last_status_code = 200; /* default to OK */
/* Pad probe callback that marks the stream's source as ready.
 *
 * @pad: the probed pad (not used in the visible code)
 * @info: the probe info (not used in the visible code)
 * user_data (parameter line not visible in this listing): the
 *   GstAdaptiveDemuxStream
 *
 * Sets stream->src_at_ready and signals fragment_download_cond under
 * fragment_download_lock. Returns GST_PAD_PROBE_REMOVE.
 */
3095 static GstPadProbeReturn
3096 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3099 GstAdaptiveDemuxStream *stream = user_data;
3101 /* The source's src pad is IDLE so now set the state to READY */
3102 g_mutex_lock (&stream->fragment_download_lock);
3103 stream->src_at_ready = TRUE;
3104 g_cond_signal (&stream->fragment_download_cond);
3105 g_mutex_unlock (&stream->fragment_download_lock);
3107 return GST_PAD_PROBE_REMOVE;
/* Debug-only helper, compiled only when GST_DISABLE_GST_DEBUG is not defined.
 * It inspects the stream's downloading_header and downloading_index flags;
 * its result is passed as a "%s" argument to the download debug messages
 * elsewhere in this file.
 * NOTE(review): the return type line, the return statements and the closing
 * #endif are not visible in this listing - restore them from the upstream
 * file before compiling. */
3110 #ifndef GST_DISABLE_GST_DEBUG
3112 uritype (GstAdaptiveDemuxStream * s)
3114 if (s->downloading_header)
3116 if (s->downloading_index)
/* Downloads one URI (optionally a byte range) through the stream's source
 * element and blocks until the download is finished, cancelled or failed.
 *
 * Visible steps:
 *  - default *http_status to 200 and (re)configure the source through
 *    gst_adaptive_demux_stream_update_source(); failure sets last_ret to
 *    GST_FLOW_ERROR
 *  - with the manifest lock released, take the source to READY and, when
 *    start != 0 or end != -1, send a flushing BYTES seek to the uri_handler;
 *    a refused seek is recorded in stream->last_error and last_ret
 *  - if last_ret is still GST_FLOW_OK: record download_start_time, reset
 *    download_finished, sync the source with its parent and wait on
 *    fragment_download_cond until cancelled or download_finished is set
 *  - when the result is not GST_FLOW_OK and http_status is non-NULL, copy
 *    stream->last_status_code into it
 *  - finally wait (via an IDLE probe) for the source pad to go idle, set the
 *    source back to READY and cycle the internal pad's activation
 *
 * A set stream->cancelled flag turns the result into GST_FLOW_FLUSHING.
 *
 * NOTE(review): this listing has lost its short lines (braces, else, goto,
 * labels, return, comment terminators), so the exact branch structure cannot
 * be read from here - confirm against the upstream file. */
3122 /* must be called with manifest_lock taken.
3123 * Can temporarily release manifest_lock
3125 * Will return when URI is fully downloaded (or aborted/errored)
3127 static GstFlowReturn
3128 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3129 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3130 gint64 end, guint * http_status)
3132 GstFlowReturn ret = GST_FLOW_OK;
3133 GST_DEBUG_OBJECT (stream->pad,
3134 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3135 uritype (stream), uri, start, end);
3138 *http_status = 200; /* default to ok if no further information */
3140 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3141 ret = stream->last_ret = GST_FLOW_ERROR;
/* Lock the source's state so that state changes of the parent bin do not
 * affect it; it is driven explicitly below. */
3145 gst_element_set_locked_state (stream->src, TRUE);
3147 GST_MANIFEST_UNLOCK (demux);
3148 if (gst_element_set_state (stream->src,
3149 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3150 /* If ranges are specified, seek to it */
3151 if (start != 0 || end != -1) {
3152 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3156 /* Send the seek event to the uri_handler, as the other pipeline elements
3157 * can't handle it when READY. */
3158 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3159 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3160 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3162 GST_MANIFEST_LOCK (demux);
3163 /* looks like the source can't handle seeks in READY */
3164 g_clear_error (&stream->last_error);
3165 stream->last_error = g_error_new (GST_CORE_ERROR,
3166 GST_CORE_ERROR_NOT_IMPLEMENTED,
3167 "Source element can't handle range requests");
3168 stream->last_ret = GST_FLOW_ERROR;
3170 GST_MANIFEST_LOCK (demux);
3173 GST_MANIFEST_LOCK (demux);
/* Only start the actual download when nothing above has failed. */
3176 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3177 stream->download_start_time =
3178 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3180 /* src element is in state READY. Before we start it, we reset
3183 g_mutex_lock (&stream->fragment_download_lock);
3184 stream->download_finished = FALSE;
3185 stream->downloading_first_buffer = TRUE;
3186 g_mutex_unlock (&stream->fragment_download_lock);
3188 GST_MANIFEST_UNLOCK (demux);
3190 if (!gst_element_sync_state_with_parent (stream->src)) {
3191 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3192 GST_MANIFEST_LOCK (demux);
3193 ret = stream->last_ret = GST_FLOW_ERROR;
3197 /* wait for the fragment to be completely downloaded */
3198 GST_DEBUG_OBJECT (stream->pad,
3199 "Waiting for %s download to finish: %s", uritype (stream), uri);
3201 g_mutex_lock (&stream->fragment_download_lock);
3202 stream->src_at_ready = FALSE;
3203 if (G_UNLIKELY (stream->cancelled)) {
3204 g_mutex_unlock (&stream->fragment_download_lock);
3205 GST_MANIFEST_LOCK (demux);
3206 ret = stream->last_ret = GST_FLOW_FLUSHING;
3209 /* download_finished is only set:
3210 * * in ::fragment_download_finish()
3211 * * if EOS is received on the _src pad
3213 while (!stream->cancelled && !stream->download_finished) {
3214 g_cond_wait (&stream->fragment_download_cond,
3215 &stream->fragment_download_lock);
3217 g_mutex_unlock (&stream->fragment_download_lock);
3219 GST_DEBUG_OBJECT (stream->pad,
3220 "Finished Waiting for %s download: %s", uritype (stream), uri);
3222 GST_MANIFEST_LOCK (demux);
3223 g_mutex_lock (&stream->fragment_download_lock);
3224 if (G_UNLIKELY (stream->cancelled)) {
3225 ret = stream->last_ret = GST_FLOW_FLUSHING;
3226 g_mutex_unlock (&stream->fragment_download_lock);
3229 g_mutex_unlock (&stream->fragment_download_lock);
3231 ret = stream->last_ret;
3233 GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3234 uritype (stream), uri, stream->last_ret,
3235 gst_flow_get_name (stream->last_ret));
3236 if (stream->last_ret != GST_FLOW_OK && http_status) {
3237 *http_status = stream->last_status_code;
3241 /* changing src element state might try to join the streaming thread, so
3242 * we must not hold the manifest lock.
3244 GST_MANIFEST_UNLOCK (demux);
3246 GST_MANIFEST_UNLOCK (demux);
3247 if (stream->last_ret == GST_FLOW_OK)
3248 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3249 ret = GST_FLOW_CUSTOM_ERROR;
3252 stream->src_at_ready = FALSE;
3254 gst_element_set_locked_state (stream->src, TRUE);
3255 gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3256 gst_ad_stream_src_to_ready_cb, stream, NULL);
3258 g_mutex_lock (&stream->fragment_download_lock);
3259 while (!stream->src_at_ready) {
3260 g_cond_wait (&stream->fragment_download_cond,
3261 &stream->fragment_download_lock);
3263 g_mutex_unlock (&stream->fragment_download_lock);
3265 gst_element_set_state (stream->src, GST_STATE_READY);
3267 /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3268 GST_MANIFEST_LOCK (demux);
3269 g_mutex_lock (&stream->fragment_download_lock);
3270 if (G_UNLIKELY (stream->cancelled)) {
3271 ret = stream->last_ret = GST_FLOW_FLUSHING;
3272 g_mutex_unlock (&stream->fragment_download_lock);
3275 g_mutex_unlock (&stream->fragment_download_lock);
3277 /* deactivate and reactivate our ghostpad to make it fresh for a new
3279 gst_pad_set_active (stream->internal_pad, FALSE);
3280 gst_pad_set_active (stream->internal_pad, TRUE);
3285 /* must be called with manifest_lock taken.
3286 * Can temporarily release manifest_lock
3288 static GstFlowReturn
3289 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3292 GstAdaptiveDemux *demux = stream->demux;
3293 GstFlowReturn ret = GST_FLOW_OK;
3295 if (stream->fragment.header_uri != NULL) {
3296 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3297 G_GINT64_FORMAT, stream->fragment.header_uri,
3298 stream->fragment.header_range_start, stream->fragment.header_range_end);
3300 stream->downloading_header = TRUE;
3301 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3302 stream->fragment.header_uri, stream->fragment.header_range_start,
3303 stream->fragment.header_range_end, NULL);
3304 stream->downloading_header = FALSE;
3307 /* check if we have an index */
3308 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
3310 if (stream->fragment.index_uri != NULL) {
3311 GST_DEBUG_OBJECT (demux,
3312 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3313 stream->fragment.index_uri,
3314 stream->fragment.index_range_start, stream->fragment.index_range_end);
3315 stream->downloading_index = TRUE;
3316 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3317 stream->fragment.index_uri, stream->fragment.index_range_start,
3318 stream->fragment.index_range_end, NULL);
3319 stream->downloading_index = FALSE;
/* Downloads the stream's current fragment, including header/index when the
 * stream needs them, and applies the retry policy for failed downloads.
 *
 * Visible behaviour:
 *  - flags the start of a fragment (starting_fragment, first_fragment_buffer)
 *    and resets last_ret to GST_FLOW_OK
 *  - when stream->need_header is set, fetches header/index first through
 *    gst_adaptive_demux_stream_download_header_fragment()
 *  - downloads fragment.uri either chunk by chunk (when the subclass provides
 *    need_another_chunk(), it returns TRUE and fragment.chunk_size != 0) or
 *    in a single request covering range_start..range_end
 *  - for a GST_FLOW_CUSTOM_ERROR result, looks at stream->last_status_code:
 *    4xx on live streams and 5xx responses are candidates for one retry,
 *    possibly after moving to the next fragment or waiting to get back into
 *    the live seek range
 *  - some failures are reported as GST_FLOW_EOS (too many errors on a live
 *    stream, or no next fragment)
 *  - the last visible path posts a "Failed to get fragment URL." element
 *    error, stops the download task and returns GST_FLOW_ERROR
 *
 * NOTE(review): this listing has lost its short lines (braces, else, goto,
 * labels, return, several declarations), so which branch leads where cannot
 * be fully read from here - confirm against the upstream file. */
3326 /* must be called with manifest_lock taken.
3327 * Can temporarily release manifest_lock
3329 static GstFlowReturn
3330 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3332 GstAdaptiveDemux *demux = stream->demux;
3333 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3336 gboolean retried_once = FALSE, live;
3338 guint last_status_code;
3341 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3342 stream->starting_fragment = TRUE;
3343 stream->last_ret = GST_FLOW_OK;
3344 stream->first_fragment_buffer = TRUE;
3346 GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3347 stream->fragment.uri ? "FRAGMENT " : "",
3348 stream->fragment.header_uri ? "HEADER " : "",
3349 stream->fragment.index_uri ? "INDEX" : "");
3351 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3352 stream->fragment.index_uri == NULL)
/* Header and index are fetched before the fragment data when requested. */
3355 if (stream->need_header) {
3356 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3357 if (ret != GST_FLOW_OK) {
3360 stream->need_header = FALSE;
3365 url = stream->fragment.uri;
3366 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3370 stream->last_ret = GST_FLOW_OK;
3373 /* Download the actual fragment, either in fragments or in one go */
3374 if (klass->need_another_chunk && klass->need_another_chunk (stream)
3375 && stream->fragment.chunk_size != 0) {
3376 /* Handle chunk downloading */
3377 gint64 range_start, range_end, chunk_start, chunk_end;
3378 guint64 download_total_bytes;
3379 gint chunk_size = stream->fragment.chunk_size;
3381 range_start = chunk_start = stream->fragment.range_start;
3382 range_end = stream->fragment.range_end;
3383 /* HTTP ranges are inclusive for the end */
3384 if (chunk_size != -1)
3385 chunk_end = range_start + chunk_size - 1;
3387 chunk_end = range_end;
3389 if (range_end != -1)
3390 chunk_end = MIN (chunk_end, range_end);
3392 while (!stream->fragment.finished && (chunk_start <= range_end
3393 || range_end == -1)) {
/* Remember the byte counter so a short read can be detected afterwards. */
3394 download_total_bytes = stream->download_total_bytes;
3397 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3398 chunk_start, chunk_end, &http_status);
3400 GST_DEBUG_OBJECT (stream->pad,
3401 "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3402 http_status, gst_flow_get_name (stream->last_ret));
3404 /* Don't retry for any chunks except the first. We would have sent
3405 * data downstream already otherwise and it's difficult to recover
3406 * from that in a meaningful way */
3407 if (chunk_start > range_start)
3408 retried_once = TRUE;
3410 /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3411 * downloading up to -1. We don't know the full duration.
3412 * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3413 if (ret != GST_FLOW_OK && chunk_end == -1) {
3415 } else if (ret != GST_FLOW_OK) {
3417 stream->last_ret = GST_FLOW_OK;
3421 if (chunk_end == -1)
3424 /* Short read, we're at the end now */
3425 if (stream->download_total_bytes - download_total_bytes <
3426 chunk_end + 1 - chunk_start)
3429 if (!klass->need_another_chunk (stream))
3432 /* HTTP ranges are inclusive for the end */
3433 chunk_start += chunk_size;
3434 chunk_size = stream->fragment.chunk_size;
3435 if (chunk_size != -1)
3436 chunk_end = chunk_start + chunk_size - 1;
3438 chunk_end = range_end;
3440 if (range_end != -1)
3441 chunk_end = MIN (chunk_end, range_end);
3445 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3446 stream->fragment.range_start, stream->fragment.range_end, &http_status);
3447 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3448 stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3450 if (ret == GST_FLOW_OK)
3453 g_mutex_lock (&stream->fragment_download_lock);
3454 if (G_UNLIKELY (stream->cancelled)) {
3455 g_mutex_unlock (&stream->fragment_download_lock);
3458 g_mutex_unlock (&stream->fragment_download_lock);
3460 /* TODO check if we are truly stopping */
3461 if (ret != GST_FLOW_CUSTOM_ERROR)
3464 last_status_code = stream->last_status_code;
3465 GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3466 last_status_code, stream->download_error_count);
3468 live = gst_adaptive_demux_is_live (demux);
/* Status classes: 4xx only counts on live streams, 5xx always. */
3469 if (!retried_once && ((last_status_code / 100 == 4 && live)
3470 || last_status_code / 100 == 5)) {
3472 /* if current position is before available start, switch to next */
3473 if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3477 gint64 range_start, range_stop;
3479 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3483 if (demux->segment.position < range_start) {
3484 GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3485 stream->last_ret = GST_FLOW_OK;
3486 ret = gst_adaptive_demux_eos_handling (stream);
3487 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3488 gst_flow_get_name (ret));
3489 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3490 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3491 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3492 gst_flow_get_name (ret));
3493 if (ret == GST_FLOW_OK) {
3494 retried_once = TRUE;
3497 } else if (demux->segment.position > range_stop) {
3498 /* wait a bit to be in range, we don't have any locks at that point */
3500 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3501 if (wait_time > 0) {
3502 gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3504 GST_DEBUG_OBJECT (stream->pad,
3505 "Download waiting for %" GST_TIME_FORMAT,
3506 GST_TIME_ARGS (wait_time));
3508 GST_MANIFEST_UNLOCK (demux);
3509 g_mutex_lock (&stream->fragment_download_lock);
3510 if (G_UNLIKELY (stream->cancelled)) {
3511 g_mutex_unlock (&stream->fragment_download_lock);
3512 GST_MANIFEST_LOCK (demux);
3513 stream->last_ret = GST_FLOW_FLUSHING;
3517 g_cond_wait_until (&stream->fragment_download_cond,
3518 &stream->fragment_download_lock, end_time);
3519 if (G_UNLIKELY (stream->cancelled)) {
3520 g_mutex_unlock (&stream->fragment_download_lock);
3521 GST_MANIFEST_LOCK (demux);
3522 stream->last_ret = GST_FLOW_FLUSHING;
3525 } while (!stream->download_finished);
3526 g_mutex_unlock (&stream->fragment_download_lock);
3528 GST_MANIFEST_LOCK (demux);
3534 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3535 /* looks like there is no way of knowing when a live stream has ended
3536 * Have to assume we are falling behind and cause a manifest reload */
3537 GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3538 return GST_FLOW_EOS;
3540 } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3541 /* If this is the last fragment, consider failures EOS and not actual
3542 * errors. Due to rounding errors in the durations, the last fragment
3543 * might not actually exist */
3544 GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3545 return GST_FLOW_EOS;
3547 /* retry once (same segment) for 5xx (server errors) */
3548 if (!retried_once) {
3549 retried_once = TRUE;
3550 /* wait a short time in case the server needs a bit to recover, we don't
3551 * care if we get woken up before end time. We can use sleep here since
3552 * we're already blocking and just want to wait some time. */
3553 g_usleep (100000); /* a tenth of a second */
3563 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3564 (_("Failed to get fragment URL.")),
3565 ("An error happened when getting fragment URL"));
3566 gst_task_stop (stream->download_task);
3567 return GST_FLOW_ERROR;
/* Body of a stream's download task: one pass prepares and downloads one
 * fragment and reacts to the outcome.
 *
 * Visible steps of a pass:
 *  - take the manifest lock and check stream->cancelled
 *  - compare stream->segment.position with the segment stop (or start) and
 *    stop the task when the segment is done
 *  - free demux->priv->old_streams if any, then recheck cancellation
 *  - when stream->restart_download is set: query the downstream position
 *    (falling back to the other streams' pads), seek the stream there and
 *    push a fresh segment event
 *  - refresh the fragment info; for live content, wait until the fragment
 *    is expected to be available
 *  - download the fragment and dispatch on the flow return: EOS (manifest
 *    refresh / wait / period advance), NOT_LINKED, FLUSHING, and errors
 *    (count them, optionally refresh the manifest, wait half a fragment
 *    duration before retrying)
 *  - push EOS downstream on GST_FLOW_EOS unless new streams are pending
 *  - on a fatal download error: build an error message, stop the task, take
 *    the source element to NULL and remove it from the bin, then post the
 *    message
 *
 * The manifest lock is dropped around every wait and retaken afterwards,
 * each time followed by a check of stream->cancelled.
 *
 * NOTE(review): this listing has lost its short lines (braces, else, goto,
 * labels, switch/case/default, return, several declarations), so the branch
 * targets cannot be fully read from here - confirm against the upstream
 * file. */
3571 /* this function will take the manifest_lock and will keep it until the end.
3572 * It will release it temporarily only when going to sleep.
3573 * Every time it takes the manifest_lock, it will check for cancelled condition
3576 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3578 GstAdaptiveDemux *demux = stream->demux;
3579 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3583 GST_LOG_OBJECT (stream->pad, "download loop start");
3585 GST_MANIFEST_LOCK (demux);
3587 g_mutex_lock (&stream->fragment_download_lock);
3588 if (G_UNLIKELY (stream->cancelled)) {
3589 stream->last_ret = GST_FLOW_FLUSHING;
3590 g_mutex_unlock (&stream->fragment_download_lock);
3593 g_mutex_unlock (&stream->fragment_download_lock);
3595 /* Check if we're done with our segment */
3596 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3597 if (demux->segment.rate > 0) {
3598 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3599 && stream->segment.position >= stream->segment.stop) {
3600 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3602 gst_task_stop (stream->download_task);
3603 goto end_of_manifest;
3606 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3607 && stream->segment.position <= stream->segment.start) {
3608 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3610 gst_task_stop (stream->download_task);
3611 goto end_of_manifest;
3614 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3616 /* Cleanup old streams if any */
3617 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
/* Detach the list first so it is only freed once. */
3618 GList *old_streams = demux->priv->old_streams;
3619 demux->priv->old_streams = NULL;
3621 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3622 g_list_free_full (old_streams,
3623 (GDestroyNotify) gst_adaptive_demux_stream_free);
3624 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3626 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3627 * Recheck the cancelled flag.
3629 g_mutex_lock (&stream->fragment_download_lock);
3630 if (G_UNLIKELY (stream->cancelled)) {
3631 stream->last_ret = GST_FLOW_FLUSHING;
3632 g_mutex_unlock (&stream->fragment_download_lock);
3635 g_mutex_unlock (&stream->fragment_download_lock);
3638 /* Restarting download, figure out new position
3639 * FIXME : Move this to a separate function ? */
3640 if (G_UNLIKELY (stream->restart_download)) {
3641 GstEvent *seg_event;
3642 GstClockTime cur, ts = 0;
3645 GST_DEBUG_OBJECT (stream->pad,
3646 "Activating stream due to reconfigure event");
3648 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3649 ts = (GstClockTime) pos;
3650 GST_DEBUG_OBJECT (demux, "Downstream position: %"
3651 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3653 /* query other pads as some faulty element in the pad's branch might
3654 * reject position queries. This should be better than using the
3655 * demux segment position that can be much ahead */
3658 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3659 GstAdaptiveDemuxStream *cur_stream =
3660 (GstAdaptiveDemuxStream *) iter->data;
3662 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3664 ts = (GstClockTime) pos;
3665 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3666 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3672 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3674 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3675 stream->segment.position);
3677 /* we might have already pushed this data */
3680 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3681 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3683 if (GST_CLOCK_TIME_IS_VALID (ts)) {
3684 GstClockTime offset, period_start;
3687 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3688 period_start = gst_adaptive_demux_get_period_start_time (demux);
3690 /* TODO check return */
3691 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3694 stream->segment.position = ts - period_start + offset;
3697 /* The stream's segment is still correct except for
3698 * the position, so let's send a new one with the
3699 * updated position */
3700 seg_event = gst_event_new_segment (&stream->segment);
3701 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3702 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3704 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3705 GST_PTR_FORMAT, seg_event);
3706 gst_pad_push_event (stream->pad, seg_event);
3708 stream->discont = TRUE;
3709 stream->restart_download = FALSE;
3712 live = gst_adaptive_demux_is_live (demux);
3714 /* Get information about the fragment to download */
3715 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3716 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3717 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3718 ret, gst_flow_get_name (ret));
3719 if (ret == GST_FLOW_OK) {
3721 /* wait for live fragments to be available */
3724 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3725 if (wait_time > 0) {
3726 GstClockTime end_time =
3727 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3729 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3730 GST_TIME_ARGS (wait_time));
3732 GST_MANIFEST_UNLOCK (demux);
3734 g_mutex_lock (&stream->fragment_download_lock);
3735 if (G_UNLIKELY (stream->cancelled)) {
3736 g_mutex_unlock (&stream->fragment_download_lock);
3737 GST_MANIFEST_LOCK (demux);
3738 stream->last_ret = GST_FLOW_FLUSHING;
3741 gst_adaptive_demux_wait_until (demux->realtime_clock,
3742 &stream->fragment_download_cond, &stream->fragment_download_lock,
3744 g_mutex_unlock (&stream->fragment_download_lock);
3746 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3748 GST_MANIFEST_LOCK (demux);
3750 g_mutex_lock (&stream->fragment_download_lock);
3751 if (G_UNLIKELY (stream->cancelled)) {
3752 stream->last_ret = GST_FLOW_FLUSHING;
3753 g_mutex_unlock (&stream->fragment_download_lock);
3756 g_mutex_unlock (&stream->fragment_download_lock);
3760 stream->last_ret = GST_FLOW_OK;
/* Timestamp taken before the download; the error path below adds the retry
 * delay to it. */
3762 next_download = gst_adaptive_demux_get_monotonic_time (demux);
3763 ret = gst_adaptive_demux_stream_download_fragment (stream);
3765 if (ret == GST_FLOW_FLUSHING) {
3766 g_mutex_lock (&stream->fragment_download_lock);
3767 if (G_UNLIKELY (stream->cancelled)) {
3768 stream->last_ret = GST_FLOW_FLUSHING;
3769 g_mutex_unlock (&stream->fragment_download_lock);
3772 g_mutex_unlock (&stream->fragment_download_lock);
3776 stream->last_ret = ret;
3781 break; /* all is good, let's go */
3783 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3785 /* we push the EOS after releasing the object lock */
3786 if (gst_adaptive_demux_is_live (demux)
3787 && (demux->segment.rate == 1.0
3788 || gst_adaptive_demux_stream_in_live_seek_range (demux,
3790 GstAdaptiveDemuxClass *demux_class =
3791 GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3793 /* this might be a fragment download error, refresh the manifest, just in case */
3794 if (!demux_class->requires_periodical_playlist_update (demux)) {
3795 ret = gst_adaptive_demux_update_manifest (demux);
3797 /* Wait only if we can ensure current manifest has been expired.
3798 * The meaning "we have next period" *WITH* EOS is that, current
3799 * period has been ended but we can continue to the next period */
3800 } else if (!gst_adaptive_demux_has_next_period (demux) &&
3801 gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3804 gst_task_stop (stream->download_task);
3805 if (stream->replaced) {
3809 gst_task_stop (stream->download_task);
3812 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3813 if (gst_adaptive_demux_has_next_period (demux)) {
3814 GST_DEBUG_OBJECT (stream->pad,
3815 "Next period available, not sending EOS");
3816 gst_adaptive_demux_advance_period (demux);
3822 case GST_FLOW_NOT_LINKED:
3825 gst_task_stop (stream->download_task);
3827 ret = gst_adaptive_demux_combine_flows (demux);
3828 if (ret == GST_FLOW_NOT_LINKED) {
3829 GST_ELEMENT_FLOW_ERROR (demux, ret);
3834 case GST_FLOW_FLUSHING:{
3837 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3838 GstAdaptiveDemuxStream *other;
3841 gst_task_stop (other->download_task);
3847 if (ret <= GST_FLOW_ERROR) {
3848 gboolean is_live = gst_adaptive_demux_is_live (demux);
3849 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3850 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3851 goto download_error;
3854 g_clear_error (&stream->last_error);
3856 /* First try to update the playlist for non-live playlists
3857 * in case the URIs have changed in the meantime. But only
3858 * try it the first time, after that we're going to wait a
3859 * bit to not flood the server */
3860 if (stream->download_error_count == 1 && !is_live) {
3861 /* TODO hlsdemux had more options to this function (boolean and err) */
3863 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3864 /* Retry immediately, the playlist actually has changed */
3865 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3870 /* Wait half the fragment duration before retrying */
3871 next_download += stream->fragment.duration / 2;
3873 GST_MANIFEST_UNLOCK (demux);
3875 g_mutex_lock (&stream->fragment_download_lock);
3876 if (G_UNLIKELY (stream->cancelled)) {
3877 g_mutex_unlock (&stream->fragment_download_lock);
3878 GST_MANIFEST_LOCK (demux);
3879 stream->last_ret = GST_FLOW_FLUSHING;
3882 gst_adaptive_demux_wait_until (demux->realtime_clock,
3883 &stream->fragment_download_cond, &stream->fragment_download_lock,
3885 g_mutex_unlock (&stream->fragment_download_lock);
3887 GST_DEBUG_OBJECT (demux, "Retrying now");
3889 GST_MANIFEST_LOCK (demux);
3891 g_mutex_lock (&stream->fragment_download_lock);
3892 if (G_UNLIKELY (stream->cancelled)) {
3893 stream->last_ret = GST_FLOW_FLUSHING;
3894 g_mutex_unlock (&stream->fragment_download_lock);
3897 g_mutex_unlock (&stream->fragment_download_lock);
3899 /* Refetch the playlist now after we waited */
3901 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3902 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3910 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3911 if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3912 if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3913 GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3914 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3916 GST_DEBUG_OBJECT (stream->src,
3917 "Stream is EOS, but we're switching fragments. Not sending.");
3920 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3921 goto download_error;
3926 GST_MANIFEST_UNLOCK (demux);
3927 GST_LOG_OBJECT (stream->pad, "download loop end");
3932 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3939 if (stream->last_error) {
3940 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3941 GST_DEBUG_PAD_NAME (stream->pad));
3943 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3945 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3946 stream->last_error->message);
3950 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3951 _("Couldn't download fragments"));
3953 gst_message_new_error (GST_OBJECT_CAST (demux), err,
3954 "Fragment downloading has failed consecutive times");
3956 GST_ERROR_OBJECT (stream->pad,
3957 "Download error: Couldn't download fragments, too many failures");
3960 gst_task_stop (stream->download_task);
3962 GstElement *src = stream->src;
/* The source is shut down and removed with the manifest lock released. */
3965 GST_MANIFEST_UNLOCK (demux);
3966 gst_element_set_locked_state (src, TRUE);
3967 gst_element_set_state (src, GST_STATE_NULL);
3968 gst_bin_remove (GST_BIN_CAST (demux), src);
3969 GST_MANIFEST_LOCK (demux);
3972 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
/* Body of the manifest updates task.
 *
 * While the demuxer is live it repeatedly:
 *  - releases the manifest lock and waits on updates_timed_cond until
 *    next_update, checking priv->stop_updates_task before and after the wait
 *  - retakes the manifest lock and calls gst_adaptive_demux_update_manifest()
 *  - on failure increments priv->update_failed_count; up to
 *    DEFAULT_FAILED_COUNT failures only reschedule the next update, beyond
 *    that an element error is posted and the task is stopped
 *  - on success resets the failure counter, schedules the next update and
 *    broadcasts priv->manifest_cond to wake up the download tasks
 *
 * next_update is the monotonic time plus the subclass'
 * get_manifest_update_interval() scaled by GST_USECOND.
 *
 * NOTE(review): this listing has lost its short lines (return type, braces,
 * else, goto/labels, return), so the exits of the loop cannot be fully read
 * from here - confirm against the upstream file. */
3979 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
3981 GstClockTime next_update;
3982 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3984 /* Loop for updating of the playlist. This periodically checks if
3985 * the playlist is updated and does so, then signals the streaming
3986 * thread in case it can continue downloading now. */
3988 /* block until the next scheduled update or the signal to quit this thread */
3989 GST_DEBUG_OBJECT (demux, "Started updates task");
3991 GST_MANIFEST_LOCK (demux);
3994 gst_adaptive_demux_get_monotonic_time (demux) +
3995 klass->get_manifest_update_interval (demux) * GST_USECOND;
3997 /* Updating playlist only needed for live playlists */
3998 while (gst_adaptive_demux_is_live (demux)) {
3999 GstFlowReturn ret = GST_FLOW_OK;
4001 /* Wait here until we should do the next update or we're cancelled */
4002 GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4004 GST_MANIFEST_UNLOCK (demux);
4006 g_mutex_lock (&demux->priv->updates_timed_lock);
4007 if (demux->priv->stop_updates_task) {
4008 g_mutex_unlock (&demux->priv->updates_timed_lock);
4011 gst_adaptive_demux_wait_until (demux->realtime_clock,
4012 &demux->priv->updates_timed_cond,
4013 &demux->priv->updates_timed_lock, next_update);
4014 g_mutex_unlock (&demux->priv->updates_timed_lock);
/* Check the stop request again after the wait. */
4016 g_mutex_lock (&demux->priv->updates_timed_lock);
4017 if (demux->priv->stop_updates_task) {
4018 g_mutex_unlock (&demux->priv->updates_timed_lock);
4021 g_mutex_unlock (&demux->priv->updates_timed_lock);
4023 GST_MANIFEST_LOCK (demux);
4025 GST_DEBUG_OBJECT (demux, "Updating playlist");
4027 ret = gst_adaptive_demux_update_manifest (demux);
4029 if (ret == GST_FLOW_EOS) {
4030 } else if (ret != GST_FLOW_OK) {
4031 /* update_failed_count is used only here, no need to protect it */
4032 demux->priv->update_failed_count++;
4033 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4034 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4035 gst_flow_get_name (ret));
4036 next_update = gst_adaptive_demux_get_monotonic_time (demux)
4037 + klass->get_manifest_update_interval (demux) * GST_USECOND;
4039 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4040 (_("Internal data stream error.")), ("Could not update playlist"));
4041 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4042 gst_task_stop (demux->priv->updates_task);
4043 GST_MANIFEST_UNLOCK (demux);
4047 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4048 demux->priv->update_failed_count = 0;
4050 gst_adaptive_demux_get_monotonic_time (demux) +
4051 klass->get_manifest_update_interval (demux) * GST_USECOND;
4053 /* Wake up download tasks */
4054 g_mutex_lock (&demux->priv->manifest_update_lock);
4055 g_cond_broadcast (&demux->priv->manifest_cond);
4056 g_mutex_unlock (&demux->priv->manifest_update_lock);
4060 GST_MANIFEST_UNLOCK (demux);
4064 GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
/* Pushes @event on the stream's source pad.
 *
 * A reference to the pad is taken first; the manifest lock is released
 * around gst_pad_push_event() and retaken afterwards, and the pad reference
 * is dropped once the push has returned. EOS events get special treatment
 * first.
 *
 * NOTE(review): the return type, the second parameter line, the body of the
 * EOS branch and the return statement are not visible in this listing -
 * confirm against the upstream file. */
4073 /* must be called with manifest_lock taken */
4075 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4080 GstAdaptiveDemux *demux = stream->demux;
4082 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4086 pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4088 /* Can't push events holding the manifest lock */
4089 GST_MANIFEST_UNLOCK (demux);
4091 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4092 "Pushing event %" GST_PTR_FORMAT, event);
4094 ret = gst_pad_push_event (pad, event);
4096 gst_object_unref (pad);
4098 GST_MANIFEST_LOCK (demux);
/* Asks the subclass, through its is_live() virtual method, whether the
 * current manifest describes a live stream.
 * NOTE(review): the return type line and any guard or fallback around the
 * vmethod call are not visible in this listing - confirm against the
 * upstream file. */
4103 /* must be called with manifest_lock taken */
4105 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4107 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4110 return klass->is_live (demux);
4114 /* must be called with manifest_lock taken */
4115 static GstFlowReturn
4116 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4117 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4118 GstClockTime ts, GstClockTime * final_ts)
4120 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4122 if (klass->stream_seek)
4123 return klass->stream_seek (stream, forward, flags, ts, final_ts);
4124 return GST_FLOW_ERROR;
/* Tells whether @stream has another fragment after the current one.
 * The answer defaults to TRUE and is replaced by the subclass'
 * stream_has_next_fragment() result when that virtual method is implemented.
 * NOTE(review): the return type line and the final return statement are not
 * visible in this listing - confirm against the upstream file. */
4127 /* must be called with manifest_lock taken */
4129 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4130 GstAdaptiveDemuxStream * stream)
4132 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4133 gboolean ret = TRUE;
4135 if (klass->stream_has_next_fragment)
4136 ret = klass->stream_has_next_fragment (stream);
/* Advances @stream to its next fragment, but only when the stream's last
 * flow return is GST_FLOW_OK; in that case the work is delegated to
 * gst_adaptive_demux_stream_advance_fragment_unlocked(). The value reported
 * back is read from stream->last_ret afterwards.
 * NOTE(review): the start and end of the second comment below, the return
 * type, the declaration of ret, the assignment receiving the _unlocked()
 * result and the return statement are not visible in this listing - confirm
 * against the upstream file. */
4141 /* must be called with manifest_lock taken */
4143 * the ::finish_fragment() handlers when an *actual* fragment is done
4146 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4147 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4151 if (stream->last_ret == GST_FLOW_OK) {
4153 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4156 ret = stream->last_ret;
/* Moves @stream on to its next fragment after one of @duration was handled.
 *
 * Steps visible in this function:
 *  - the stream_advance_fragment vfunc is mandatory; GST_FLOW_ERROR is
 *    returned when it is NULL
 *  - the stream's download_error_count and last_error are reset
 *  - an element message named GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME is
 *    posted with the manifest URI, fragment URI, start/stop times, size and
 *    download time
 *  - under the segment lock, for a valid @duration and a positive rate,
 *    stream->segment.position grows by @duration and demux->segment.position
 *    is raised when the stream position (minus presentation offset, plus
 *    period start) is ahead of it
 *  - the stream_advance_fragment vfunc is called in the non-1.0-rate live
 *    case and in the live / has-next-fragment case
 *  - download_start_time is refreshed from the monotonic time
 *  - on GST_FLOW_OK, a successful bitrate selection sets need_header and
 *    turns the result into GST_ADAPTIVE_DEMUX_FLOW_SWITCH
 *  - when demux->next_streams is set, this stream's download task is stopped
 *    and the new streams are prepared and started once every other stream
 *    is cancelled or has finished downloading
 *
 * NOTE(review): download_start_time is assigned a microsecond value
 * (GST_TIME_AS_USECONDS) in this function, yet it is published in the
 * statistics message as GST_TYPE_CLOCK_TIME next to a
 * gst_util_get_timestamp() value - confirm the intended unit. */
4161 /* must be called with manifest_lock taken */
4163 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4164 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4166 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4169 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4171 GST_LOG_OBJECT (stream->pad,
4172 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4173 GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4175 stream->download_error_count = 0;
4176 g_clear_error (&stream->last_error);
4178 /* FIXME - url has no indication of byte ranges for subsegments */
4179 /* FIXME : All those time statistics are biased, since they are calculated
4180 * *AFTER* the queue2, which might be blocking. They should ideally be
4181 * calculated *before* queue2 in the uri_handler_probe */
4182 gst_element_post_message (GST_ELEMENT_CAST (demux),
4183 gst_message_new_element (GST_OBJECT_CAST (demux),
4184 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4185 "manifest-uri", G_TYPE_STRING,
4186 demux->manifest_uri, "uri", G_TYPE_STRING,
4187 stream->fragment.uri, "fragment-start-time",
4188 GST_TYPE_CLOCK_TIME, stream->download_start_time,
4189 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4190 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4191 stream->download_total_bytes, "fragment-download-time",
4192 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4194 /* Don't update to the end of the segment if in reverse playback */
4195 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4196 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4197 GstClockTime offset =
4198 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4199 GstClockTime period_start =
4200 gst_adaptive_demux_get_period_start_time (demux);
4202 stream->segment.position += duration;
4204 /* Convert from position inside the stream's segment to the demuxer's
4205 * segment, they are not necessarily the same */
4206 if (stream->segment.position - offset + period_start >
4207 demux->segment.position)
4208 demux->segment.position =
4209 stream->segment.position - offset + period_start;
4211 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4213 /* When advancing with a non 1.0 rate on live streams, we need to check
4214 * the live seeking range again to make sure we can still advance to
4216 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4217 if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4220 ret = klass->stream_advance_fragment (stream);
4221 } else if (gst_adaptive_demux_is_live (demux)
4222 || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4223 ret = klass->stream_advance_fragment (stream);
4228 stream->download_start_time =
4229 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4231 if (ret == GST_FLOW_OK) {
4232 if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4233 gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4234 stream->need_header = TRUE;
4235 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4238 /* the subclass might want to switch pads */
4239 if (G_UNLIKELY (demux->next_streams)) {
4241 gboolean can_expose = TRUE;
4243 gst_task_stop (stream->download_task);
4247 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4248 /* Only expose if all streams are now cancelled or finished downloading */
4249 GstAdaptiveDemuxStream *other = iter->data;
4250 if (other != stream) {
4251 g_mutex_lock (&other->fragment_download_lock);
4252 can_expose &= (other->cancelled == TRUE
4253 || other->download_finished == TRUE);
4254 g_mutex_unlock (&other->fragment_download_lock);
4259 GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4260 "to do bitrate switching");
4261 gst_adaptive_demux_prepare_streams (demux, FALSE);
4262 gst_adaptive_demux_start_tasks (demux, TRUE);
4264 GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
/* Hands @bitrate for @stream to the optional stream_select_bitrate vfunc and
 * returns the vfunc's result when the subclass implements it. */
4272 /* must be called with manifest_lock taken */
4274 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4275 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4277 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4279 if (klass->stream_select_bitrate)
4280 return klass->stream_select_bitrate (stream, bitrate);
/* Refreshes the description of the fragment @stream should handle next.
 *
 * The stream_update_fragment_info vfunc is mandatory (guarded by
 * g_return_val_if_fail). Before it runs, stream->fragment.bitrate is reset
 * to 0 and stream->fragment.finished to FALSE. The vfunc's flow return and
 * the fragment URI are logged; on GST_FLOW_OK the fragment's timestamp,
 * duration and byte range are logged as well. */
4284 /* must be called with manifest_lock taken */
4285 static GstFlowReturn
4286 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4287 GstAdaptiveDemuxStream * stream)
4289 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4292 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4295 /* Make sure the sub-class will update bitrate, or else
4297 stream->fragment.bitrate = 0;
4298 stream->fragment.finished = FALSE;
4300 GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4301 GST_TIME_ARGS (stream->segment.position));
4303 ret = klass->stream_update_fragment_info (stream);
4305 GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4306 stream->fragment.uri);
4307 if (ret == GST_FLOW_OK) {
4308 GST_LOG_OBJECT (stream->pad,
4309 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4310 GST_TIME_ARGS (stream->fragment.timestamp),
4311 GST_TIME_ARGS (stream->fragment.duration));
4312 GST_LOG_OBJECT (stream->pad,
4313 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4314 stream->fragment.range_start, stream->fragment.range_end);
/* Returns the value of the optional stream_get_fragment_waiting_time vfunc
 * for @stream when the subclass implements it. */
4320 /* must be called with manifest_lock taken */
4322 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4323 demux, GstAdaptiveDemuxStream * stream)
4325 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4327 if (klass->stream_get_fragment_waiting_time)
4328 return klass->stream_get_fragment_waiting_time (stream);
/* Default manifest refresh: fetches demux->manifest_uri again through the
 * demuxer's URI downloader.
 *
 * The stored manifest_uri and manifest_base_uri strings are freed and
 * replaced from the download: for a permanent redirect that carries a
 * redirect_uri, that URI becomes manifest_uri and the base URI is cleared;
 * the other visible assignments copy the download's uri into manifest_uri
 * and its redirect_uri into manifest_base_uri. The downloaded buffer is
 * passed to the update_manifest_data vfunc and then unreffed.
 *
 * A "Failed to download manifest" warning is paired with a
 * GST_FLOW_NOT_LINKED result, and the download error is cleared. */
4332 /* must be called with manifest_lock taken */
4333 static GstFlowReturn
4334 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4336 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4337 GstFragment *download;
4340 GError *error = NULL;
4342 download = gst_uri_downloader_fetch_uri (demux->downloader,
4343 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4345 g_free (demux->manifest_uri);
4346 g_free (demux->manifest_base_uri);
4347 if (download->redirect_permanent && download->redirect_uri) {
4348 demux->manifest_uri = g_strdup (download->redirect_uri);
4349 demux->manifest_base_uri = NULL;
4351 demux->manifest_uri = g_strdup (download->uri);
4352 demux->manifest_base_uri = g_strdup (download->redirect_uri);
4355 buffer = gst_fragment_get_buffer (download);
4356 g_object_unref (download);
4357 ret = klass->update_manifest_data (demux, buffer);
4358 gst_buffer_unref (buffer);
4359 /* FIXME: Should the manifest uri vars be reverted to original
4360 * values if updating fails? */
4362 GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4364 ret = GST_FLOW_NOT_LINKED;
4366 g_clear_error (&error);
/* Runs the subclass' update_manifest vfunc and reacts to the outcome.
 *
 * On GST_FLOW_OK the duration is re-read through the get_duration vfunc and,
 * when it is not GST_CLOCK_TIME_NONE, a duration-changed message is posted.
 * The manifest update task is started when the demuxer is live and the
 * subclass requires periodical playlist updates; the stop call is the
 * counterpart for the opposite case. */
4371 /* must be called with manifest_lock taken */
4372 static GstFlowReturn
4373 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4375 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4378 ret = klass->update_manifest (demux);
4380 if (ret == GST_FLOW_OK) {
4381 GstClockTime duration;
4382 /* Send an updated duration message */
4383 duration = klass->get_duration (demux);
4384 if (duration != GST_CLOCK_TIME_NONE) {
4385 GST_DEBUG_OBJECT (demux,
4386 "Sending duration message : %" GST_TIME_FORMAT,
4387 GST_TIME_ARGS (duration));
4388 gst_element_post_message (GST_ELEMENT (demux),
4389 gst_message_new_duration_changed (GST_OBJECT (demux)));
4391 GST_DEBUG_OBJECT (demux,
4392 "Duration unknown, can not send the duration message");
4395 /* If a manifest changes its liveness or periodic updateness, we need
4396 * to start/stop the manifest update task appropriately */
4397 /* Keep this condition in sync with the one in
4398 * gst_adaptive_demux_start_manifest_update_task()
4400 if (gst_adaptive_demux_is_live (demux) &&
4401 klass->requires_periodical_playlist_update (demux)) {
4402 gst_adaptive_demux_start_manifest_update_task (demux);
4404 gst_adaptive_demux_stop_manifest_update_task (demux);
/* Resets the fragment description @f. The visible statements free header_uri
 * and index_uri and set them to NULL, put both byte ranges back to
 * start 0 / end -1, and clear the finished flag. */
4412 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4419 g_free (f->header_uri);
4420 f->header_uri = NULL;
4421 f->header_range_start = 0;
4422 f->header_range_end = -1;
4424 g_free (f->index_uri);
4425 f->index_uri = NULL;
4426 f->index_range_start = 0;
4427 f->index_range_end = -1;
4429 f->finished = FALSE;
/* Asks the optional has_next_period vfunc about @demux. The result starts
 * out as FALSE, is replaced by the vfunc's answer when the subclass
 * implements it, and is written to the debug log. */
4432 /* must be called with manifest_lock taken */
4434 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4436 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4437 gboolean ret = FALSE;
4439 if (klass->has_next_period)
4440 ret = klass->has_next_period (demux);
4441 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
/* Switches @demux to the next period. The advance_period vfunc is mandatory
 * (the function bails out through g_return_if_fail when it is NULL). After
 * the vfunc ran, the streams are prepared again and the tasks are started. */
4445 /* must be called with manifest_lock taken */
4447 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4449 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4451 g_return_if_fail (klass->advance_period != NULL);
4453 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4454 klass->advance_period (demux);
4455 gst_adaptive_demux_prepare_streams (demux, FALSE);
4456 gst_adaptive_demux_start_tasks (demux, TRUE);
/* Reads the current time of demux->realtime_clock. A NULL @demux is rejected
 * and GST_CLOCK_TIME_NONE is returned instead. */
4460 * gst_adaptive_demux_get_monotonic_time:
4461 * Returns: a monotonically increasing time, using the system realtime clock
4464 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4466 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4467 return gst_clock_get_time (demux->realtime_clock);
4471 * gst_adaptive_demux_get_client_now_utc:
4472 * @demux: #GstAdaptiveDemux
4473 * Returns: the client's estimate of UTC
4475 * Used to find the client's estimate of UTC, using the system realtime clock.
4478 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4480 GstClockTime rtc_now;
4484 rtc_now = gst_clock_get_time (demux->realtime_clock);
4485 utc_now = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4486 gtv.tv_sec = utc_now / G_TIME_SPAN_SECOND;
4487 gtv.tv_usec = utc_now % G_TIME_SPAN_SECOND;
4488 return g_date_time_new_from_timeval_utc (>v);
/* Returns the current value of demux->running, read atomically. */
4492 * gst_adaptive_demux_is_running
4493 * @demux: #GstAdaptiveDemux
4494 * Returns: whether the demuxer is processing data
4496 * Returns FALSE if shutdown has started (transitioning down from
4497 * PAUSED), otherwise TRUE.
4500 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4502 return g_atomic_int_get (&demux->running);
/* Allocates a GstAdaptiveDemuxTimer with the slice allocator. The visible
 * initialisation sets fired to FALSE, stores @mutex and starts the reference
 * count at 1. */
4505 static GstAdaptiveDemuxTimer *
4506 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4508 GstAdaptiveDemuxTimer *timer;
4510 timer = g_slice_new (GstAdaptiveDemuxTimer);
4511 timer->fired = FALSE;
4513 timer->mutex = mutex;
4514 timer->ref_count = 1;
/* Atomically adds one reference to @timer. A NULL @timer is rejected and
 * NULL is returned. */
4518 static GstAdaptiveDemuxTimer *
4519 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4521 g_return_val_if_fail (timer != NULL, NULL);
4522 g_atomic_int_inc (&timer->ref_count);
/* Atomically drops one reference from @timer and returns the structure to
 * the slice allocator once the count reaches zero. A NULL @timer is
 * rejected. */
4527 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4529 g_return_if_fail (timer != NULL);
4530 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4531 g_slice_free (GstAdaptiveDemuxTimer, timer);
/* Summary of the flow visible below:
 *  - an invalid @end_time is special-cased before any timer is created
 *  - otherwise a timer is created for @cond / @mutex, given a single-shot
 *    clock id for @end_time and scheduled asynchronously with
 *    gst_adaptive_demux_clock_callback; the callback holds its own timer
 *    reference, released through the destroy notify
 *  - when the clock reports GST_CLOCK_UNSUPPORTED, the clock id and the
 *    timer are released and TRUE is returned via g_return_val_if_reached()
 *  - the caller then blocks in g_cond_wait() and samples timer->fired
 *    afterwards
 *  - finally the clock id is unscheduled and both the id and the local
 *    timer reference are dropped */
4535 /* gst_adaptive_demux_wait_until:
4536 * A replacement for g_cond_wait_until that uses the clock rather
4537 * than system time to control the duration of the sleep. Typically
4538 * clock is actually a #GstSystemClock, in which case this function
4539 * behaves exactly like g_cond_wait_until. Inside unit tests,
4540 * the clock is typically a #GstTestClock, which allows tests to run
4542 * This function must be called with mutex held.
4545 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4546 GstClockTime end_time)
4548 GstAdaptiveDemuxTimer *timer;
4552 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4553 /* for an invalid time, gst_clock_id_wait_async will try to call
4554 * gst_adaptive_demux_clock_callback from the current thread.
4555 * It still holds the mutex while doing that, so it will deadlock.
4556 * g_cond_wait_until would return immediately with false, so we'll do the same.
4560 timer = gst_adaptive_demux_timer_new (cond, mutex);
4561 timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4563 gst_clock_id_wait_async (timer->clock_id,
4564 gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4565 (GDestroyNotify) gst_adaptive_demux_timer_unref);
4566 /* clock does not support asynchronous waits. Assert and return */
4567 if (res == GST_CLOCK_UNSUPPORTED) {
4568 gst_clock_id_unref (timer->clock_id);
4569 gst_adaptive_demux_timer_unref (timer);
4570 g_return_val_if_reached (TRUE);
4572 g_assert (!timer->fired);
4573 /* the gst_adaptive_demux_clock_callback() will signal the
4574 * cond when the clock's single shot timer fires, or the cond will be
4575 * signalled by another thread that wants to cause this wait to finish
4576 * early (e.g. to terminate the waiting thread).
4577 * There is no need for a while loop here, because that logic is
4578 * implemented by the function calling gst_adaptive_demux_wait_until() */
4579 g_cond_wait (cond, mutex);
4580 fired = timer->fired;
4582 gst_clock_id_unschedule (timer->clock_id);
4583 gst_clock_id_unref (timer->clock_id);
4584 gst_adaptive_demux_timer_unref (timer);
4589 gst_adaptive_demux_clock_callback (GstClock * clock,
4590 GstClockTime time, GstClockID id, gpointer user_data)
4592 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4593 g_return_val_if_fail (timer != NULL, FALSE);
4594 g_mutex_lock (timer->mutex);
4595 timer->fired = TRUE;
4596 g_cond_signal (timer->cond);
4597 g_mutex_unlock (timer->mutex);