3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
23 * SECTION:gstadaptivedemux
24 * @short_description: Base class for adaptive demuxers
26 * What is an adaptive demuxer?
27 * Adaptive demuxers are special demuxers in the sense that they don't
28 * actually demux data received from upstream but download the data
31 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
32 * a set of fragments. The manifest describes the available media and
33 * the sequence of fragments to use. Each fragment contains a small
34 * part of the media (typically only a few seconds). It is possible for
35 * the manifest to have the same media available in different configurations
36 * (bitrates for example) so that the client can select the one that
37 * best suits its scenario (network fluctuation, hardware requirements...).
38 * It is possible to switch from one representation of the media to another
39 * during playback. That's why it is called 'adaptive', because it can be
40 * adapted to the client's needs.
42 * Architectural overview:
43 * The manifest is received by the demuxer in its sink pad and, upon receiving
44 * EOS, it parses the manifest and exposes the streams available in it. For
45 * each stream a source element will be created and will download the list
46 * of fragments one by one. Once a fragment is finished downloading, the next
47 * URI is set to the source element and it starts fetching it and pushing
48 * through the stream's pad. This implies that each stream is independent from
49 * each other as it runs on a separate thread.
51 * After downloading each fragment, the download rate of it is calculated and
52 * the demuxer has a chance to switch to a different bitrate if needed. The
53 * switch can be done by simply pushing a new caps before the next fragment
54 * when codecs are the same, or by exposing a new pad group if it needs
58 * - Not linked streams: Streams that are not-linked have their download threads
59 * interrupted to save network bandwidth. When they are
60 * relinked a reconfigure event is received and the
61 * stream is restarted.
64 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
65 * about the intrinsics of the subclass formats, so the subclasses are
66 * responsible for maintaining the manifest data structures and stream
72 The following rules were observed while implementing MT safety in adaptive demux:
73 1. If a variable is accessed from multiple threads and at least one thread
74 writes to it, then all the accesses needs to be done from inside a critical section.
75 2. If thread A wants to join thread B then at the moment it calls gst_task_join
76 it must not hold any mutexes that thread B might take.
78 Adaptive demux API can be called from several threads. More, adaptive demux
79 starts some threads to monitor the download of fragments. In order to protect
80 accesses to shared variables (demux and streams) all the API functions that
81 can be run in different threads will need to get a mutex (manifest_lock)
82 when they start and release it when they end. Because some of those functions
83 can indirectly call other API functions (eg they can generate events or messages
84 that are processed in the same thread) the manifest_lock must be recursive.
86 The manifest_lock will serialize the public API making access to shared
87 variables safe. But some of these functions will try at some moment to join
88 threads created by adaptive demux, or to change the state of src elements
89 (which will block trying to join the src element streaming thread). Because
90 of rule 2, those functions will need to release the manifest_lock during the
91 call of gst_task_join. During this time they can be interrupted by other API calls.
92 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
93 is called and this will join all threads. In order to prevent interruptions
94 during such period, all the API functions will also use a second lock: api_lock.
95 This will be taken at the beginning of the function and released at the end,
96 but this time this lock will not be temporarily released during join.
97 This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
98 or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
99 to hold it while joining the threads or changing the src element state. The
100 api_lock will serialise all external requests to adaptive demux. In order to
101 avoid deadlocks, if a function needs to acquire both manifest and api locks,
102 the api_lock will be taken first and the manifest_lock second.
104 By using the api_lock a thread is protected against other API calls. But when
105 temporarily dropping the manifest_lock, it will be vulnerable to changes from
106 threads that use only the manifest_lock and not the api_lock. These threads run
107 one of the following functions: gst_adaptive_demux_stream_download_loop,
108 gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
109 that all operations during an API call are not impacted by other writes, the
110 above mentioned functions must check a cancelled flag every time they reacquire
111 the manifest_lock. If the flag is set, they must exit immediately, without
112 performing any changes on the shared data. In this way, an API call (eg seek
113 request) can set the cancel flag before releasing the manifest_lock and be sure
114 that the demux object and its streams are not changed by anybody else.
121 #include "gstadaptivedemux.h"
122 #include "gst/gst-i18n-plugin.h"
123 #include <gst/base/gstadapter.h>
125 GST_DEBUG_CATEGORY (adaptivedemux_debug);
126 #define GST_CAT_DEFAULT adaptivedemux_debug
128 #define MAX_DOWNLOAD_ERROR_COUNT 3
129 #define DEFAULT_FAILED_COUNT 3
130 #define DEFAULT_CONNECTION_SPEED 0
131 #define DEFAULT_BITRATE_LIMIT 0.8f
132 #define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024 /* For safety. Large enough to hold a segment. */
133 #define NUM_LOOKBACK_FRAGMENTS 3
135 #define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
136 #define GST_MANIFEST_LOCK(d) G_STMT_START { \
137 GST_TRACE("Locking from thread %p", g_thread_self()); \
138 g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
139 GST_TRACE("Locked from thread %p", g_thread_self()); \
142 #define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
143 GST_TRACE("Unlocking from thread %p", g_thread_self()); \
144 g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
147 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
148 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
149 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
151 #define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
152 #define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
153 #define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
158 PROP_CONNECTION_SPEED,
163 /* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
164 #define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
166 struct _GstAdaptiveDemuxPrivate
168 GstAdapter *input_adapter; /* protected by manifest_lock */
169 gint have_manifest; /* MT safe */
171 GList *old_streams; /* protected by manifest_lock */
173 GstTask *updates_task; /* MT safe */
174 GRecMutex updates_lock;
175 GMutex updates_timed_lock;
176 GCond updates_timed_cond; /* protected by updates_timed_lock */
177 gboolean stop_updates_task; /* protected by updates_timed_lock */
179 /* used only from updates_task, no need to protect it */
180 gint update_failed_count;
182 guint32 segment_seqnum; /* protected by manifest_lock */
184 /* main lock used to protect adaptive demux and all its streams.
185 * It serializes the adaptive demux public API.
187 GRecMutex manifest_lock;
189 /* condition to wait for manifest updates on a live stream.
190 * In order to signal the manifest_cond, the caller needs to hold both
191 * manifest_lock and manifest_update_lock (taken in this order)
194 GMutex manifest_update_lock;
196 /* Lock and condition for prerolling streams before exposing */
199 gint preroll_pending;
203 /* Protects demux and stream segment information
204 * Needed because seeks can update segment information
205 * without needing to stop tasks when they just want to
206 * update the segment boundaries */
210 typedef struct _GstAdaptiveDemuxTimer
212 volatile gint ref_count;
217 } GstAdaptiveDemuxTimer;
219 static GstBinClass *parent_class = NULL;
220 static gint private_offset = 0;
222 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
223 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
224 GstAdaptiveDemuxClass * klass);
225 static void gst_adaptive_demux_finalize (GObject * object);
226 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
227 element, GstStateChange transition);
229 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
231 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
233 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
234 GstObject * parent, GstBuffer * buffer);
235 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
237 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
241 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
243 static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
244 static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
246 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
247 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
248 gboolean first_and_live);
249 static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
250 static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
251 static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
252 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
253 GstClockTime ts, GstClockTime * final_ts);
254 static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
255 demux, GstAdaptiveDemuxStream * stream);
256 static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
257 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
259 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
260 GstAdaptiveDemuxStream * stream);
262 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
263 GstAdaptiveDemuxStream * stream);
264 static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
267 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
268 static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
269 static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);
271 static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
273 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
276 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
278 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
281 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
282 gboolean start_preroll_streams);
283 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
284 gboolean stop_updates);
285 static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
288 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
289 stream, GstFlowReturn ret, GError * err);
291 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
292 GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
294 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
295 GstAdaptiveDemuxStream * stream);
297 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
298 GstAdaptiveDemuxStream * stream, GstClockTime duration);
300 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
301 GstClockTime end_time);
302 static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
303 GstClockTime time, GstClockID id, gpointer user_data);
305 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
308 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
309 * method to get to the padtemplates */
311 gst_adaptive_demux_get_type (void)
313 static volatile gsize type = 0;
315 if (g_once_init_enter (&type)) {
317 static const GTypeInfo info = {
318 sizeof (GstAdaptiveDemuxClass),
321 (GClassInitFunc) gst_adaptive_demux_class_init,
324 sizeof (GstAdaptiveDemux),
326 (GInstanceInitFunc) gst_adaptive_demux_init,
329 _type = g_type_register_static (GST_TYPE_BIN,
330 "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
333 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
335 g_once_init_leave (&type, _type);
340 static inline GstAdaptiveDemuxPrivate *
341 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
343 return (G_STRUCT_MEMBER_P (self, private_offset));
347 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
348 const GValue * value, GParamSpec * pspec)
350 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 GST_API_LOCK (demux);
353 GST_MANIFEST_LOCK (demux);
356 case PROP_CONNECTION_SPEED:
357 demux->connection_speed = g_value_get_uint (value) * 1000;
358 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
359 demux->connection_speed);
361 case PROP_BITRATE_LIMIT:
362 demux->bitrate_limit = g_value_get_float (value);
365 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
369 GST_MANIFEST_UNLOCK (demux);
370 GST_API_UNLOCK (demux);
374 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
375 GValue * value, GParamSpec * pspec)
377 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
379 GST_MANIFEST_LOCK (demux);
382 case PROP_CONNECTION_SPEED:
383 g_value_set_uint (value, demux->connection_speed / 1000);
385 case PROP_BITRATE_LIMIT:
386 g_value_set_float (value, demux->bitrate_limit);
389 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
393 GST_MANIFEST_UNLOCK (demux);
397 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
399 GObjectClass *gobject_class;
400 GstElementClass *gstelement_class;
401 GstBinClass *gstbin_class;
403 gobject_class = G_OBJECT_CLASS (klass);
404 gstelement_class = GST_ELEMENT_CLASS (klass);
405 gstbin_class = GST_BIN_CLASS (klass);
407 GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
408 "Base Adaptive Demux");
410 parent_class = g_type_class_peek_parent (klass);
412 if (private_offset != 0)
413 g_type_class_adjust_private_offset (klass, &private_offset);
415 gobject_class->set_property = gst_adaptive_demux_set_property;
416 gobject_class->get_property = gst_adaptive_demux_get_property;
417 gobject_class->finalize = gst_adaptive_demux_finalize;
419 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
420 g_param_spec_uint ("connection-speed", "Connection Speed",
421 "Network connection speed in kbps (0 = calculate from downloaded"
422 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
423 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
425 /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
426 g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
427 g_param_spec_float ("bitrate-limit",
428 "Bitrate limit in %",
429 "Limit of the available bitrate to use when switching to alternates.",
430 0, 1, DEFAULT_BITRATE_LIMIT,
431 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433 gstelement_class->change_state = gst_adaptive_demux_change_state;
435 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
437 klass->data_received = gst_adaptive_demux_stream_data_received_default;
438 klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
439 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
440 klass->requires_periodical_playlist_update =
441 gst_adaptive_demux_requires_periodical_playlist_update_default;
446 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
447 GstAdaptiveDemuxClass * klass)
449 GstPadTemplate *pad_template;
450 GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
451 GObjectClass *gobject_class;
453 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
455 demux->priv = gst_adaptive_demux_get_instance_private (demux);
456 demux->priv->input_adapter = gst_adapter_new ();
457 demux->downloader = gst_uri_downloader_new ();
458 gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
459 demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
460 demux->priv->segment_seqnum = gst_util_seqnum_next ();
461 demux->have_group_id = FALSE;
462 demux->group_id = G_MAXUINT;
464 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
466 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
467 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
469 demux->realtime_clock = gst_system_clock_obtain ();
470 g_assert (demux->realtime_clock != NULL);
471 gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
472 if (g_object_class_find_property (gobject_class, "clock-type")) {
473 g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
475 GST_WARNING_OBJECT (demux,
476 "System clock does not have clock-type property");
478 if (clock_type == GST_CLOCK_TYPE_REALTIME) {
479 demux->clock_offset = 0;
482 GstClockTime rtc_now;
484 utc_now = g_date_time_new_now_utc ();
485 rtc_now = gst_clock_get_time (demux->realtime_clock);
486 demux->clock_offset =
487 g_date_time_to_unix (utc_now) * G_TIME_SPAN_SECOND +
488 g_date_time_get_microsecond (utc_now) - GST_TIME_AS_USECONDS (rtc_now);
489 g_date_time_unref (utc_now);
491 g_rec_mutex_init (&demux->priv->updates_lock);
492 demux->priv->updates_task =
493 gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
495 gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);
497 g_mutex_init (&demux->priv->updates_timed_lock);
498 g_cond_init (&demux->priv->updates_timed_cond);
500 g_cond_init (&demux->priv->manifest_cond);
501 g_mutex_init (&demux->priv->manifest_update_lock);
503 g_rec_mutex_init (&demux->priv->manifest_lock);
504 g_mutex_init (&demux->priv->api_lock);
505 g_mutex_init (&demux->priv->segment_lock);
507 g_cond_init (&demux->priv->preroll_cond);
508 g_mutex_init (&demux->priv->preroll_lock);
511 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
512 g_return_if_fail (pad_template != NULL);
514 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
515 gst_pad_set_event_function (demux->sinkpad,
516 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
517 gst_pad_set_chain_function (demux->sinkpad,
518 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
521 demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
522 demux->connection_speed = DEFAULT_CONNECTION_SPEED;
524 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
528 gst_adaptive_demux_finalize (GObject * object)
530 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
531 GstAdaptiveDemuxPrivate *priv = demux->priv;
533 GST_DEBUG_OBJECT (object, "finalize");
535 g_object_unref (priv->input_adapter);
536 g_object_unref (demux->downloader);
538 g_mutex_clear (&priv->updates_timed_lock);
539 g_cond_clear (&priv->updates_timed_cond);
540 g_mutex_clear (&demux->priv->manifest_update_lock);
541 g_cond_clear (&demux->priv->manifest_cond);
542 g_object_unref (priv->updates_task);
543 g_rec_mutex_clear (&priv->updates_lock);
544 g_rec_mutex_clear (&demux->priv->manifest_lock);
545 g_mutex_clear (&demux->priv->api_lock);
546 g_mutex_clear (&demux->priv->segment_lock);
547 if (demux->realtime_clock) {
548 gst_object_unref (demux->realtime_clock);
549 demux->realtime_clock = NULL;
552 g_cond_clear (&demux->priv->preroll_cond);
553 g_mutex_clear (&demux->priv->preroll_lock);
555 G_OBJECT_CLASS (parent_class)->finalize (object);
558 static GstStateChangeReturn
559 gst_adaptive_demux_change_state (GstElement * element,
560 GstStateChange transition)
562 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
563 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
565 switch (transition) {
566 case GST_STATE_CHANGE_PAUSED_TO_READY:
567 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
568 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
569 gst_uri_downloader_cancel (demux->downloader);
571 GST_API_LOCK (demux);
572 GST_MANIFEST_LOCK (demux);
573 gst_adaptive_demux_reset (demux);
574 GST_MANIFEST_UNLOCK (demux);
575 GST_API_UNLOCK (demux);
577 case GST_STATE_CHANGE_READY_TO_PAUSED:
578 GST_API_LOCK (demux);
579 GST_MANIFEST_LOCK (demux);
580 gst_adaptive_demux_reset (demux);
581 /* Clear "cancelled" flag in uridownloader since subclass might want to
582 * use uridownloader to fetch another manifest */
583 gst_uri_downloader_reset (demux->downloader);
584 if (g_atomic_int_get (&demux->priv->have_manifest))
585 gst_adaptive_demux_start_manifest_update_task (demux);
586 GST_MANIFEST_UNLOCK (demux);
587 GST_API_UNLOCK (demux);
588 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
589 GST_DEBUG_OBJECT (demux, "demuxer has started running");
595 /* this must be run without MANIFEST_LOCK taken.
596 * For PLAYING to PLAYING state changes, it will want to take a lock in
597 * src element and that lock is held while the streaming thread is running.
598 * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
600 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
606 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
609 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
612 switch (event->type) {
613 case GST_EVENT_FLUSH_STOP:{
614 GST_API_LOCK (demux);
615 GST_MANIFEST_LOCK (demux);
617 gst_adaptive_demux_reset (demux);
619 ret = gst_pad_event_default (pad, parent, event);
621 GST_MANIFEST_UNLOCK (demux);
622 GST_API_UNLOCK (demux);
627 GstAdaptiveDemuxClass *demux_class;
632 GstBuffer *manifest_buffer;
634 GST_API_LOCK (demux);
635 GST_MANIFEST_LOCK (demux);
637 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
639 available = gst_adapter_available (demux->priv->input_adapter);
641 if (available == 0) {
642 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
643 ret = gst_pad_event_default (pad, parent, event);
645 GST_MANIFEST_UNLOCK (demux);
646 GST_API_UNLOCK (demux);
651 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
653 /* Need to get the URI to use it as a base to generate the fragment's
655 query = gst_query_new_uri ();
656 query_res = gst_pad_peer_query (pad, query);
658 gchar *uri, *redirect_uri;
661 gst_query_parse_uri (query, &uri);
662 gst_query_parse_uri_redirection (query, &redirect_uri);
663 gst_query_parse_uri_redirection_permanent (query, &permanent);
665 if (permanent && redirect_uri) {
666 demux->manifest_uri = redirect_uri;
667 demux->manifest_base_uri = NULL;
670 demux->manifest_uri = uri;
671 demux->manifest_base_uri = redirect_uri;
674 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
675 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
677 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
679 gst_query_unref (query);
681 /* Let the subclass parse the manifest */
683 gst_adapter_take_buffer (demux->priv->input_adapter, available);
684 if (!demux_class->process_manifest (demux, manifest_buffer)) {
685 /* In most cases, this will happen if we set a wrong url in the
686 * source element and we have received the 404 HTML response instead of
688 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
692 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
694 gst_buffer_unref (manifest_buffer);
696 gst_element_post_message (GST_ELEMENT_CAST (demux),
697 gst_message_new_element (GST_OBJECT_CAST (demux),
698 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
699 "manifest-uri", G_TYPE_STRING,
700 demux->manifest_uri, "uri", G_TYPE_STRING,
702 "manifest-download-start", GST_TYPE_CLOCK_TIME,
704 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
705 gst_util_get_timestamp (), NULL)));
708 /* Send duration message */
709 if (!gst_adaptive_demux_is_live (demux)) {
710 GstClockTime duration = demux_class->get_duration (demux);
712 if (duration != GST_CLOCK_TIME_NONE) {
713 GST_DEBUG_OBJECT (demux,
714 "Sending duration message : %" GST_TIME_FORMAT,
715 GST_TIME_ARGS (duration));
716 gst_element_post_message (GST_ELEMENT (demux),
717 gst_message_new_duration_changed (GST_OBJECT (demux)));
719 GST_DEBUG_OBJECT (demux,
720 "media duration unknown, can not send the duration message");
724 if (demux->next_streams) {
725 gst_adaptive_demux_prepare_streams (demux,
726 gst_adaptive_demux_is_live (demux));
727 gst_adaptive_demux_start_tasks (demux, TRUE);
728 gst_adaptive_demux_start_manifest_update_task (demux);
731 GST_WARNING_OBJECT (demux, "No streams created from manifest");
732 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
733 (_("This file contains no playable streams.")),
734 ("No known stream formats found at the Manifest"));
739 GST_MANIFEST_UNLOCK (demux);
740 GST_API_UNLOCK (demux);
742 gst_event_unref (event);
745 case GST_EVENT_SEGMENT:
746 /* Swallow newsegments, we'll push our own */
747 gst_event_unref (event);
753 return gst_pad_event_default (pad, parent, event);
757 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
760 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
762 GST_MANIFEST_LOCK (demux);
764 gst_adapter_push (demux->priv->input_adapter, buffer);
766 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
767 (gint) gst_adapter_available (demux->priv->input_adapter));
769 GST_MANIFEST_UNLOCK (demux);
773 /* must be called with manifest_lock taken */
775 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
777 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
782 /* take ownership of old_streams before releasing the manifest_lock in
783 * gst_adaptive_demux_stop_tasks
785 old_streams = demux->priv->old_streams;
786 demux->priv->old_streams = NULL;
788 gst_adaptive_demux_stop_tasks (demux, TRUE);
791 klass->reset (demux);
793 eos = gst_event_new_eos ();
794 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
795 GstAdaptiveDemuxStream *stream = iter->data;
797 gst_pad_push_event (stream->pad, gst_event_ref (eos));
798 gst_pad_set_active (stream->pad, FALSE);
800 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
802 gst_adaptive_demux_stream_free (stream);
804 gst_event_unref (eos);
805 g_list_free (demux->streams);
806 demux->streams = NULL;
807 if (demux->prepared_streams) {
808 g_list_free_full (demux->prepared_streams,
809 (GDestroyNotify) gst_adaptive_demux_stream_free);
810 demux->prepared_streams = NULL;
812 if (demux->next_streams) {
813 g_list_free_full (demux->next_streams,
814 (GDestroyNotify) gst_adaptive_demux_stream_free);
815 demux->next_streams = NULL;
819 g_list_free_full (old_streams,
820 (GDestroyNotify) gst_adaptive_demux_stream_free);
823 if (demux->priv->old_streams) {
824 g_list_free_full (demux->priv->old_streams,
825 (GDestroyNotify) gst_adaptive_demux_stream_free);
826 demux->priv->old_streams = NULL;
829 g_free (demux->manifest_uri);
830 g_free (demux->manifest_base_uri);
831 demux->manifest_uri = NULL;
832 demux->manifest_base_uri = NULL;
834 gst_adapter_clear (demux->priv->input_adapter);
835 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
837 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
839 demux->have_group_id = FALSE;
840 demux->group_id = G_MAXUINT;
841 demux->priv->segment_seqnum = gst_util_seqnum_next ();
845 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
847 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
849 switch (GST_MESSAGE_TYPE (msg)) {
850 case GST_MESSAGE_ERROR:{
852 GstAdaptiveDemuxStream *stream = NULL;
855 gchar *new_error = NULL;
856 const GstStructure *details = NULL;
858 GST_MANIFEST_LOCK (demux);
860 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
861 GstAdaptiveDemuxStream *cur = iter->data;
862 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
863 GST_OBJECT_CAST (cur->src))) {
868 if (stream == NULL) {
869 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
870 GstAdaptiveDemuxStream *cur = iter->data;
871 if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
872 GST_OBJECT_CAST (cur->src))) {
877 if (stream == NULL) {
878 GST_WARNING_OBJECT (demux,
879 "Failed to locate stream for errored element");
884 gst_message_parse_error (msg, &err, &debug);
886 GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
887 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
888 err->message, debug);
891 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
893 g_free (err->message);
894 err->message = new_error;
897 gst_message_parse_error_details (msg, &details);
899 gst_structure_get_uint (details, "http-status-code",
900 &stream->last_status_code);
903 /* error, but ask to retry */
904 gst_adaptive_demux_stream_fragment_download_finish (stream,
905 GST_FLOW_CUSTOM_ERROR, err);
910 GST_MANIFEST_UNLOCK (demux);
912 gst_message_unref (msg);
921 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
925 gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
928 GST_API_LOCK (demux);
929 GST_MANIFEST_LOCK (demux);
930 demux->stream_struct_size = struct_size;
931 GST_MANIFEST_UNLOCK (demux);
932 GST_API_UNLOCK (demux);
935 /* must be called with manifest_lock taken */
937 gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
938 GstAdaptiveDemuxStream * stream)
940 GstPad *pad = stream->pad;
941 gchar *name = gst_pad_get_name (pad);
945 gst_pad_set_active (pad, TRUE);
946 stream->need_header = TRUE;
948 stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);
951 gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
952 GST_EVENT_STREAM_START, 0);
954 if (gst_event_parse_group_id (event, &demux->group_id))
955 demux->have_group_id = TRUE;
957 demux->have_group_id = FALSE;
958 gst_event_unref (event);
959 } else if (!demux->have_group_id) {
960 demux->have_group_id = TRUE;
961 demux->group_id = gst_util_group_id_next ();
963 event = gst_event_new_stream_start (stream_id);
964 if (demux->have_group_id)
965 gst_event_set_group_id (event, demux->group_id);
967 gst_pad_push_event (pad, event);
971 GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));
973 stream->discont = TRUE;
979 gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
980 GstAdaptiveDemuxStream * stream)
983 GstPad *pad = stream->pad;
986 if (stream->pending_caps) {
987 gst_pad_set_caps (pad, stream->pending_caps);
988 caps = stream->pending_caps;
989 stream->pending_caps = NULL;
991 caps = gst_pad_get_current_caps (pad);
994 GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
995 GST_DEBUG_PAD_NAME (pad), caps);
997 gst_caps_unref (caps);
999 gst_object_ref (pad);
1001 /* Don't hold the manifest lock while exposing a pad */
1002 GST_MANIFEST_UNLOCK (demux);
1003 ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
1004 GST_MANIFEST_LOCK (demux);
1009 /* must be called with manifest_lock taken */
1011 gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1012 GstAdaptiveDemuxStream * stream)
1014 GstAdaptiveDemuxClass *klass;
1016 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1018 if (klass->get_presentation_offset == NULL)
1021 return klass->get_presentation_offset (demux, stream);
1024 /* must be called with manifest_lock taken */
1026 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1028 GstAdaptiveDemuxClass *klass;
1030 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1032 if (klass->get_period_start_time == NULL)
1035 return klass->get_period_start_time (demux);
1038 /* must be called with manifest_lock taken */
1040 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1041 gboolean first_and_live)
1044 GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1046 g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1047 if (demux->prepared_streams != NULL) {
1048 /* Old streams that were never exposed, due to a seek or so */
1049 GST_FIXME_OBJECT (demux,
1050 "Preparing new streams without cleaning up old ones!");
1054 demux->prepared_streams = demux->next_streams;
1055 demux->next_streams = NULL;
1057 if (!gst_adaptive_demux_is_running (demux)) {
1058 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1062 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1063 GstAdaptiveDemuxStream *stream = iter->data;
1065 stream->do_block = TRUE;
1067 if (!gst_adaptive_demux_prepare_stream (demux,
1068 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1069 /* TODO act on error */
1070 GST_FIXME_OBJECT (stream->pad,
1071 "Do something on failure to expose stream");
1074 if (first_and_live) {
1075 /* TODO we only need the first timestamp, maybe create a simple function to
1076 * get the current PTS of a fragment ? */
1077 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
1078 gst_adaptive_demux_stream_update_fragment_info (demux, stream);
1080 if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
1081 min_pts = MIN (min_pts, stream->fragment.timestamp);
1083 min_pts = stream->fragment.timestamp;
1088 period_start = gst_adaptive_demux_get_period_start_time (demux);
1090 /* For live streams, the subclass is supposed to seek to the current
1091 * fragment and then tell us its timestamp in stream->fragment.timestamp.
1092 * We now also have to seek our demuxer segment to reflect this.
1094 * FIXME: This needs some refactoring at some point.
1096 if (first_and_live) {
1097 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1098 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
1099 GST_SEEK_TYPE_NONE, -1, NULL);
1102 for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1103 GstAdaptiveDemuxStream *stream = iter->data;
1104 GstClockTime offset;
1106 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1107 stream->segment = demux->segment;
1109 /* The demuxer segment is just built from seek events, but for each stream
1110 * we have to adjust segments according to the current period and the
1111 * stream specific presentation time offset.
1113 * For each period, buffer timestamps start again from 0. Additionally the
1114 * buffer timestamps are shifted by the stream specific presentation time
1115 * offset, so the first buffer timestamp of a period is 0 + presentation
1116 * time offset. If the stream contains timestamps itself, this is also
1117 * supposed to be the presentation time stored inside the stream.
1119 * The stream time over periods is supposed to be continuous, that is the
1120 * buffer timestamp 0 + presentation time offset should map to the start
1121 * time of the current period.
1124 * The adjustment of the stream segments as such works the following.
1126 * If the demuxer segment start is bigger than the period start, this
1127 * means that we have to drop some media at the beginning of the current
1128 * period, e.g. because a seek into the middle of the period has
1129 * happened. The amount of media to drop is the difference between the
1130 * period start and the demuxer segment start, and as each period starts
1131 * again from 0, this difference is going to be the actual stream's
1132 * segment start. As all timestamps of the stream are shifted by the
1133 * presentation time offset, we will also have to move the segment start
1136 * Likewise, the demuxer segment stop value is adjusted in the same
1139 * Now the running time and stream time at the stream's segment start has
1140 * to be the one that is stored inside the demuxer's segment, which means
1141 * that segment.base and segment.time have to be copied over (done just
1145 * If the demuxer segment start is smaller than the period start time,
1146 * this means that the whole period is inside the segment. As each period
1147 * starts timestamps from 0, and additionally timestamps are shifted by
1148 * the presentation time offset, the stream's first timestamp (and as such
1149 * the stream's segment start) has to be the presentation time offset.
1150 * The stream time at the segment start is supposed to be the stream time
1151 * of the period start according to the demuxer segment, so the stream
1152 * segment's time would be set to that. The same goes for the stream
1153 * segment's base, which is supposed to be the running time of the period
1154 * start according to the demuxer's segment.
1156 * The same logic applies for negative rates with the segment stop and
1157 * the period stop time (which gets clamped).
1160 * For the first case where not the complete period is inside the segment,
1161 * the segment time and base as calculated by the second case would be
1164 GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
1166 GST_DEBUG_OBJECT (demux,
1167 "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
1168 GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
1169 /* note for readers:
1170 * Since stream->segment is initially a copy of demux->segment,
1171 * only the values that need updating are modified below. */
1172 if (first_and_live) {
1173 /* If first and live, demuxer did seek to the current position already */
1174 stream->segment.start = demux->segment.start - period_start + offset;
1175 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1176 stream->segment.stop = demux->segment.stop - period_start + offset;
1177 /* FIXME : Do we need to handle negative rates for this ? */
1178 stream->segment.position = stream->segment.start;
1179 } else if (demux->segment.start > period_start) {
1180 /* seek within a period */
1181 stream->segment.start = demux->segment.start - period_start + offset;
1182 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1183 stream->segment.stop = demux->segment.stop - period_start + offset;
1184 if (stream->segment.rate >= 0)
1185 stream->segment.position = offset;
1187 stream->segment.position = stream->segment.stop;
1189 stream->segment.start = offset;
1190 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1191 stream->segment.stop = demux->segment.stop - period_start + offset;
1192 if (stream->segment.rate >= 0)
1193 stream->segment.position = offset;
1195 stream->segment.position = stream->segment.stop;
1196 stream->segment.time =
1197 gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
1199 stream->segment.base =
1200 gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
1204 stream->pending_segment = gst_event_new_segment (&stream->segment);
1205 gst_event_set_seqnum (stream->pending_segment, demux->priv->segment_seqnum);
1206 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1208 GST_DEBUG_OBJECT (demux,
1209 "Prepared segment %" GST_SEGMENT_FORMAT " for stream %p",
1210 &stream->segment, stream);
1217 gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux)
1222 g_return_val_if_fail (demux->prepared_streams != NULL, FALSE);
1224 old_streams = demux->streams;
1225 demux->streams = demux->prepared_streams;
1226 demux->prepared_streams = NULL;
1228 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1229 GstAdaptiveDemuxStream *stream = iter->data;
1231 if (!gst_adaptive_demux_expose_stream (demux,
1232 GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
1233 /* TODO act on error */
1236 demux->priv->preroll_pending = 0;
1238 GST_MANIFEST_UNLOCK (demux);
1239 gst_element_no_more_pads (GST_ELEMENT_CAST (demux));
1240 GST_MANIFEST_LOCK (demux);
1243 GstEvent *eos = gst_event_new_eos ();
1245 /* before we put streams in the demux->priv->old_streams list,
1246 * we ask the download task to stop. In this way, it will no longer be
1247 * allowed to change the demux object.
1249 for (iter = old_streams; iter; iter = g_list_next (iter)) {
1250 GstAdaptiveDemuxStream *stream = iter->data;
1251 GstPad *pad = gst_object_ref (GST_PAD (stream->pad));
1253 GST_MANIFEST_UNLOCK (demux);
1255 GST_DEBUG_OBJECT (pad, "Pushing EOS");
1256 gst_pad_push_event (pad, gst_event_ref (eos));
1257 gst_pad_set_active (pad, FALSE);
1259 GST_LOG_OBJECT (pad, "Removing stream");
1260 gst_element_remove_pad (GST_ELEMENT (demux), pad);
1261 GST_MANIFEST_LOCK (demux);
1263 gst_object_unref (GST_OBJECT (pad));
1265 /* ask the download task to stop.
1266 * We will not join it now, because our thread can be one of these tasks.
1267 * We will do the joining later, from another stream download task or
1268 * from gst_adaptive_demux_stop_tasks.
1269 * We also cannot change the state of the stream->src element, because
1270 * that will wait on the streaming thread (which could be this thread)
1272 * Because we sent an EOS to the downstream element, the stream->src
1273 * element should detect this in its streaming task and stop.
1274 * Even if it doesn't do that, we will change its state later in
1275 * gst_adaptive_demux_stop_tasks.
1277 GST_LOG_OBJECT (stream, "Marking stream as cancelled");
1278 gst_task_stop (stream->download_task);
1279 g_mutex_lock (&stream->fragment_download_lock);
1280 stream->cancelled = TRUE;
1281 stream->replaced = TRUE;
1282 g_cond_signal (&stream->fragment_download_cond);
1283 g_mutex_unlock (&stream->fragment_download_lock);
1285 gst_event_unref (eos);
1287 /* The list should be freed from another thread as we can't properly
1288 * cleanup a GstTask from itself */
1289 demux->priv->old_streams =
1290 g_list_concat (demux->priv->old_streams, old_streams);
1293 /* Unblock after removing oldstreams */
1294 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1295 GstAdaptiveDemuxStream *stream = iter->data;
1296 stream->do_block = FALSE;
1299 GST_DEBUG_OBJECT (demux, "All streams are exposed");
1304 /* must be called with manifest_lock taken */
1305 GstAdaptiveDemuxStream *
1306 gst_adaptive_demux_stream_new (GstAdaptiveDemux * demux, GstPad * pad)
1308 GstAdaptiveDemuxStream *stream;
1310 stream = g_malloc0 (demux->stream_struct_size);
1312 /* Downloading task */
1313 g_rec_mutex_init (&stream->download_lock);
1314 stream->download_task =
1315 gst_task_new ((GstTaskFunction) gst_adaptive_demux_stream_download_loop,
1317 gst_task_set_lock (stream->download_task, &stream->download_lock);
1320 stream->demux = demux;
1321 stream->fragment_bitrates =
1322 g_malloc0 (sizeof (guint64) * NUM_LOOKBACK_FRAGMENTS);
1323 gst_pad_set_element_private (pad, stream);
1324 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1326 g_mutex_lock (&demux->priv->preroll_lock);
1327 stream->do_block = TRUE;
1328 demux->priv->preroll_pending++;
1329 g_mutex_unlock (&demux->priv->preroll_lock);
1331 gst_pad_set_query_function (pad,
1332 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
1333 gst_pad_set_event_function (pad,
1334 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
1336 gst_segment_init (&stream->segment, GST_FORMAT_TIME);
1337 g_cond_init (&stream->fragment_download_cond);
1338 g_mutex_init (&stream->fragment_download_lock);
1340 demux->next_streams = g_list_append (demux->next_streams, stream);
1345 GstAdaptiveDemuxStream *
1346 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1350 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
1351 GstAdaptiveDemuxStream *stream = iter->data;
1352 if (stream->pad == pad) {
1360 /* must be called with manifest_lock taken.
1361 * It will temporarily drop the manifest_lock in order to join the task.
1362 * It will join only the old_streams (the demux->streams are joined by
1363 * gst_adaptive_demux_stop_tasks before gst_adaptive_demux_stream_free is
1367 gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream)
1369 GstAdaptiveDemux *demux = stream->demux;
1370 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1372 if (klass->stream_free)
1373 klass->stream_free (stream);
1375 g_clear_error (&stream->last_error);
1376 if (stream->download_task) {
1377 if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
1378 GST_DEBUG_OBJECT (demux, "Leaving streaming task %s:%s",
1379 GST_DEBUG_PAD_NAME (stream->pad));
1381 gst_task_stop (stream->download_task);
1383 g_mutex_lock (&stream->fragment_download_lock);
1384 stream->cancelled = TRUE;
1385 g_cond_signal (&stream->fragment_download_cond);
1386 g_mutex_unlock (&stream->fragment_download_lock);
1388 GST_LOG_OBJECT (demux, "Waiting for task to finish");
1390 /* temporarily drop the manifest lock to join the task */
1391 GST_MANIFEST_UNLOCK (demux);
1393 gst_task_join (stream->download_task);
1395 GST_MANIFEST_LOCK (demux);
1397 GST_LOG_OBJECT (demux, "Finished");
1398 gst_object_unref (stream->download_task);
1399 g_rec_mutex_clear (&stream->download_lock);
1400 stream->download_task = NULL;
1403 gst_adaptive_demux_stream_fragment_clear (&stream->fragment);
1405 if (stream->pending_segment) {
1406 gst_event_unref (stream->pending_segment);
1407 stream->pending_segment = NULL;
1410 if (stream->pending_events) {
1411 g_list_free_full (stream->pending_events, (GDestroyNotify) gst_event_unref);
1412 stream->pending_events = NULL;
1415 if (stream->internal_pad) {
1416 gst_object_unparent (GST_OBJECT_CAST (stream->internal_pad));
1419 if (stream->src_srcpad) {
1420 gst_object_unref (stream->src_srcpad);
1421 stream->src_srcpad = NULL;
1425 GstElement *src = stream->src;
1429 GST_MANIFEST_UNLOCK (demux);
1430 gst_element_set_locked_state (src, TRUE);
1431 gst_element_set_state (src, GST_STATE_NULL);
1432 gst_bin_remove (GST_BIN_CAST (demux), src);
1433 GST_MANIFEST_LOCK (demux);
1436 g_cond_clear (&stream->fragment_download_cond);
1437 g_mutex_clear (&stream->fragment_download_lock);
1438 g_free (stream->fragment_bitrates);
1441 gst_object_unref (stream->pad);
1444 if (stream->pending_caps)
1445 gst_caps_unref (stream->pending_caps);
1447 g_clear_pointer (&stream->pending_tags, gst_tag_list_unref);
1452 /* must be called with manifest_lock taken */
1454 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
1455 gint64 * range_start, gint64 * range_stop)
1457 GstAdaptiveDemuxClass *klass;
1459 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1461 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
1463 return klass->get_live_seek_range (demux, range_start, range_stop);
1466 /* must be called with manifest_lock taken */
1468 gst_adaptive_demux_stream_in_live_seek_range (GstAdaptiveDemux * demux,
1469 GstAdaptiveDemuxStream * stream)
1471 gint64 range_start, range_stop;
1472 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
1473 GST_LOG_OBJECT (stream->pad,
1474 "stream position %" GST_TIME_FORMAT " live seek range %"
1475 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
1476 GST_TIME_ARGS (stream->segment.position), GST_STIME_ARGS (range_start),
1477 GST_STIME_ARGS (range_stop));
1478 return (stream->segment.position >= range_start
1479 && stream->segment.position <= range_stop);
1485 /* must be called with manifest_lock taken */
1487 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
1489 GstAdaptiveDemuxClass *klass;
1491 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1492 if (gst_adaptive_demux_is_live (demux)) {
1493 return klass->get_live_seek_range != NULL;
1496 return klass->seek != NULL;
1500 gst_adaptive_demux_update_streams_segment (GstAdaptiveDemux * demux,
1501 GList * streams, gint64 period_start, GstSeekType start_type,
1502 GstSeekType stop_type)
1505 for (iter = streams; iter; iter = g_list_next (iter)) {
1506 GstAdaptiveDemuxStream *stream = iter->data;
1508 GstClockTime offset;
1510 /* See comments in gst_adaptive_demux_get_period_start_time() for
1511 * an explanation of the segment modifications */
1512 stream->segment = demux->segment;
1513 offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1514 stream->segment.start += offset - period_start;
1515 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
1516 stream->segment.stop += offset - period_start;
1517 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
1518 stream->segment.position = stream->segment.start;
1519 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
1520 stream->segment.position = stream->segment.stop;
1521 seg_evt = gst_event_new_segment (&stream->segment);
1522 gst_event_set_seqnum (seg_evt, demux->priv->segment_seqnum);
1523 gst_event_replace (&stream->pending_segment, seg_evt);
1524 GST_DEBUG_OBJECT (stream->pad, "Pending segment now %" GST_PTR_FORMAT,
1525 stream->pending_segment);
1526 gst_event_unref (seg_evt);
1527 /* Make sure the first buffer after a seek has the discont flag */
1528 stream->discont = TRUE;
1529 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
1533 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
1534 GST_SEEK_FLAG_SNAP_AFTER | \
1535 GST_SEEK_FLAG_SNAP_NEAREST | \
1536 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
1537 GST_SEEK_FLAG_KEY_UNIT))
1538 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
1539 GST_SEEK_FLAG_SNAP_AFTER | \
1540 GST_SEEK_FLAG_SNAP_NEAREST))
1543 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
1546 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1550 GstSeekType start_type, stop_type;
1555 GstSegment oldsegment;
1556 GstAdaptiveDemuxStream *stream = NULL;
1558 GST_INFO_OBJECT (demux, "Received seek event");
1560 GST_API_LOCK (demux);
1561 GST_MANIFEST_LOCK (demux);
1563 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
1566 if (format != GST_FORMAT_TIME) {
1567 GST_MANIFEST_UNLOCK (demux);
1568 GST_API_UNLOCK (demux);
1569 GST_WARNING_OBJECT (demux,
1570 "Adaptive demuxers only support TIME-based seeking");
1571 gst_event_unref (event);
1575 if (flags & GST_SEEK_FLAG_SEGMENT) {
1576 GST_FIXME_OBJECT (demux, "Handle segment seeks");
1577 GST_MANIFEST_UNLOCK (demux);
1578 GST_API_UNLOCK (demux);
1579 gst_event_unref (event);
1583 seqnum = gst_event_get_seqnum (event);
1585 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
1586 /* For instant rate seeks, reply directly and update
1587 * our segment so the new rate is reflected in any future
1591 /* instant rate change only supported if direction does not change. All
1592 * other requirements are already checked before creating the seek event
1593 * but let's double-check here to be sure */
1594 if ((demux->segment.rate > 0 && rate < 0) ||
1595 (demux->segment.rate < 0 && rate > 0) ||
1596 start_type != GST_SEEK_TYPE_NONE ||
1597 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
1598 GST_ERROR_OBJECT (demux,
1599 "Instant rate change seeks only supported in the "
1600 "same direction, without flushing and position change");
1601 GST_MANIFEST_UNLOCK (demux);
1602 GST_API_UNLOCK (demux);
1606 ev = gst_event_new_instant_rate_change (rate / demux->segment.rate,
1607 (GstSegmentFlags) flags);
1608 gst_event_set_seqnum (ev, seqnum);
1610 GST_MANIFEST_UNLOCK (demux);
1612 ret = gst_adaptive_demux_push_src_event (demux, ev);
1614 GST_API_UNLOCK (demux);
1615 gst_event_unref (event);
1620 if (!gst_adaptive_demux_can_seek (demux)) {
1621 GST_MANIFEST_UNLOCK (demux);
1622 GST_API_UNLOCK (demux);
1623 gst_event_unref (event);
1627 if (gst_adaptive_demux_is_live (demux)) {
1628 gint64 range_start, range_stop;
1629 gboolean changed = FALSE;
1630 gboolean start_valid = TRUE, stop_valid = TRUE;
1632 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
1634 GST_MANIFEST_UNLOCK (demux);
1635 GST_API_UNLOCK (demux);
1636 gst_event_unref (event);
1637 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
1641 GST_DEBUG_OBJECT (demux,
1642 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
1643 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
1645 /* Handle relative positioning for live streams (relative to the range_stop) */
1646 if (start_type == GST_SEEK_TYPE_END) {
1647 start = range_stop + start;
1648 start_type = GST_SEEK_TYPE_SET;
1651 if (stop_type == GST_SEEK_TYPE_END) {
1652 stop = range_stop + stop;
1653 stop_type = GST_SEEK_TYPE_SET;
1657 /* Adjust the requested start/stop position if it falls beyond the live
1659 * The only case where we don't adjust is for the starting point of
1660 * an accurate seek (start if forward and stop if backwards)
1662 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
1663 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1664 GST_DEBUG_OBJECT (demux,
1665 "seek before live stream start, setting to range start: %"
1666 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
1667 start = range_start;
1670 /* truncate stop position also if set */
1671 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
1672 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
1673 GST_DEBUG_OBJECT (demux,
1674 "seek ending after live start, adjusting to: %"
1675 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
1680 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
1681 (start < range_start || start > range_stop)) {
1682 GST_WARNING_OBJECT (demux,
1683 "Seek to invalid position start:%" GST_STIME_FORMAT
1684 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1685 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
1686 GST_STIME_ARGS (range_stop));
1687 start_valid = FALSE;
1689 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
1690 (stop < range_start || stop > range_stop)) {
1691 GST_WARNING_OBJECT (demux,
1692 "Seek to invalid position stop:%" GST_STIME_FORMAT
1693 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
1694 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
1695 GST_STIME_ARGS (range_stop));
1699 /* If the seek position is still outside of the seekable range, refuse the seek */
1700 if (!start_valid || !stop_valid) {
1701 GST_MANIFEST_UNLOCK (demux);
1702 GST_API_UNLOCK (demux);
1703 gst_event_unref (event);
1707 /* Re-create seek event with changed/updated values */
1709 gst_event_unref (event);
1711 gst_event_new_seek (rate, format, flags,
1712 start_type, start, stop_type, stop);
1713 gst_event_set_seqnum (event, seqnum);
1717 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
1719 /* have a backup in case seek fails */
1720 gst_segment_copy_into (&demux->segment, &oldsegment);
1722 if (flags & GST_SEEK_FLAG_FLUSH) {
1725 GST_DEBUG_OBJECT (demux, "sending flush start");
1726 fevent = gst_event_new_flush_start ();
1727 gst_event_set_seqnum (fevent, seqnum);
1728 GST_MANIFEST_UNLOCK (demux);
1729 gst_adaptive_demux_push_src_event (demux, fevent);
1730 GST_MANIFEST_LOCK (demux);
1732 gst_adaptive_demux_stop_tasks (demux, FALSE);
1733 } else if ((rate > 0 && start_type != GST_SEEK_TYPE_NONE) ||
1734 (rate < 0 && stop_type != GST_SEEK_TYPE_NONE)) {
1736 gst_adaptive_demux_stop_tasks (demux, FALSE);
1739 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1742 * Handle snap seeks as follows:
1743 * 1) do the snap seeking on the stream that received
1745 * 2) use the final position on this stream to seek
1746 * on the other streams to the same position
1748 * We can't snap at all streams at the same time as
1749 * they might end in different positions, so just
1750 * use the one that received the event as the 'leading'
1751 * one to do the snap seek.
1753 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
1754 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
1756 GstSeekFlags stream_seek_flags = flags;
1758 /* snap-seek on the stream that received the event and then
1759 * use the resulting position to seek on all streams */
1762 if (start_type != GST_SEEK_TYPE_NONE)
1765 ts = stream->segment.position;
1766 start_type = GST_SEEK_TYPE_SET;
1769 if (stop_type != GST_SEEK_TYPE_NONE)
1772 stop_type = GST_SEEK_TYPE_SET;
1773 ts = stream->segment.position;
1778 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
1781 /* replace event with a new one without snapping to seek on all streams */
1782 gst_event_unref (event);
1789 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
1790 start_type, start, stop_type, stop);
1791 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
1795 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
1796 start, stop_type, stop, &update);
1799 /* FIXME - this seems unatural, do_seek() is updating base when we
1800 * only want the start/stop position to change, maybe do_seek() needs
1802 if (!(flags & GST_SEEK_FLAG_FLUSH) && ((rate > 0
1803 && start_type == GST_SEEK_TYPE_NONE) || (rate < 0
1804 && stop_type == GST_SEEK_TYPE_NONE))) {
1805 demux->segment.base = oldsegment.base;
1808 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
1810 ret = demux_class->seek (demux, event);
1814 /* Is there anything else we can do if it fails? */
1815 gst_segment_copy_into (&oldsegment, &demux->segment);
1817 demux->priv->segment_seqnum = seqnum;
1819 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1821 if (flags & GST_SEEK_FLAG_FLUSH) {
1824 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
1825 fevent = gst_event_new_flush_stop (TRUE);
1826 gst_event_set_seqnum (fevent, seqnum);
1827 gst_adaptive_demux_push_src_event (demux, fevent);
1830 if (demux->next_streams) {
1831 /* If the seek generated new streams, get them
1833 gst_adaptive_demux_prepare_streams (demux, FALSE);
1834 gst_adaptive_demux_start_tasks (demux, TRUE);
1836 GstClockTime period_start =
1837 gst_adaptive_demux_get_period_start_time (demux);
1839 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
1840 gst_adaptive_demux_update_streams_segment (demux, demux->streams,
1841 period_start, start_type, stop_type);
1842 gst_adaptive_demux_update_streams_segment (demux, demux->prepared_streams,
1843 period_start, start_type, stop_type);
1844 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
1846 /* Restart the demux */
1847 gst_adaptive_demux_start_tasks (demux, FALSE);
1850 GST_MANIFEST_UNLOCK (demux);
1851 GST_API_UNLOCK (demux);
1852 gst_event_unref (event);
1858 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
1861 GstAdaptiveDemux *demux;
1863 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1865 /* FIXME handle events received on pads that are to be removed */
1867 switch (event->type) {
1868 case GST_EVENT_SEEK:
1870 guint32 seqnum = gst_event_get_seqnum (event);
1871 if (seqnum == demux->priv->segment_seqnum) {
1872 GST_LOG_OBJECT (pad,
1873 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
1874 gst_event_unref (event);
1877 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
1879 case GST_EVENT_RECONFIGURE:{
1880 GstAdaptiveDemuxStream *stream;
1882 GST_MANIFEST_LOCK (demux);
1883 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1886 if (!stream->cancelled && gst_adaptive_demux_is_running (demux) &&
1887 stream->last_ret == GST_FLOW_NOT_LINKED) {
1888 stream->last_ret = GST_FLOW_OK;
1889 stream->restart_download = TRUE;
1890 stream->need_header = TRUE;
1891 stream->discont = TRUE;
1892 GST_DEBUG_OBJECT (stream->pad, "Restarting download loop");
1893 gst_task_start (stream->download_task);
1895 gst_event_unref (event);
1896 GST_MANIFEST_UNLOCK (demux);
1899 GST_MANIFEST_UNLOCK (demux);
1902 case GST_EVENT_LATENCY:{
1903 /* Upstream and our internal source are irrelevant
1904 * for latency, and we should not fail here to
1905 * configure the latency */
1906 gst_event_unref (event);
1909 case GST_EVENT_QOS:{
1910 GstAdaptiveDemuxStream *stream;
1912 GST_MANIFEST_LOCK (demux);
1913 stream = gst_adaptive_demux_find_stream_for_pad (demux, pad);
1916 GstClockTimeDiff diff;
1917 GstClockTime timestamp;
1919 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
1920 /* Only take into account lateness if late */
1922 stream->qos_earliest_time = timestamp + 2 * diff;
1924 stream->qos_earliest_time = timestamp;
1925 GST_DEBUG_OBJECT (stream->pad, "qos_earliest_time %" GST_TIME_FORMAT,
1926 GST_TIME_ARGS (stream->qos_earliest_time));
1928 GST_MANIFEST_UNLOCK (demux);
1935 return gst_pad_event_default (pad, parent, event);
1939 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
1942 GstAdaptiveDemux *demux;
1943 GstAdaptiveDemuxClass *demux_class;
1944 gboolean ret = FALSE;
1949 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1950 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1952 switch (query->type) {
1953 case GST_QUERY_DURATION:{
1954 GstClockTime duration = -1;
1957 gst_query_parse_duration (query, &fmt, NULL);
1959 if (gst_adaptive_demux_is_live (demux)) {
1960 /* We are able to answer this query: the duration is unknown */
1961 gst_query_set_duration (query, fmt, -1);
1966 if (fmt == GST_FORMAT_TIME
1967 && g_atomic_int_get (&demux->priv->have_manifest)) {
1968 duration = demux_class->get_duration (demux);
1970 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
1971 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
1976 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
1977 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
1980 case GST_QUERY_LATENCY:{
1981 gst_query_set_latency (query, FALSE, 0, -1);
1985 case GST_QUERY_SEEKING:{
1990 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
1991 GST_INFO_OBJECT (demux,
1992 "Don't have manifest yet, can't answer seeking query");
1993 return FALSE; /* can't answer without manifest */
1996 GST_MANIFEST_LOCK (demux);
1998 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
1999 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2000 if (fmt == GST_FORMAT_TIME) {
2001 GstClockTime duration;
2002 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2006 if (gst_adaptive_demux_is_live (demux)) {
2007 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2009 GST_MANIFEST_UNLOCK (demux);
2010 GST_INFO_OBJECT (demux, "can't answer seeking query");
2014 duration = demux_class->get_duration (demux);
2015 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2019 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2020 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2021 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2022 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2024 GST_MANIFEST_UNLOCK (demux);
2029 GST_MANIFEST_LOCK (demux);
2031 /* TODO HLS can answer this differently it seems */
2032 if (demux->manifest_uri) {
2033 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2034 * playlist or the the uri of the last downlowaded fragment? */
2035 gst_query_set_uri (query, demux->manifest_uri);
2039 GST_MANIFEST_UNLOCK (demux);
2042 /* Don't forward queries upstream because of the special nature of this
2043 * "demuxer", which relies on the upstream element only to be fed
2052 /* must be called with manifest_lock taken */
2054 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
2055 gboolean start_preroll_streams)
2059 if (!gst_adaptive_demux_is_running (demux)) {
2060 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2064 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2066 iter = start_preroll_streams ? demux->prepared_streams : demux->streams;
2068 for (; iter; iter = g_list_next (iter)) {
2069 GstAdaptiveDemuxStream *stream = iter->data;
2071 if (!start_preroll_streams) {
2072 g_mutex_lock (&stream->fragment_download_lock);
2073 stream->cancelled = FALSE;
2074 stream->replaced = FALSE;
2075 g_mutex_unlock (&stream->fragment_download_lock);
2078 stream->last_ret = GST_FLOW_OK;
2079 gst_task_start (stream->download_task);
2083 /* must be called with manifest_lock taken */
2085 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2087 gst_uri_downloader_cancel (demux->downloader);
2089 gst_task_stop (demux->priv->updates_task);
2091 g_mutex_lock (&demux->priv->updates_timed_lock);
2092 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2093 demux->priv->stop_updates_task = TRUE;
2094 g_cond_signal (&demux->priv->updates_timed_cond);
2095 g_mutex_unlock (&demux->priv->updates_timed_lock);
2098 /* must be called with manifest_lock taken */
2100 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2102 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2104 if (gst_adaptive_demux_is_live (demux)) {
2105 gst_uri_downloader_reset (demux->downloader);
2106 g_mutex_lock (&demux->priv->updates_timed_lock);
2107 demux->priv->stop_updates_task = FALSE;
2108 g_mutex_unlock (&demux->priv->updates_timed_lock);
2109 /* Task to periodically update the manifest */
2110 if (demux_class->requires_periodical_playlist_update (demux)) {
2111 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2112 gst_task_start (demux->priv->updates_task);
2117 /* must be called with manifest_lock taken
2118 * This function will temporarily release manifest_lock in order to join the
2120 * The api_lock will still protect it against other threads trying to modify
2121 * the demux element.
2124 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2128 GList *list_to_process;
2130 GST_LOG_OBJECT (demux, "Stopping tasks");
2133 gst_adaptive_demux_stop_manifest_update_task (demux);
2135 list_to_process = demux->streams;
2136 for (i = 0; i < 2; ++i) {
2137 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2138 GstAdaptiveDemuxStream *stream = iter->data;
2140 g_mutex_lock (&stream->fragment_download_lock);
2141 stream->cancelled = TRUE;
2142 gst_task_stop (stream->download_task);
2143 g_cond_signal (&stream->fragment_download_cond);
2144 g_mutex_unlock (&stream->fragment_download_lock);
2146 list_to_process = demux->prepared_streams;
2149 GST_MANIFEST_UNLOCK (demux);
2150 g_mutex_lock (&demux->priv->preroll_lock);
2151 g_cond_broadcast (&demux->priv->preroll_cond);
2152 g_mutex_unlock (&demux->priv->preroll_lock);
2153 GST_MANIFEST_LOCK (demux);
2155 g_mutex_lock (&demux->priv->manifest_update_lock);
2156 g_cond_broadcast (&demux->priv->manifest_cond);
2157 g_mutex_unlock (&demux->priv->manifest_update_lock);
2159 /* need to release manifest_lock before stopping the src element.
2160 * The streams were asked to cancel, so they will not make any writes to demux
2161 * object. Even if we temporarily release manifest_lock, the demux->streams
2162 * cannot change and iter cannot be invalidated.
2164 list_to_process = demux->streams;
2165 for (i = 0; i < 2; ++i) {
2166 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2167 GstAdaptiveDemuxStream *stream = iter->data;
2168 GstElement *src = stream->src;
2170 GST_MANIFEST_UNLOCK (demux);
2173 gst_element_set_locked_state (src, TRUE);
2174 gst_element_set_state (src, GST_STATE_READY);
2177 /* stream->download_task value never changes, so it is safe to read it
2178 * outside critical section
2180 gst_task_join (stream->download_task);
2182 GST_MANIFEST_LOCK (demux);
2184 list_to_process = demux->prepared_streams;
2187 GST_MANIFEST_UNLOCK (demux);
2189 gst_task_join (demux->priv->updates_task);
2191 GST_MANIFEST_LOCK (demux);
2193 list_to_process = demux->streams;
2194 for (i = 0; i < 2; ++i) {
2195 for (iter = list_to_process; iter; iter = g_list_next (iter)) {
2196 GstAdaptiveDemuxStream *stream = iter->data;
2198 stream->download_error_count = 0;
2199 stream->need_header = TRUE;
2200 stream->qos_earliest_time = GST_CLOCK_TIME_NONE;
2202 list_to_process = demux->prepared_streams;
2206 /* must be called with manifest_lock taken */
2208 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2211 gboolean ret = TRUE;
2213 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2214 GstAdaptiveDemuxStream *stream = iter->data;
2215 gst_event_ref (event);
2216 ret = ret & gst_pad_push_event (stream->pad, event);
2218 gst_event_unref (event);
2222 /* must be called with manifest_lock taken */
2224 gst_adaptive_demux_stream_set_caps (GstAdaptiveDemuxStream * stream,
2227 GST_DEBUG_OBJECT (stream->pad, "setting new caps for stream %" GST_PTR_FORMAT,
2229 gst_caps_replace (&stream->pending_caps, caps);
2230 gst_caps_unref (caps);
2233 /* must be called with manifest_lock taken */
2235 gst_adaptive_demux_stream_set_tags (GstAdaptiveDemuxStream * stream,
2238 GST_DEBUG_OBJECT (stream->pad, "setting new tags for stream %" GST_PTR_FORMAT,
2240 if (stream->pending_tags) {
2241 gst_tag_list_unref (stream->pending_tags);
2243 stream->pending_tags = tags;
2246 /* must be called with manifest_lock taken */
2248 gst_adaptive_demux_stream_queue_event (GstAdaptiveDemuxStream * stream,
2251 stream->pending_events = g_list_append (stream->pending_events, event);
2254 /* must be called with manifest_lock taken */
2256 _update_average_bitrate (GstAdaptiveDemux * demux,
2257 GstAdaptiveDemuxStream * stream, guint64 new_bitrate)
2259 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2261 stream->moving_bitrate -= stream->fragment_bitrates[index];
2262 stream->fragment_bitrates[index] = new_bitrate;
2263 stream->moving_bitrate += new_bitrate;
2265 stream->moving_index += 1;
2267 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2268 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2269 return stream->moving_bitrate / stream->moving_index;
2272 /* must be called with manifest_lock taken */
2274 gst_adaptive_demux_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2275 GstAdaptiveDemuxStream * stream)
2277 guint64 average_bitrate;
2278 guint64 fragment_bitrate;
2280 if (demux->connection_speed) {
2281 GST_LOG_OBJECT (demux, "Connection-speed is set to %u kbps, using it",
2282 demux->connection_speed / 1000);
2283 stream->current_download_rate = demux->connection_speed;
2284 return demux->connection_speed;
2287 fragment_bitrate = stream->last_bitrate;
2288 GST_DEBUG_OBJECT (demux, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2291 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2293 GST_INFO_OBJECT (stream, "last fragment bitrate was %" G_GUINT64_FORMAT,
2295 GST_INFO_OBJECT (stream,
2296 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2297 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2299 /* Conservative approach, make sure we don't upgrade too fast */
2300 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
2302 stream->current_download_rate *= demux->bitrate_limit;
2303 GST_DEBUG_OBJECT (demux, "Bitrate after bitrate limit (%0.2f): %"
2304 G_GUINT64_FORMAT, demux->bitrate_limit, stream->current_download_rate);
2307 /* Debugging code, modulate the bitrate every few fragments */
2309 static guint ctr = 0;
2311 GST_INFO_OBJECT (demux, "Halving reported bitrate for debugging");
2312 stream->current_download_rate /= 2;
2318 return stream->current_download_rate;
2321 /* must be called with manifest_lock taken */
2322 static GstFlowReturn
2323 gst_adaptive_demux_combine_flows (GstAdaptiveDemux * demux)
2325 gboolean all_notlinked = TRUE;
2326 gboolean all_eos = TRUE;
2329 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
2330 GstAdaptiveDemuxStream *stream = iter->data;
2332 if (stream->last_ret != GST_FLOW_NOT_LINKED) {
2333 all_notlinked = FALSE;
2334 if (stream->last_ret != GST_FLOW_EOS)
2338 if (stream->last_ret <= GST_FLOW_NOT_NEGOTIATED
2339 || stream->last_ret == GST_FLOW_FLUSHING) {
2340 return stream->last_ret;
2344 return GST_FLOW_NOT_LINKED;
2346 return GST_FLOW_EOS;
2350 /* Called with preroll_lock */
2352 gst_adaptive_demux_handle_preroll (GstAdaptiveDemux * demux,
2353 GstAdaptiveDemuxStream * stream)
2355 demux->priv->preroll_pending--;
2356 if (demux->priv->preroll_pending == 0) {
2357 /* That was the last one, time to release all streams
2358 * and expose them */
2359 GST_DEBUG_OBJECT (demux, "All streams prerolled. exposing");
2360 gst_adaptive_demux_expose_streams (demux);
2361 g_cond_broadcast (&demux->priv->preroll_cond);
2365 /* must be called with manifest_lock taken.
2366 * Temporarily releases manifest_lock
2369 gst_adaptive_demux_stream_push_buffer (GstAdaptiveDemuxStream * stream,
2372 GstAdaptiveDemux *demux = stream->demux;
2373 GstFlowReturn ret = GST_FLOW_OK;
2374 gboolean discont = FALSE;
2375 /* Pending events */
2376 GstEvent *pending_caps = NULL, *pending_segment = NULL, *pending_tags = NULL;
2377 GList *pending_events = NULL;
2380 * This is duplicating *exactly* the same thing as what is done at the beginning
2381 * of _src_chain if starting_fragment is TRUE */
2382 if (stream->first_fragment_buffer) {
2383 GstClockTime offset =
2384 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2385 GstClockTime period_start =
2386 gst_adaptive_demux_get_period_start_time (demux);
2388 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2389 if (demux->segment.rate < 0)
2390 /* Set DISCONT flag for every first buffer in reverse playback mode
2391 * as each fragment for its own has to be reversed */
2394 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2395 if (GST_BUFFER_PTS_IS_VALID (buffer))
2396 GST_BUFFER_PTS (buffer) += offset;
2398 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2399 stream->segment.position = GST_BUFFER_PTS (buffer);
2401 /* Convert from position inside the stream's segment to the demuxer's
2402 * segment, they are not necessarily the same */
2403 if (stream->segment.position - offset + period_start >
2404 demux->segment.position)
2405 demux->segment.position =
2406 stream->segment.position - offset + period_start;
2408 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2410 GST_LOG_OBJECT (stream->pad,
2411 "Going to push buffer with PTS %" GST_TIME_FORMAT,
2412 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2414 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2417 if (stream->discont) {
2419 stream->discont = FALSE;
2423 GST_DEBUG_OBJECT (stream->pad, "Marking fragment as discontinuous");
2424 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
2426 GST_BUFFER_FLAG_UNSET (buffer, GST_BUFFER_FLAG_DISCONT);
2429 stream->first_fragment_buffer = FALSE;
2431 GST_BUFFER_DURATION (buffer) = GST_CLOCK_TIME_NONE;
2432 GST_BUFFER_DTS (buffer) = GST_CLOCK_TIME_NONE;
2433 if (G_UNLIKELY (stream->pending_caps)) {
2434 pending_caps = gst_event_new_caps (stream->pending_caps);
2435 gst_caps_unref (stream->pending_caps);
2436 stream->pending_caps = NULL;
2439 if (stream->do_block) {
2441 g_mutex_lock (&demux->priv->preroll_lock);
2443 /* If we are preroll state, set caps in here */
2445 gst_pad_push_event (stream->pad, pending_caps);
2446 pending_caps = NULL;
2449 gst_adaptive_demux_handle_preroll (demux, stream);
2450 GST_MANIFEST_UNLOCK (demux);
2452 while (stream->do_block && !stream->cancelled) {
2453 GST_LOG_OBJECT (demux, "Stream %p sleeping for preroll", stream);
2454 g_cond_wait (&demux->priv->preroll_cond, &demux->priv->preroll_lock);
2456 if (stream->cancelled) {
2457 GST_LOG_OBJECT (demux, "stream %p cancelled", stream);
2458 gst_buffer_unref (buffer);
2459 g_mutex_unlock (&demux->priv->preroll_lock);
2460 return GST_FLOW_FLUSHING;
2463 g_mutex_unlock (&demux->priv->preroll_lock);
2464 GST_MANIFEST_LOCK (demux);
2467 if (G_UNLIKELY (stream->pending_segment)) {
2468 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2469 pending_segment = stream->pending_segment;
2470 stream->pending_segment = NULL;
2471 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2473 if (G_UNLIKELY (stream->pending_tags || stream->bitrate_changed)) {
2474 GstTagList *tags = stream->pending_tags;
2476 stream->pending_tags = NULL;
2477 stream->bitrate_changed = 0;
2479 if (stream->fragment.bitrate != 0) {
2481 tags = gst_tag_list_make_writable (tags);
2483 tags = gst_tag_list_new_empty ();
2485 gst_tag_list_add (tags, GST_TAG_MERGE_KEEP,
2486 GST_TAG_NOMINAL_BITRATE, stream->fragment.bitrate, NULL);
2489 pending_tags = gst_event_new_tag (tags);
2491 if (G_UNLIKELY (stream->pending_events)) {
2492 pending_events = stream->pending_events;
2493 stream->pending_events = NULL;
2496 GST_MANIFEST_UNLOCK (demux);
2498 /* Do not push events or buffers holding the manifest lock */
2499 if (G_UNLIKELY (pending_caps)) {
2500 GST_DEBUG_OBJECT (stream->pad, "Setting pending caps: %" GST_PTR_FORMAT,
2502 gst_pad_push_event (stream->pad, pending_caps);
2504 if (G_UNLIKELY (pending_segment)) {
2505 GST_DEBUG_OBJECT (stream->pad, "Sending pending seg: %" GST_PTR_FORMAT,
2507 gst_pad_push_event (stream->pad, pending_segment);
2509 if (G_UNLIKELY (pending_tags)) {
2510 GST_DEBUG_OBJECT (stream->pad, "Sending pending tags: %" GST_PTR_FORMAT,
2512 gst_pad_push_event (stream->pad, pending_tags);
2514 while (pending_events != NULL) {
2515 GstEvent *event = pending_events->data;
2517 if (!gst_pad_push_event (stream->pad, event))
2518 GST_ERROR_OBJECT (stream->pad, "Failed to send pending event");
2520 pending_events = g_list_delete_link (pending_events, pending_events);
2523 /* Wait for preroll if blocking */
2524 GST_DEBUG_OBJECT (stream->pad,
2525 "About to push buffer of size %" G_GSIZE_FORMAT,
2526 gst_buffer_get_size (buffer));
2528 ret = gst_pad_push (stream->pad, buffer);
2530 GST_MANIFEST_LOCK (demux);
2532 g_mutex_lock (&stream->fragment_download_lock);
2533 if (G_UNLIKELY (stream->cancelled)) {
2534 GST_LOG_OBJECT (stream, "Stream was cancelled");
2535 ret = stream->last_ret = GST_FLOW_FLUSHING;
2536 g_mutex_unlock (&stream->fragment_download_lock);
2539 g_mutex_unlock (&stream->fragment_download_lock);
2541 GST_LOG_OBJECT (stream->pad, "Push result: %d %s", ret,
2542 gst_flow_get_name (ret));
2547 /* must be called with manifest_lock taken */
2548 static GstFlowReturn
2549 gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
2550 GstAdaptiveDemuxStream * stream)
2552 /* No need to advance, this isn't a real fragment */
2553 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
2556 return gst_adaptive_demux_stream_advance_fragment (demux, stream,
2557 stream->fragment.duration);
2560 /* must be called with manifest_lock taken.
2561 * Can temporarily release manifest_lock
2563 static GstFlowReturn
2564 gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
2565 GstAdaptiveDemuxStream * stream, GstBuffer * buffer)
2567 return gst_adaptive_demux_stream_push_buffer (stream, buffer);
2571 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
2577 static GstFlowReturn
2578 _src_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2580 GstAdaptiveDemuxStream *stream;
2581 GstAdaptiveDemux *demux;
2582 GstAdaptiveDemuxClass *klass;
2583 GstFlowReturn ret = GST_FLOW_OK;
2585 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2586 stream = gst_pad_get_element_private (pad);
2587 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2589 GST_MANIFEST_LOCK (demux);
2591 /* do not make any changes if the stream is cancelled */
2592 g_mutex_lock (&stream->fragment_download_lock);
2593 if (G_UNLIKELY (stream->cancelled)) {
2594 g_mutex_unlock (&stream->fragment_download_lock);
2595 gst_buffer_unref (buffer);
2596 ret = stream->last_ret = GST_FLOW_FLUSHING;
2597 GST_MANIFEST_UNLOCK (demux);
2600 g_mutex_unlock (&stream->fragment_download_lock);
2602 /* starting_fragment is set to TRUE at the beginning of
2603 * _stream_download_fragment()
2604 * /!\ If there is a header/index being downloaded, then this will
2605 * be TRUE for the first one ... but FALSE for the remaining ones,
2606 * including the *actual* fragment ! */
2607 if (stream->starting_fragment) {
2608 GstClockTime offset =
2609 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
2610 GstClockTime period_start =
2611 gst_adaptive_demux_get_period_start_time (demux);
2613 stream->starting_fragment = FALSE;
2614 if (klass->start_fragment) {
2615 if (!klass->start_fragment (demux, stream)) {
2616 ret = GST_FLOW_ERROR;
2621 GST_BUFFER_PTS (buffer) = stream->fragment.timestamp;
2622 if (GST_BUFFER_PTS_IS_VALID (buffer))
2623 GST_BUFFER_PTS (buffer) += offset;
2625 GST_LOG_OBJECT (stream->pad, "set fragment pts=%" GST_TIME_FORMAT,
2626 GST_TIME_ARGS (GST_BUFFER_PTS (buffer)));
2628 if (GST_BUFFER_PTS_IS_VALID (buffer)) {
2629 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2630 stream->segment.position = GST_BUFFER_PTS (buffer);
2632 /* Convert from position inside the stream's segment to the demuxer's
2633 * segment, they are not necessarily the same */
2634 if (stream->segment.position - offset + period_start >
2635 demux->segment.position)
2636 demux->segment.position =
2637 stream->segment.position - offset + period_start;
2638 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2642 GST_BUFFER_PTS (buffer) = GST_CLOCK_TIME_NONE;
2645 /* downloading_first_buffer is set to TRUE in download_uri() just before
2646 * activating the source (i.e. requesting a given URI)
2648 * The difference with starting_fragment is that this will be called
2649 * for *all* first buffers (of index, and header, and fragment)
2651 * ... to then only do something useful (in this block) for actual
2653 if (stream->downloading_first_buffer) {
2654 gint64 chunk_size = 0;
2656 stream->downloading_first_buffer = FALSE;
2658 if (!stream->downloading_header && !stream->downloading_index) {
2659 /* If this is the first buffer of a fragment (not the headers or index)
2660 * and we don't have a birate from the sub-class, then see if we
2661 * can work it out from the fragment size and duration */
2662 if (stream->fragment.bitrate == 0 &&
2663 stream->fragment.duration != 0 &&
2664 gst_element_query_duration (stream->uri_handler, GST_FORMAT_BYTES,
2666 guint bitrate = MIN (G_MAXUINT, gst_util_uint64_scale (chunk_size,
2667 8 * GST_SECOND, stream->fragment.duration));
2668 GST_LOG_OBJECT (demux,
2669 "Fragment has size %" G_GUINT64_FORMAT " duration %" GST_TIME_FORMAT
2670 " = bitrate %u", chunk_size,
2671 GST_TIME_ARGS (stream->fragment.duration), bitrate);
2672 stream->fragment.bitrate = bitrate;
2674 if (stream->fragment.bitrate) {
2675 stream->bitrate_changed = TRUE;
2677 GST_WARNING_OBJECT (demux, "Bitrate for fragment not available");
2682 stream->download_total_bytes += gst_buffer_get_size (buffer);
2684 GST_TRACE_OBJECT (stream->pad, "Received buffer of size %" G_GSIZE_FORMAT,
2685 gst_buffer_get_size (buffer));
2687 ret = klass->data_received (demux, stream, buffer);
2689 if (ret == GST_FLOW_FLUSHING) {
2690 /* do not make any changes if the stream is cancelled */
2691 g_mutex_lock (&stream->fragment_download_lock);
2692 if (G_UNLIKELY (stream->cancelled)) {
2693 g_mutex_unlock (&stream->fragment_download_lock);
2694 GST_MANIFEST_UNLOCK (demux);
2697 g_mutex_unlock (&stream->fragment_download_lock);
2700 if (ret != GST_FLOW_OK) {
2701 gboolean finished = FALSE;
2703 if (ret < GST_FLOW_EOS) {
2704 GST_ELEMENT_FLOW_ERROR (demux, ret);
2706 /* TODO push this on all pads */
2707 gst_pad_push_event (stream->pad, gst_event_new_eos ());
2709 GST_DEBUG_OBJECT (stream->pad, "stream stopped, reason %s",
2710 gst_flow_get_name (ret));
2713 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2714 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2715 } else if (ret == GST_ADAPTIVE_DEMUX_FLOW_END_OF_FRAGMENT) {
2716 /* Behaves like an EOS event from upstream */
2717 stream->fragment.finished = TRUE;
2718 ret = klass->finish_fragment (demux, stream);
2719 if (ret == (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH) {
2720 ret = GST_FLOW_EOS; /* return EOS to make the source stop */
2721 } else if (ret != GST_FLOW_OK) {
2727 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2734 GST_MANIFEST_UNLOCK (demux);
2739 /* must be called with manifest_lock taken */
2741 gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
2742 stream, GstFlowReturn ret, GError * err)
2744 GST_DEBUG_OBJECT (stream->pad, "Download finish: %d %s - err: %p", ret,
2745 gst_flow_get_name (ret), err);
2747 /* if we have an error, only replace last_ret if it was OK before to avoid
2748 * overwriting the first error we got */
2749 if (stream->last_ret == GST_FLOW_OK) {
2750 stream->last_ret = ret;
2752 g_clear_error (&stream->last_error);
2753 stream->last_error = g_error_copy (err);
2756 g_mutex_lock (&stream->fragment_download_lock);
2757 stream->download_finished = TRUE;
2758 g_cond_signal (&stream->fragment_download_cond);
2759 g_mutex_unlock (&stream->fragment_download_lock);
2762 static GstFlowReturn
2763 gst_adaptive_demux_eos_handling (GstAdaptiveDemuxStream * stream)
2765 GstFlowReturn ret = GST_FLOW_OK;
2766 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (stream->demux);
2768 if (!klass->need_another_chunk || stream->fragment.chunk_size == -1
2769 || !klass->need_another_chunk (stream)
2770 || stream->fragment.chunk_size == 0) {
2771 stream->fragment.finished = TRUE;
2772 ret = klass->finish_fragment (stream->demux, stream);
2774 gst_adaptive_demux_stream_fragment_download_finish (stream, ret, NULL);
2780 _src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2782 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2783 GstAdaptiveDemux *demux = stream->demux;
2785 switch (GST_EVENT_TYPE (event)) {
2786 case GST_EVENT_EOS:{
2787 GST_DEBUG_OBJECT (pad, "Saw EOS on src pad");
2788 GST_MANIFEST_LOCK (demux);
2790 gst_adaptive_demux_eos_handling (stream);
2793 * _eos_handling() calls fragment_download_finish() which does the
2794 * same thing as below.
2795 * Could this cause races ? */
2796 g_mutex_lock (&stream->fragment_download_lock);
2797 stream->download_finished = TRUE;
2798 g_cond_signal (&stream->fragment_download_cond);
2799 g_mutex_unlock (&stream->fragment_download_lock);
2801 GST_MANIFEST_UNLOCK (demux);
2808 gst_event_unref (event);
2814 _src_query (GstPad * pad, GstObject * parent, GstQuery * query)
2816 GstAdaptiveDemuxStream *stream = gst_pad_get_element_private (pad);
2818 switch (GST_QUERY_TYPE (query)) {
2819 case GST_QUERY_ALLOCATION:
2826 return gst_pad_peer_query (stream->pad, query);
2829 static GstPadProbeReturn
2830 _uri_handler_probe (GstPad * pad, GstPadProbeInfo * info,
2831 GstAdaptiveDemuxStream * stream)
2833 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2835 if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
2836 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2837 if (stream->fragment_bytes_downloaded == 0) {
2838 stream->last_latency =
2839 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2840 (stream->download_start_time * GST_USECOND);
2841 GST_DEBUG_OBJECT (pad,
2842 "FIRST BYTE since download_start %" GST_TIME_FORMAT,
2843 GST_TIME_ARGS (stream->last_latency));
2845 stream->fragment_bytes_downloaded += gst_buffer_get_size (buf);
2846 GST_LOG_OBJECT (pad,
2847 "Received buffer, size %" G_GSIZE_FORMAT " total %" G_GUINT64_FORMAT,
2848 gst_buffer_get_size (buf), stream->fragment_bytes_downloaded);
2849 } else if (GST_PAD_PROBE_INFO_TYPE (info) &
2850 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
2851 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
2852 GST_LOG_OBJECT (pad, "Received event %s %" GST_PTR_FORMAT,
2853 GST_EVENT_TYPE_NAME (ev), ev);
2854 switch (GST_EVENT_TYPE (ev)) {
2855 case GST_EVENT_SEGMENT:
2856 stream->fragment_bytes_downloaded = 0;
2860 stream->last_download_time =
2861 gst_adaptive_demux_get_monotonic_time (stream->demux) -
2862 (stream->download_start_time * GST_USECOND);
2863 stream->last_bitrate =
2864 gst_util_uint64_scale (stream->fragment_bytes_downloaded,
2865 8 * GST_SECOND, stream->last_download_time);
2866 GST_DEBUG_OBJECT (pad,
2867 "EOS since download_start %" GST_TIME_FORMAT " bitrate %"
2868 G_GUINT64_FORMAT " bps", GST_TIME_ARGS (stream->last_download_time),
2869 stream->last_bitrate);
2870 /* Calculate bitrate since URI request */
2881 /* must be called with manifest_lock taken.
2882 * Can temporarily release manifest_lock
2885 gst_adaptive_demux_stream_wait_manifest_update (GstAdaptiveDemux * demux,
2886 GstAdaptiveDemuxStream * stream)
2888 gboolean ret = TRUE;
2890 /* Wait until we're cancelled or there's something for
2891 * us to download in the playlist or the playlist
2892 * became non-live */
2894 GST_DEBUG_OBJECT (demux, "No fragment left but live playlist, wait a bit");
2896 /* get the manifest_update_lock while still holding the manifest_lock.
2897 * This will prevent other threads to signal the condition (they will need
2898 * both manifest_lock and manifest_update_lock in order to signal).
2899 * It cannot deadlock because all threads always get the manifest_lock first
2900 * and manifest_update_lock second.
2902 g_mutex_lock (&demux->priv->manifest_update_lock);
2904 GST_MANIFEST_UNLOCK (demux);
2906 g_cond_wait (&demux->priv->manifest_cond,
2907 &demux->priv->manifest_update_lock);
2908 g_mutex_unlock (&demux->priv->manifest_update_lock);
2910 GST_MANIFEST_LOCK (demux);
2912 /* check for cancelled every time we get the manifest_lock */
2913 g_mutex_lock (&stream->fragment_download_lock);
2914 if (G_UNLIKELY (stream->cancelled)) {
2916 stream->last_ret = GST_FLOW_FLUSHING;
2917 g_mutex_unlock (&stream->fragment_download_lock);
2920 g_mutex_unlock (&stream->fragment_download_lock);
2922 /* Got a new fragment or not live anymore? */
2923 if (gst_adaptive_demux_stream_update_fragment_info (demux, stream) ==
2925 GST_DEBUG_OBJECT (demux, "new fragment available, "
2926 "not waiting for manifest update");
2931 if (!gst_adaptive_demux_is_live (demux)) {
2932 GST_DEBUG_OBJECT (demux, "Not live anymore, "
2933 "not waiting for manifest update");
2938 GST_DEBUG_OBJECT (demux, "Retrying now");
2942 /* must be called with manifest_lock taken */
2944 gst_adaptive_demux_stream_update_source (GstAdaptiveDemuxStream * stream,
2945 const gchar * uri, const gchar * referer, gboolean refresh,
2946 gboolean allow_cache)
2948 GstAdaptiveDemux *demux = stream->demux;
2950 if (!gst_uri_is_valid (uri)) {
2951 GST_WARNING_OBJECT (stream->pad, "Invalid URI: %s", uri);
2955 /* Try to re-use existing source element */
2956 if (stream->src != NULL) {
2957 gchar *old_protocol, *new_protocol;
2960 old_uri = gst_uri_handler_get_uri (GST_URI_HANDLER (stream->uri_handler));
2961 old_protocol = gst_uri_get_protocol (old_uri);
2962 new_protocol = gst_uri_get_protocol (uri);
2964 if (!g_str_equal (old_protocol, new_protocol)) {
2965 GstElement *src = stream->src;
2968 gst_object_unref (stream->src_srcpad);
2969 stream->src_srcpad = NULL;
2970 GST_MANIFEST_UNLOCK (demux);
2971 gst_element_set_locked_state (src, TRUE);
2972 gst_element_set_state (src, GST_STATE_NULL);
2973 gst_bin_remove (GST_BIN_CAST (demux), src);
2974 GST_MANIFEST_LOCK (demux);
2975 GST_DEBUG_OBJECT (demux, "Can't re-use old source element");
2979 GST_DEBUG_OBJECT (demux, "Re-using old source element");
2980 if (!gst_uri_handler_set_uri (GST_URI_HANDLER (stream->uri_handler), uri,
2982 GstElement *src = stream->src;
2985 GST_DEBUG_OBJECT (demux, "Failed to re-use old source element: %s",
2986 err ? err->message : "Unknown error");
2987 g_clear_error (&err);
2988 gst_object_unref (stream->src_srcpad);
2989 stream->src_srcpad = NULL;
2990 GST_MANIFEST_UNLOCK (demux);
2991 gst_element_set_locked_state (src, TRUE);
2992 gst_element_set_state (src, GST_STATE_NULL);
2993 gst_bin_remove (GST_BIN_CAST (demux), src);
2994 GST_MANIFEST_LOCK (demux);
2998 g_free (old_protocol);
2999 g_free (new_protocol);
3002 if (stream->src == NULL) {
3003 GstPad *uri_handler_src;
3006 GstElement *uri_handler;
3008 GstPadLinkReturn pad_link_ret;
3009 GObjectClass *gobject_class;
3010 gchar *internal_name, *bin_name;
3012 /* Our src consists of a bin containing uri_handler -> queue . The
3013 * purpose of the queue is to allow the uri_handler to download an
3014 * entire fragment without blocking, so we can accurately measure the
3015 * download bitrate. */
3017 queue = gst_element_factory_make ("queue", NULL);
3021 g_object_set (queue, "max-size-bytes", (guint) SRC_QUEUE_MAX_BYTES, NULL);
3022 g_object_set (queue, "max-size-buffers", (guint) 0, NULL);
3023 g_object_set (queue, "max-size-time", (guint64) 0, NULL);
3025 uri_handler = gst_element_make_from_uri (GST_URI_SRC, uri, NULL, NULL);
3026 if (uri_handler == NULL) {
3027 GST_ELEMENT_ERROR (demux, CORE, MISSING_PLUGIN,
3028 ("Missing plugin to handle URI: '%s'", uri), (NULL));
3029 gst_object_unref (queue);
3033 gobject_class = G_OBJECT_GET_CLASS (uri_handler);
3035 if (g_object_class_find_property (gobject_class, "compress"))
3036 g_object_set (uri_handler, "compress", FALSE, NULL);
3037 if (g_object_class_find_property (gobject_class, "keep-alive"))
3038 g_object_set (uri_handler, "keep-alive", TRUE, NULL);
3039 if (g_object_class_find_property (gobject_class, "extra-headers")) {
3040 if (referer || refresh || !allow_cache) {
3041 GstStructure *extra_headers = gst_structure_new_empty ("headers");
3044 gst_structure_set (extra_headers, "Referer", G_TYPE_STRING, referer,
3048 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3051 gst_structure_set (extra_headers, "Cache-Control", G_TYPE_STRING,
3054 g_object_set (uri_handler, "extra-headers", extra_headers, NULL);
3056 gst_structure_free (extra_headers);
3058 g_object_set (uri_handler, "extra-headers", NULL, NULL);
3062 /* Source bin creation */
3063 bin_name = g_strdup_printf ("srcbin-%s", GST_PAD_NAME (stream->pad));
3064 stream->src = gst_bin_new (bin_name);
3066 if (stream->src == NULL) {
3067 gst_object_unref (queue);
3068 gst_object_unref (uri_handler);
3072 gst_bin_add (GST_BIN_CAST (stream->src), queue);
3073 gst_bin_add (GST_BIN_CAST (stream->src), uri_handler);
3075 uri_handler_src = gst_element_get_static_pad (uri_handler, "src");
3076 queue_sink = gst_element_get_static_pad (queue, "sink");
3079 gst_pad_link_full (uri_handler_src, queue_sink,
3080 GST_PAD_LINK_CHECK_NOTHING);
3081 if (GST_PAD_LINK_FAILED (pad_link_ret)) {
3082 GST_WARNING_OBJECT (demux,
3083 "Could not link pads %s:%s to %s:%s for reason %d",
3084 GST_DEBUG_PAD_NAME (uri_handler_src), GST_DEBUG_PAD_NAME (queue_sink),
3086 g_object_unref (queue_sink);
3087 g_object_unref (uri_handler_src);
3088 gst_object_unref (stream->src);
3093 /* Add a downstream event and data probe */
3094 gst_pad_add_probe (uri_handler_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
3095 (GstPadProbeCallback) _uri_handler_probe, stream, NULL);
3097 g_object_unref (queue_sink);
3098 g_object_unref (uri_handler_src);
3099 queue_src = gst_element_get_static_pad (queue, "src");
3100 stream->src_srcpad = gst_ghost_pad_new ("src", queue_src);
3101 g_object_unref (queue_src);
3102 gst_element_add_pad (stream->src, stream->src_srcpad);
3104 gst_element_set_locked_state (stream->src, TRUE);
3105 gst_bin_add (GST_BIN_CAST (demux), stream->src);
3106 stream->src_srcpad = gst_element_get_static_pad (stream->src, "src");
3108 /* set up our internal floating pad to drop all events from
3109 * the http src we don't care about. On the chain function
3110 * we just push the buffer forward */
3111 internal_name = g_strdup_printf ("internal-%s", GST_PAD_NAME (stream->pad));
3112 stream->internal_pad = gst_pad_new (internal_name, GST_PAD_SINK);
3113 g_free (internal_name);
3114 gst_object_set_parent (GST_OBJECT_CAST (stream->internal_pad),
3115 GST_OBJECT_CAST (demux));
3116 GST_OBJECT_FLAG_SET (stream->internal_pad, GST_PAD_FLAG_NEED_PARENT);
3117 gst_pad_set_element_private (stream->internal_pad, stream);
3118 gst_pad_set_active (stream->internal_pad, TRUE);
3119 gst_pad_set_chain_function (stream->internal_pad, _src_chain);
3120 gst_pad_set_event_function (stream->internal_pad, _src_event);
3121 gst_pad_set_query_function (stream->internal_pad, _src_query);
3123 if (gst_pad_link_full (stream->src_srcpad, stream->internal_pad,
3124 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
3125 GST_ERROR_OBJECT (stream->pad, "Failed to link internal pad");
3129 stream->uri_handler = uri_handler;
3130 stream->queue = queue;
3132 stream->last_status_code = 200; /* default to OK */
3137 static GstPadProbeReturn
3138 gst_ad_stream_src_to_ready_cb (GstPad * pad, GstPadProbeInfo * info,
3141 GstAdaptiveDemuxStream *stream = user_data;
3143 /* The source's src pad is IDLE so now set the state to READY */
3144 g_mutex_lock (&stream->fragment_download_lock);
3145 stream->src_at_ready = TRUE;
3146 g_cond_signal (&stream->fragment_download_cond);
3147 g_mutex_unlock (&stream->fragment_download_lock);
3149 return GST_PAD_PROBE_REMOVE;
3152 #ifndef GST_DISABLE_GST_DEBUG
3154 uritype (GstAdaptiveDemuxStream * s)
3156 if (s->downloading_header)
3158 if (s->downloading_index)
3164 /* must be called with manifest_lock taken.
3165 * Can temporarily release manifest_lock
3167 * Will return when URI is fully downloaded (or aborted/errored)
3169 static GstFlowReturn
3170 gst_adaptive_demux_stream_download_uri (GstAdaptiveDemux * demux,
3171 GstAdaptiveDemuxStream * stream, const gchar * uri, gint64 start,
3172 gint64 end, guint * http_status)
3174 GstFlowReturn ret = GST_FLOW_OK;
3175 GST_DEBUG_OBJECT (stream->pad,
3176 "Downloading %s uri: %s, range:%" G_GINT64_FORMAT " - %" G_GINT64_FORMAT,
3177 uritype (stream), uri, start, end);
3180 *http_status = 200; /* default to ok if no further information */
3182 if (!gst_adaptive_demux_stream_update_source (stream, uri, NULL, FALSE, TRUE)) {
3183 ret = stream->last_ret = GST_FLOW_ERROR;
3187 gst_element_set_locked_state (stream->src, TRUE);
3189 GST_MANIFEST_UNLOCK (demux);
3190 if (gst_element_set_state (stream->src,
3191 GST_STATE_READY) != GST_STATE_CHANGE_FAILURE) {
3192 /* If ranges are specified, seek to it */
3193 if (start != 0 || end != -1) {
3194 /* HTTP ranges are inclusive, GStreamer segments are exclusive for the
3198 /* Send the seek event to the uri_handler, as the other pipeline elements
3199 * can't handle it when READY. */
3200 if (!gst_element_send_event (stream->uri_handler, gst_event_new_seek (1.0,
3201 GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
3202 GST_SEEK_TYPE_SET, start, GST_SEEK_TYPE_SET, end))) {
3204 GST_MANIFEST_LOCK (demux);
3205 /* looks like the source can't handle seeks in READY */
3206 g_clear_error (&stream->last_error);
3207 stream->last_error = g_error_new (GST_CORE_ERROR,
3208 GST_CORE_ERROR_NOT_IMPLEMENTED,
3209 "Source element can't handle range requests");
3210 stream->last_ret = GST_FLOW_ERROR;
3212 GST_MANIFEST_LOCK (demux);
3215 GST_MANIFEST_LOCK (demux);
3218 if (G_LIKELY (stream->last_ret == GST_FLOW_OK)) {
3219 stream->download_start_time =
3220 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
3222 /* src element is in state READY. Before we start it, we reset
3225 g_mutex_lock (&stream->fragment_download_lock);
3226 stream->download_finished = FALSE;
3227 stream->downloading_first_buffer = TRUE;
3228 g_mutex_unlock (&stream->fragment_download_lock);
3230 GST_MANIFEST_UNLOCK (demux);
3232 if (!gst_element_sync_state_with_parent (stream->src)) {
3233 GST_WARNING_OBJECT (demux, "Could not sync state for src element");
3234 GST_MANIFEST_LOCK (demux);
3235 ret = stream->last_ret = GST_FLOW_ERROR;
3239 /* wait for the fragment to be completely downloaded */
3240 GST_DEBUG_OBJECT (stream->pad,
3241 "Waiting for %s download to finish: %s", uritype (stream), uri);
3243 g_mutex_lock (&stream->fragment_download_lock);
3244 stream->src_at_ready = FALSE;
3245 if (G_UNLIKELY (stream->cancelled)) {
3246 g_mutex_unlock (&stream->fragment_download_lock);
3247 GST_MANIFEST_LOCK (demux);
3248 ret = stream->last_ret = GST_FLOW_FLUSHING;
3251 /* download_finished is only set:
3252 * * in ::fragment_download_finish()
3253 * * if EOS is received on the _src pad
3255 while (!stream->cancelled && !stream->download_finished) {
3256 g_cond_wait (&stream->fragment_download_cond,
3257 &stream->fragment_download_lock);
3259 g_mutex_unlock (&stream->fragment_download_lock);
3261 GST_DEBUG_OBJECT (stream->pad,
3262 "Finished Waiting for %s download: %s", uritype (stream), uri);
3264 GST_MANIFEST_LOCK (demux);
3265 g_mutex_lock (&stream->fragment_download_lock);
3266 if (G_UNLIKELY (stream->cancelled)) {
3267 ret = stream->last_ret = GST_FLOW_FLUSHING;
3268 g_mutex_unlock (&stream->fragment_download_lock);
3271 g_mutex_unlock (&stream->fragment_download_lock);
3273 ret = stream->last_ret;
3275 GST_DEBUG_OBJECT (stream->pad, "%s download finished: %s %d %s",
3276 uritype (stream), uri, stream->last_ret,
3277 gst_flow_get_name (stream->last_ret));
3278 if (stream->last_ret != GST_FLOW_OK && http_status) {
3279 *http_status = stream->last_status_code;
3283 /* changing src element state might try to join the streaming thread, so
3284 * we must not hold the manifest lock.
3286 GST_MANIFEST_UNLOCK (demux);
3288 GST_MANIFEST_UNLOCK (demux);
3289 if (stream->last_ret == GST_FLOW_OK)
3290 stream->last_ret = GST_FLOW_CUSTOM_ERROR;
3291 ret = GST_FLOW_CUSTOM_ERROR;
3294 stream->src_at_ready = FALSE;
3296 gst_element_set_locked_state (stream->src, TRUE);
3297 gst_pad_add_probe (stream->src_srcpad, GST_PAD_PROBE_TYPE_IDLE,
3298 gst_ad_stream_src_to_ready_cb, stream, NULL);
3300 g_mutex_lock (&stream->fragment_download_lock);
3301 while (!stream->src_at_ready) {
3302 g_cond_wait (&stream->fragment_download_cond,
3303 &stream->fragment_download_lock);
3305 g_mutex_unlock (&stream->fragment_download_lock);
3307 gst_element_set_state (stream->src, GST_STATE_READY);
3309 /* Need to drop the fragment_download_lock to get the MANIFEST lock */
3310 GST_MANIFEST_LOCK (demux);
3311 g_mutex_lock (&stream->fragment_download_lock);
3312 if (G_UNLIKELY (stream->cancelled)) {
3313 ret = stream->last_ret = GST_FLOW_FLUSHING;
3314 g_mutex_unlock (&stream->fragment_download_lock);
3317 g_mutex_unlock (&stream->fragment_download_lock);
3319 /* deactivate and reactivate our ghostpad to make it fresh for a new
3321 gst_pad_set_active (stream->internal_pad, FALSE);
3322 gst_pad_set_active (stream->internal_pad, TRUE);
3327 /* must be called with manifest_lock taken.
3328 * Can temporarily release manifest_lock
3330 static GstFlowReturn
3331 gst_adaptive_demux_stream_download_header_fragment (GstAdaptiveDemuxStream *
3334 GstAdaptiveDemux *demux = stream->demux;
3335 GstFlowReturn ret = GST_FLOW_OK;
3337 if (stream->fragment.header_uri != NULL) {
3338 GST_DEBUG_OBJECT (demux, "Fetching header %s %" G_GINT64_FORMAT "-%"
3339 G_GINT64_FORMAT, stream->fragment.header_uri,
3340 stream->fragment.header_range_start, stream->fragment.header_range_end);
3342 stream->downloading_header = TRUE;
3343 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3344 stream->fragment.header_uri, stream->fragment.header_range_start,
3345 stream->fragment.header_range_end, NULL);
3346 stream->downloading_header = FALSE;
3349 /* check if we have an index */
3350 if (ret == GST_FLOW_OK) { /* TODO check for other valid types */
3352 if (stream->fragment.index_uri != NULL) {
3353 GST_DEBUG_OBJECT (demux,
3354 "Fetching index %s %" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
3355 stream->fragment.index_uri,
3356 stream->fragment.index_range_start, stream->fragment.index_range_end);
3357 stream->downloading_index = TRUE;
3358 ret = gst_adaptive_demux_stream_download_uri (demux, stream,
3359 stream->fragment.index_uri, stream->fragment.index_range_start,
3360 stream->fragment.index_range_end, NULL);
3361 stream->downloading_index = FALSE;
3368 /* must be called with manifest_lock taken.
3369 * Can temporarily release manifest_lock
3371 static GstFlowReturn
3372 gst_adaptive_demux_stream_download_fragment (GstAdaptiveDemuxStream * stream)
3374 GstAdaptiveDemux *demux = stream->demux;
3375 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3378 gboolean retried_once = FALSE, live;
3380 guint last_status_code;
3383 /* THERE ARE THREE DIFFERENT VARIABLES FOR THE "BEGINNING" OF A FRAGMENT ! */
3384 stream->starting_fragment = TRUE;
3385 stream->last_ret = GST_FLOW_OK;
3386 stream->first_fragment_buffer = TRUE;
3388 GST_DEBUG_OBJECT (stream->pad, "Downloading %s%s%s",
3389 stream->fragment.uri ? "FRAGMENT " : "",
3390 stream->fragment.header_uri ? "HEADER " : "",
3391 stream->fragment.index_uri ? "INDEX" : "");
3393 if (stream->fragment.uri == NULL && stream->fragment.header_uri == NULL &&
3394 stream->fragment.index_uri == NULL)
3397 if (stream->need_header) {
3398 ret = gst_adaptive_demux_stream_download_header_fragment (stream);
3399 if (ret != GST_FLOW_OK) {
3402 stream->need_header = FALSE;
3407 url = stream->fragment.uri;
3408 GST_DEBUG_OBJECT (stream->pad, "Got url '%s' for stream %p", url, stream);
3412 stream->last_ret = GST_FLOW_OK;
3415 /* Download the actual fragment, either in fragments or in one go */
3416 if (klass->need_another_chunk && klass->need_another_chunk (stream)
3417 && stream->fragment.chunk_size != 0) {
3418 /* Handle chunk downloading */
3419 gint64 range_start, range_end, chunk_start, chunk_end;
3420 guint64 download_total_bytes;
3421 gint chunk_size = stream->fragment.chunk_size;
3423 range_start = chunk_start = stream->fragment.range_start;
3424 range_end = stream->fragment.range_end;
3425 /* HTTP ranges are inclusive for the end */
3426 if (chunk_size != -1)
3427 chunk_end = range_start + chunk_size - 1;
3429 chunk_end = range_end;
3431 if (range_end != -1)
3432 chunk_end = MIN (chunk_end, range_end);
3434 while (!stream->fragment.finished && (chunk_start <= range_end
3435 || range_end == -1)) {
3436 download_total_bytes = stream->download_total_bytes;
3439 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3440 chunk_start, chunk_end, &http_status);
3442 GST_DEBUG_OBJECT (stream->pad,
3443 "Fragment chunk download result: %d (%d) %s", stream->last_ret,
3444 http_status, gst_flow_get_name (stream->last_ret));
3446 /* Don't retry for any chunks except the first. We would have sent
3447 * data downstream already otherwise and it's difficult to recover
3448 * from that in a meaningful way */
3449 if (chunk_start > range_start)
3450 retried_once = TRUE;
3452 /* FIXME: Check for 416 Range Not Satisfiable here and fall back to
3453 * downloading up to -1. We don't know the full duration.
3454 * Needs https://bugzilla.gnome.org/show_bug.cgi?id=756806 */
3455 if (ret != GST_FLOW_OK && chunk_end == -1) {
3457 } else if (ret != GST_FLOW_OK) {
3459 stream->last_ret = GST_FLOW_OK;
3463 if (chunk_end == -1)
3466 /* Short read, we're at the end now */
3467 if (stream->download_total_bytes - download_total_bytes <
3468 chunk_end + 1 - chunk_start)
3471 if (!klass->need_another_chunk (stream))
3474 /* HTTP ranges are inclusive for the end */
3475 chunk_start += chunk_size;
3476 chunk_size = stream->fragment.chunk_size;
3477 if (chunk_size != -1)
3478 chunk_end = chunk_start + chunk_size - 1;
3480 chunk_end = range_end;
3482 if (range_end != -1)
3483 chunk_end = MIN (chunk_end, range_end);
3487 gst_adaptive_demux_stream_download_uri (demux, stream, url,
3488 stream->fragment.range_start, stream->fragment.range_end, &http_status);
3489 GST_DEBUG_OBJECT (stream->pad, "Fragment download result: %d (%d) %s",
3490 stream->last_ret, http_status, gst_flow_get_name (stream->last_ret));
3492 if (ret == GST_FLOW_OK)
3495 g_mutex_lock (&stream->fragment_download_lock);
3496 if (G_UNLIKELY (stream->cancelled)) {
3497 g_mutex_unlock (&stream->fragment_download_lock);
3500 g_mutex_unlock (&stream->fragment_download_lock);
3502 /* TODO check if we are truly stopping */
3503 if (ret != GST_FLOW_CUSTOM_ERROR)
3506 last_status_code = stream->last_status_code;
3507 GST_WARNING_OBJECT (stream->pad, "Got custom error, status %u, dc %d",
3508 last_status_code, stream->download_error_count);
3510 live = gst_adaptive_demux_is_live (demux);
3511 if (!retried_once && ((last_status_code / 100 == 4 && live)
3512 || last_status_code / 100 == 5)) {
3514 /* if current position is before available start, switch to next */
3515 if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream))
3519 gint64 range_start, range_stop;
3521 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
3525 if (demux->segment.position < range_start) {
3526 GST_DEBUG_OBJECT (stream->pad, "Retrying once with next segment");
3527 stream->last_ret = GST_FLOW_OK;
3528 ret = gst_adaptive_demux_eos_handling (stream);
3529 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3530 gst_flow_get_name (ret));
3531 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3532 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3533 GST_DEBUG_OBJECT (stream->pad, "finish_fragment: %s",
3534 gst_flow_get_name (ret));
3535 if (ret == GST_FLOW_OK) {
3536 retried_once = TRUE;
3539 } else if (demux->segment.position > range_stop) {
3540 /* wait a bit to be in range, we don't have any locks at that point */
3542 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3543 if (wait_time > 0) {
3544 gint64 end_time = g_get_monotonic_time () + wait_time / GST_USECOND;
3546 GST_DEBUG_OBJECT (stream->pad,
3547 "Download waiting for %" GST_TIME_FORMAT,
3548 GST_TIME_ARGS (wait_time));
3550 GST_MANIFEST_UNLOCK (demux);
3551 g_mutex_lock (&stream->fragment_download_lock);
3552 if (G_UNLIKELY (stream->cancelled)) {
3553 g_mutex_unlock (&stream->fragment_download_lock);
3554 GST_MANIFEST_LOCK (demux);
3555 stream->last_ret = GST_FLOW_FLUSHING;
3559 g_cond_wait_until (&stream->fragment_download_cond,
3560 &stream->fragment_download_lock, end_time);
3561 if (G_UNLIKELY (stream->cancelled)) {
3562 g_mutex_unlock (&stream->fragment_download_lock);
3563 GST_MANIFEST_LOCK (demux);
3564 stream->last_ret = GST_FLOW_FLUSHING;
3567 } while (!stream->download_finished);
3568 g_mutex_unlock (&stream->fragment_download_lock);
3570 GST_MANIFEST_LOCK (demux);
3576 if (stream->download_error_count >= MAX_DOWNLOAD_ERROR_COUNT) {
3577 /* looks like there is no way of knowing when a live stream has ended
3578 * Have to assume we are falling behind and cause a manifest reload */
3579 GST_DEBUG_OBJECT (stream->pad, "Converting error of live stream to EOS");
3580 return GST_FLOW_EOS;
3582 } else if (!gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
3583 /* If this is the last fragment, consider failures EOS and not actual
3584 * errors. Due to rounding errors in the durations, the last fragment
3585 * might not actually exist */
3586 GST_DEBUG_OBJECT (stream->pad, "Converting error for last fragment to EOS");
3587 return GST_FLOW_EOS;
3589 /* retry once (same segment) for 5xx (server errors) */
3590 if (!retried_once) {
3591 retried_once = TRUE;
3592 /* wait a short time in case the server needs a bit to recover, we don't
3593 * care if we get woken up before end time. We can use sleep here since
3594 * we're already blocking and just want to wait some time. */
3595 g_usleep (100000); /* a tenth of a second */
3605 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
3606 (_("Failed to get fragment URL.")),
3607 ("An error happened when getting fragment URL"));
3608 gst_task_stop (stream->download_task);
3609 return GST_FLOW_ERROR;
3613 /* this function will take the manifest_lock and will keep it until the end.
3614 * It will release it temporarily only when going to sleep.
3615 * Every time it takes the manifest_lock, it will check for cancelled condition
3618 gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream * stream)
3620 GstAdaptiveDemux *demux = stream->demux;
3621 GstClockTime next_download = gst_adaptive_demux_get_monotonic_time (demux);
3625 GST_LOG_OBJECT (stream->pad, "download loop start");
3627 GST_MANIFEST_LOCK (demux);
3629 g_mutex_lock (&stream->fragment_download_lock);
3630 if (G_UNLIKELY (stream->cancelled)) {
3631 stream->last_ret = GST_FLOW_FLUSHING;
3632 g_mutex_unlock (&stream->fragment_download_lock);
3635 g_mutex_unlock (&stream->fragment_download_lock);
3637 /* Check if we're done with our segment */
3638 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3639 if (demux->segment.rate > 0) {
3640 if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop)
3641 && stream->segment.position >= stream->segment.stop) {
3642 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3644 gst_task_stop (stream->download_task);
3645 goto end_of_manifest;
3648 if (GST_CLOCK_TIME_IS_VALID (demux->segment.start)
3649 && stream->segment.position <= stream->segment.start) {
3650 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3652 gst_task_stop (stream->download_task);
3653 goto end_of_manifest;
3656 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3658 /* Cleanup old streams if any */
3659 if (G_UNLIKELY (demux->priv->old_streams != NULL)) {
3660 GList *old_streams = demux->priv->old_streams;
3661 demux->priv->old_streams = NULL;
3663 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams");
3664 g_list_free_full (old_streams,
3665 (GDestroyNotify) gst_adaptive_demux_stream_free);
3666 GST_DEBUG_OBJECT (stream->pad, "Cleaning up old streams (done)");
3668 /* gst_adaptive_demux_stream_free had temporarily released the manifest_lock.
3669 * Recheck the cancelled flag.
3671 g_mutex_lock (&stream->fragment_download_lock);
3672 if (G_UNLIKELY (stream->cancelled)) {
3673 stream->last_ret = GST_FLOW_FLUSHING;
3674 g_mutex_unlock (&stream->fragment_download_lock);
3677 g_mutex_unlock (&stream->fragment_download_lock);
3680 /* Restarting download, figure out new position
3681 * FIXME : Move this to a separate function ? */
3682 if (G_UNLIKELY (stream->restart_download)) {
3683 GstEvent *seg_event;
3684 GstClockTime cur, ts = 0;
3687 GST_DEBUG_OBJECT (stream->pad,
3688 "Activating stream due to reconfigure event");
3690 if (gst_pad_peer_query_position (stream->pad, GST_FORMAT_TIME, &pos)) {
3691 ts = (GstClockTime) pos;
3692 GST_DEBUG_OBJECT (demux, "Downstream position: %"
3693 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3695 /* query other pads as some faulty element in the pad's branch might
3696 * reject position queries. This should be better than using the
3697 * demux segment position that can be much ahead */
3700 for (iter = demux->streams; iter != NULL; iter = g_list_next (iter)) {
3701 GstAdaptiveDemuxStream *cur_stream =
3702 (GstAdaptiveDemuxStream *) iter->data;
3704 if (gst_pad_peer_query_position (cur_stream->pad, GST_FORMAT_TIME,
3706 ts = (GstClockTime) pos;
3707 GST_DEBUG_OBJECT (stream->pad, "Downstream position: %"
3708 GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3714 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3716 gst_segment_to_stream_time (&stream->segment, GST_FORMAT_TIME,
3717 stream->segment.position);
3719 /* we might have already pushed this data */
3722 GST_DEBUG_OBJECT (stream->pad, "Restarting stream at "
3723 "position %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
3725 if (GST_CLOCK_TIME_IS_VALID (ts)) {
3726 GstClockTime offset, period_start;
3729 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
3730 period_start = gst_adaptive_demux_get_period_start_time (demux);
3732 /* TODO check return */
3733 gst_adaptive_demux_stream_seek (demux, stream, demux->segment.rate >= 0,
3736 stream->segment.position = ts - period_start + offset;
3739 /* The stream's segment is still correct except for
3740 * the position, so let's send a new one with the
3741 * updated position */
3742 seg_event = gst_event_new_segment (&stream->segment);
3743 gst_event_set_seqnum (seg_event, demux->priv->segment_seqnum);
3744 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3746 GST_DEBUG_OBJECT (stream->pad, "Sending restart segment: %"
3747 GST_PTR_FORMAT, seg_event);
3748 gst_pad_push_event (stream->pad, seg_event);
3750 stream->discont = TRUE;
3751 stream->restart_download = FALSE;
3754 live = gst_adaptive_demux_is_live (demux);
3756 /* Get information about the fragment to download */
3757 GST_DEBUG_OBJECT (demux, "Calling update_fragment_info");
3758 ret = gst_adaptive_demux_stream_update_fragment_info (demux, stream);
3759 GST_DEBUG_OBJECT (stream->pad, "Fragment info update result: %d %s",
3760 ret, gst_flow_get_name (ret));
3761 if (ret == GST_FLOW_OK) {
3763 /* wait for live fragments to be available */
3766 gst_adaptive_demux_stream_get_fragment_waiting_time (demux, stream);
3767 if (wait_time > 0) {
3768 GstClockTime end_time =
3769 gst_adaptive_demux_get_monotonic_time (demux) + wait_time;
3771 GST_DEBUG_OBJECT (stream->pad, "Download waiting for %" GST_TIME_FORMAT,
3772 GST_TIME_ARGS (wait_time));
3774 GST_MANIFEST_UNLOCK (demux);
3776 g_mutex_lock (&stream->fragment_download_lock);
3777 if (G_UNLIKELY (stream->cancelled)) {
3778 g_mutex_unlock (&stream->fragment_download_lock);
3779 GST_MANIFEST_LOCK (demux);
3780 stream->last_ret = GST_FLOW_FLUSHING;
3783 gst_adaptive_demux_wait_until (demux->realtime_clock,
3784 &stream->fragment_download_cond, &stream->fragment_download_lock,
3786 g_mutex_unlock (&stream->fragment_download_lock);
3788 GST_DEBUG_OBJECT (stream->pad, "Download finished waiting");
3790 GST_MANIFEST_LOCK (demux);
3792 g_mutex_lock (&stream->fragment_download_lock);
3793 if (G_UNLIKELY (stream->cancelled)) {
3794 stream->last_ret = GST_FLOW_FLUSHING;
3795 g_mutex_unlock (&stream->fragment_download_lock);
3798 g_mutex_unlock (&stream->fragment_download_lock);
3802 stream->last_ret = GST_FLOW_OK;
3804 next_download = gst_adaptive_demux_get_monotonic_time (demux);
3805 ret = gst_adaptive_demux_stream_download_fragment (stream);
3807 if (ret == GST_FLOW_FLUSHING) {
3808 g_mutex_lock (&stream->fragment_download_lock);
3809 if (G_UNLIKELY (stream->cancelled)) {
3810 stream->last_ret = GST_FLOW_FLUSHING;
3811 g_mutex_unlock (&stream->fragment_download_lock);
3814 g_mutex_unlock (&stream->fragment_download_lock);
3818 stream->last_ret = ret;
3823 break; /* all is good, let's go */
3825 GST_DEBUG_OBJECT (stream->pad, "EOS, checking to stop download loop");
3827 /* we push the EOS after releasing the object lock */
3828 if (gst_adaptive_demux_is_live (demux)
3829 && (demux->segment.rate == 1.0
3830 || gst_adaptive_demux_stream_in_live_seek_range (demux,
3832 GstAdaptiveDemuxClass *demux_class =
3833 GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3835 /* this might be a fragment download error, refresh the manifest, just in case */
3836 if (!demux_class->requires_periodical_playlist_update (demux)) {
3837 ret = gst_adaptive_demux_update_manifest (demux);
3839 /* Wait only if we can ensure current manifest has been expired.
3840 * The meaning "we have next period" *WITH* EOS is that, current
3841 * period has been ended but we can continue to the next period */
3842 } else if (!gst_adaptive_demux_has_next_period (demux) &&
3843 gst_adaptive_demux_stream_wait_manifest_update (demux, stream)) {
3846 gst_task_stop (stream->download_task);
3847 if (stream->replaced) {
3851 gst_task_stop (stream->download_task);
3854 if (gst_adaptive_demux_combine_flows (demux) == GST_FLOW_EOS) {
3855 if (gst_adaptive_demux_has_next_period (demux)) {
3856 GST_DEBUG_OBJECT (stream->pad,
3857 "Next period available, not sending EOS");
3858 gst_adaptive_demux_advance_period (demux);
3864 case GST_FLOW_NOT_LINKED:
3867 gst_task_stop (stream->download_task);
3869 ret = gst_adaptive_demux_combine_flows (demux);
3870 if (ret == GST_FLOW_NOT_LINKED) {
3871 GST_ELEMENT_FLOW_ERROR (demux, ret);
3876 case GST_FLOW_FLUSHING:{
3879 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
3880 GstAdaptiveDemuxStream *other;
3883 gst_task_stop (other->download_task);
3889 if (ret <= GST_FLOW_ERROR) {
3890 gboolean is_live = gst_adaptive_demux_is_live (demux);
3891 GST_WARNING_OBJECT (demux, "Error while downloading fragment");
3892 if (++stream->download_error_count > MAX_DOWNLOAD_ERROR_COUNT) {
3893 goto download_error;
3896 g_clear_error (&stream->last_error);
3898 /* First try to update the playlist for non-live playlists
3899 * in case the URIs have changed in the meantime. But only
3900 * try it the first time, after that we're going to wait a
3901 * a bit to not flood the server */
3902 if (stream->download_error_count == 1 && !is_live) {
3903 /* TODO hlsdemux had more options to this function (boolean and err) */
3905 if (gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3906 /* Retry immediately, the playlist actually has changed */
3907 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3912 /* Wait half the fragment duration before retrying */
3913 next_download += stream->fragment.duration / 2;
3915 GST_MANIFEST_UNLOCK (demux);
3917 g_mutex_lock (&stream->fragment_download_lock);
3918 if (G_UNLIKELY (stream->cancelled)) {
3919 g_mutex_unlock (&stream->fragment_download_lock);
3920 GST_MANIFEST_LOCK (demux);
3921 stream->last_ret = GST_FLOW_FLUSHING;
3924 gst_adaptive_demux_wait_until (demux->realtime_clock,
3925 &stream->fragment_download_cond, &stream->fragment_download_lock,
3927 g_mutex_unlock (&stream->fragment_download_lock);
3929 GST_DEBUG_OBJECT (demux, "Retrying now");
3931 GST_MANIFEST_LOCK (demux);
3933 g_mutex_lock (&stream->fragment_download_lock);
3934 if (G_UNLIKELY (stream->cancelled)) {
3935 stream->last_ret = GST_FLOW_FLUSHING;
3936 g_mutex_unlock (&stream->fragment_download_lock);
3939 g_mutex_unlock (&stream->fragment_download_lock);
3941 /* Refetch the playlist now after we waited */
3943 && gst_adaptive_demux_update_manifest (demux) == GST_FLOW_OK) {
3944 GST_DEBUG_OBJECT (demux, "Updated the playlist");
3952 if (G_UNLIKELY (ret == GST_FLOW_EOS)) {
3953 if (GST_OBJECT_PARENT (stream->pad) != NULL) {
3954 if (demux->next_streams == NULL && demux->prepared_streams == NULL) {
3955 GST_DEBUG_OBJECT (stream->src, "Pushing EOS on pad");
3956 gst_adaptive_demux_stream_push_event (stream, gst_event_new_eos ());
3958 GST_DEBUG_OBJECT (stream->src,
3959 "Stream is EOS, but we're switching fragments. Not sending.");
3962 GST_ERROR_OBJECT (demux, "Can't push EOS on non-exposed pad");
3963 goto download_error;
3968 GST_MANIFEST_UNLOCK (demux);
3969 GST_LOG_OBJECT (stream->pad, "download loop end");
3974 GST_DEBUG_OBJECT (stream->pad, "Stream has been cancelled");
3981 if (stream->last_error) {
3982 gchar *debug = g_strdup_printf ("Error on stream %s:%s",
3983 GST_DEBUG_PAD_NAME (stream->pad));
3985 gst_message_new_error (GST_OBJECT_CAST (demux), stream->last_error,
3987 GST_ERROR_OBJECT (stream->pad, "Download error: %s",
3988 stream->last_error->message);
3992 g_error_new (GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_NOT_FOUND,
3993 _("Couldn't download fragments"));
3995 gst_message_new_error (GST_OBJECT_CAST (demux), err,
3996 "Fragment downloading has failed consecutive times");
3998 GST_ERROR_OBJECT (stream->pad,
3999 "Download error: Couldn't download fragments, too many failures");
4002 gst_task_stop (stream->download_task);
4004 GstElement *src = stream->src;
4007 GST_MANIFEST_UNLOCK (demux);
4008 gst_element_set_locked_state (src, TRUE);
4009 gst_element_set_state (src, GST_STATE_NULL);
4010 gst_bin_remove (GST_BIN_CAST (demux), src);
4011 GST_MANIFEST_LOCK (demux);
4014 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
4021 gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux)
4023 GstClockTime next_update;
4024 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4026 /* Loop for updating of the playlist. This periodically checks if
4027 * the playlist is updated and does so, then signals the streaming
4028 * thread in case it can continue downloading now. */
4030 /* block until the next scheduled update or the signal to quit this thread */
4031 GST_DEBUG_OBJECT (demux, "Started updates task");
4033 GST_MANIFEST_LOCK (demux);
4036 gst_adaptive_demux_get_monotonic_time (demux) +
4037 klass->get_manifest_update_interval (demux) * GST_USECOND;
4039 /* Updating playlist only needed for live playlists */
4040 while (gst_adaptive_demux_is_live (demux)) {
4041 GstFlowReturn ret = GST_FLOW_OK;
4043 /* Wait here until we should do the next update or we're cancelled */
4044 GST_DEBUG_OBJECT (demux, "Wait for next playlist update");
4046 GST_MANIFEST_UNLOCK (demux);
4048 g_mutex_lock (&demux->priv->updates_timed_lock);
4049 if (demux->priv->stop_updates_task) {
4050 g_mutex_unlock (&demux->priv->updates_timed_lock);
4053 gst_adaptive_demux_wait_until (demux->realtime_clock,
4054 &demux->priv->updates_timed_cond,
4055 &demux->priv->updates_timed_lock, next_update);
4056 g_mutex_unlock (&demux->priv->updates_timed_lock);
4058 g_mutex_lock (&demux->priv->updates_timed_lock);
4059 if (demux->priv->stop_updates_task) {
4060 g_mutex_unlock (&demux->priv->updates_timed_lock);
4063 g_mutex_unlock (&demux->priv->updates_timed_lock);
4065 GST_MANIFEST_LOCK (demux);
4067 GST_DEBUG_OBJECT (demux, "Updating playlist");
4069 ret = gst_adaptive_demux_update_manifest (demux);
4071 if (ret == GST_FLOW_EOS) {
4072 } else if (ret != GST_FLOW_OK) {
4073 /* update_failed_count is used only here, no need to protect it */
4074 demux->priv->update_failed_count++;
4075 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
4076 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
4077 gst_flow_get_name (ret));
4078 next_update = gst_adaptive_demux_get_monotonic_time (demux)
4079 + klass->get_manifest_update_interval (demux) * GST_USECOND;
4081 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
4082 (_("Internal data stream error.")), ("Could not update playlist"));
4083 GST_DEBUG_OBJECT (demux, "Stopped updates task because of error");
4084 gst_task_stop (demux->priv->updates_task);
4085 GST_MANIFEST_UNLOCK (demux);
4089 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
4090 demux->priv->update_failed_count = 0;
4092 gst_adaptive_demux_get_monotonic_time (demux) +
4093 klass->get_manifest_update_interval (demux) * GST_USECOND;
4095 /* Wake up download tasks */
4096 g_mutex_lock (&demux->priv->manifest_update_lock);
4097 g_cond_broadcast (&demux->priv->manifest_cond);
4098 g_mutex_unlock (&demux->priv->manifest_update_lock);
4102 GST_MANIFEST_UNLOCK (demux);
4106 GST_DEBUG_OBJECT (demux, "Stop updates task request detected.");
4115 /* must be called with manifest_lock taken */
4117 gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
4122 GstAdaptiveDemux *demux = stream->demux;
4124 if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
4128 pad = gst_object_ref (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream));
4130 /* Can't push events holding the manifest lock */
4131 GST_MANIFEST_UNLOCK (demux);
4133 GST_DEBUG_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
4134 "Pushing event %" GST_PTR_FORMAT, event);
4136 ret = gst_pad_push_event (pad, event);
4138 gst_object_unref (pad);
4140 GST_MANIFEST_LOCK (demux);
4145 /* must be called with manifest_lock taken */
4147 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
4149 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4152 return klass->is_live (demux);
4156 /* must be called with manifest_lock taken */
4157 static GstFlowReturn
4158 gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
4159 GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
4160 GstClockTime ts, GstClockTime * final_ts)
4162 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4164 if (klass->stream_seek)
4165 return klass->stream_seek (stream, forward, flags, ts, final_ts);
4166 return GST_FLOW_ERROR;
4169 /* must be called with manifest_lock taken */
4171 gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux * demux,
4172 GstAdaptiveDemuxStream * stream)
4174 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4175 gboolean ret = TRUE;
4177 if (klass->stream_has_next_fragment)
4178 ret = klass->stream_has_next_fragment (stream);
4183 /* must be called with manifest_lock taken */
4185 * the ::finish_fragment() handlers when an *actual* fragment is done
4188 gst_adaptive_demux_stream_advance_fragment (GstAdaptiveDemux * demux,
4189 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4193 if (stream->last_ret == GST_FLOW_OK) {
4195 gst_adaptive_demux_stream_advance_fragment_unlocked (demux, stream,
4198 ret = stream->last_ret;
4203 /* must be called with manifest_lock taken */
4205 gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
4206 GstAdaptiveDemuxStream * stream, GstClockTime duration)
4208 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4211 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
4213 GST_LOG_OBJECT (stream->pad,
4214 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4215 GST_TIME_ARGS (stream->fragment.timestamp), GST_TIME_ARGS (duration));
4217 stream->download_error_count = 0;
4218 g_clear_error (&stream->last_error);
4220 /* FIXME - url has no indication of byte ranges for subsegments */
4221 /* FIXME : All those time statistics are biased, since they are calculated
4222 * *AFTER* the queue2, which might be blocking. They should ideally be
4223 * calculated *before* queue2 in the uri_handler_probe */
4224 gst_element_post_message (GST_ELEMENT_CAST (demux),
4225 gst_message_new_element (GST_OBJECT_CAST (demux),
4226 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
4227 "manifest-uri", G_TYPE_STRING,
4228 demux->manifest_uri, "uri", G_TYPE_STRING,
4229 stream->fragment.uri, "fragment-start-time",
4230 GST_TYPE_CLOCK_TIME, stream->download_start_time,
4231 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
4232 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
4233 stream->download_total_bytes, "fragment-download-time",
4234 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
4236 /* Don't update to the end of the segment if in reverse playback */
4237 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4238 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
4239 GstClockTime offset =
4240 gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
4241 GstClockTime period_start =
4242 gst_adaptive_demux_get_period_start_time (demux);
4244 stream->segment.position += duration;
4246 /* Convert from position inside the stream's segment to the demuxer's
4247 * segment, they are not necessarily the same */
4248 if (stream->segment.position - offset + period_start >
4249 demux->segment.position)
4250 demux->segment.position =
4251 stream->segment.position - offset + period_start;
4253 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
4255 /* When advancing with a non 1.0 rate on live streams, we need to check
4256 * the live seeking range again to make sure we can still advance to
4258 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
4259 if (!gst_adaptive_demux_stream_in_live_seek_range (demux, stream))
4262 ret = klass->stream_advance_fragment (stream);
4263 } else if (gst_adaptive_demux_is_live (demux)
4264 || gst_adaptive_demux_stream_has_next_fragment (demux, stream)) {
4265 ret = klass->stream_advance_fragment (stream);
4270 stream->download_start_time =
4271 GST_TIME_AS_USECONDS (gst_adaptive_demux_get_monotonic_time (demux));
4273 if (ret == GST_FLOW_OK) {
4274 if (gst_adaptive_demux_stream_select_bitrate (demux, stream,
4275 gst_adaptive_demux_stream_update_current_bitrate (demux, stream))) {
4276 stream->need_header = TRUE;
4277 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
4280 /* the subclass might want to switch pads */
4281 if (G_UNLIKELY (demux->next_streams)) {
4283 gboolean can_expose = TRUE;
4285 gst_task_stop (stream->download_task);
4289 for (iter = demux->streams; iter; iter = g_list_next (iter)) {
4290 /* Only expose if all streams are now cancelled or finished downloading */
4291 GstAdaptiveDemuxStream *other = iter->data;
4292 if (other != stream) {
4293 g_mutex_lock (&other->fragment_download_lock);
4294 can_expose &= (other->cancelled == TRUE
4295 || other->download_finished == TRUE);
4296 g_mutex_unlock (&other->fragment_download_lock);
4301 GST_DEBUG_OBJECT (demux, "Subclass wants new pads "
4302 "to do bitrate switching");
4303 gst_adaptive_demux_prepare_streams (demux, FALSE);
4304 gst_adaptive_demux_start_tasks (demux, TRUE);
4306 GST_LOG_OBJECT (demux, "Not switching yet - ongoing downloads");
4314 /* must be called with manifest_lock taken */
4316 gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
4317 demux, GstAdaptiveDemuxStream * stream, guint64 bitrate)
4319 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4321 if (klass->stream_select_bitrate)
4322 return klass->stream_select_bitrate (stream, bitrate);
4326 /* must be called with manifest_lock taken */
4327 static GstFlowReturn
4328 gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
4329 GstAdaptiveDemuxStream * stream)
4331 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4334 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
4337 /* Make sure the sub-class will update bitrate, or else
4339 stream->fragment.bitrate = 0;
4340 stream->fragment.finished = FALSE;
4342 GST_LOG_OBJECT (stream->pad, "position %" GST_TIME_FORMAT,
4343 GST_TIME_ARGS (stream->segment.position));
4345 ret = klass->stream_update_fragment_info (stream);
4347 GST_LOG_OBJECT (stream->pad, "ret:%s uri:%s", gst_flow_get_name (ret),
4348 stream->fragment.uri);
4349 if (ret == GST_FLOW_OK) {
4350 GST_LOG_OBJECT (stream->pad,
4351 "timestamp %" GST_TIME_FORMAT " duration:%" GST_TIME_FORMAT,
4352 GST_TIME_ARGS (stream->fragment.timestamp),
4353 GST_TIME_ARGS (stream->fragment.duration));
4354 GST_LOG_OBJECT (stream->pad,
4355 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
4356 stream->fragment.range_start, stream->fragment.range_end);
4362 /* must be called with manifest_lock taken */
4364 gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux *
4365 demux, GstAdaptiveDemuxStream * stream)
4367 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4369 if (klass->stream_get_fragment_waiting_time)
4370 return klass->stream_get_fragment_waiting_time (stream);
4374 /* must be called with manifest_lock taken */
4375 static GstFlowReturn
4376 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4378 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4379 GstFragment *download;
4382 GError *error = NULL;
4384 download = gst_uri_downloader_fetch_uri (demux->downloader,
4385 demux->manifest_uri, NULL, TRUE, TRUE, TRUE, &error);
4387 g_free (demux->manifest_uri);
4388 g_free (demux->manifest_base_uri);
4389 if (download->redirect_permanent && download->redirect_uri) {
4390 demux->manifest_uri = g_strdup (download->redirect_uri);
4391 demux->manifest_base_uri = NULL;
4393 demux->manifest_uri = g_strdup (download->uri);
4394 demux->manifest_base_uri = g_strdup (download->redirect_uri);
4397 buffer = gst_fragment_get_buffer (download);
4398 g_object_unref (download);
4399 ret = klass->update_manifest_data (demux, buffer);
4400 gst_buffer_unref (buffer);
4401 /* FIXME: Should the manifest uri vars be reverted to original
4402 * values if updating fails? */
4404 GST_WARNING_OBJECT (demux, "Failed to download manifest: %s",
4406 ret = GST_FLOW_NOT_LINKED;
4408 g_clear_error (&error);
4413 /* must be called with manifest_lock taken */
4414 static GstFlowReturn
4415 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4417 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4420 ret = klass->update_manifest (demux);
4422 if (ret == GST_FLOW_OK) {
4423 GstClockTime duration;
4424 /* Send an updated duration message */
4425 duration = klass->get_duration (demux);
4426 if (duration != GST_CLOCK_TIME_NONE) {
4427 GST_DEBUG_OBJECT (demux,
4428 "Sending duration message : %" GST_TIME_FORMAT,
4429 GST_TIME_ARGS (duration));
4430 gst_element_post_message (GST_ELEMENT (demux),
4431 gst_message_new_duration_changed (GST_OBJECT (demux)));
4433 GST_DEBUG_OBJECT (demux,
4434 "Duration unknown, can not send the duration message");
4437 /* If a manifest changes it's liveness or periodic updateness, we need
4438 * to start/stop the manifest update task appropriately */
4439 /* Keep this condition in sync with the one in
4440 * gst_adaptive_demux_start_manifest_update_task()
4442 if (gst_adaptive_demux_is_live (demux) &&
4443 klass->requires_periodical_playlist_update (demux)) {
4444 gst_adaptive_demux_start_manifest_update_task (demux);
4446 gst_adaptive_demux_stop_manifest_update_task (demux);
4454 gst_adaptive_demux_stream_fragment_clear (GstAdaptiveDemuxStreamFragment * f)
4461 g_free (f->header_uri);
4462 f->header_uri = NULL;
4463 f->header_range_start = 0;
4464 f->header_range_end = -1;
4466 g_free (f->index_uri);
4467 f->index_uri = NULL;
4468 f->index_range_start = 0;
4469 f->index_range_end = -1;
4471 f->finished = FALSE;
4474 /* must be called with manifest_lock taken */
4476 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4478 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4479 gboolean ret = FALSE;
4481 if (klass->has_next_period)
4482 ret = klass->has_next_period (demux);
4483 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4487 /* must be called with manifest_lock taken */
4489 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4491 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4493 g_return_if_fail (klass->advance_period != NULL);
4495 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4496 klass->advance_period (demux);
4497 gst_adaptive_demux_prepare_streams (demux, FALSE);
4498 gst_adaptive_demux_start_tasks (demux, TRUE);
4502 * gst_adaptive_demux_get_monotonic_time:
4503 * Returns: a monotonically increasing time, using the system realtime clock
4506 gst_adaptive_demux_get_monotonic_time (GstAdaptiveDemux * demux)
4508 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4509 return gst_clock_get_time (demux->realtime_clock);
4513 * gst_adaptive_demux_get_client_now_utc:
4514 * @demux: #GstAdaptiveDemux
4515 * Returns: the client's estimate of UTC
4517 * Used to find the client's estimate of UTC, using the system realtime clock.
4520 gst_adaptive_demux_get_client_now_utc (GstAdaptiveDemux * demux)
4522 GstClockTime rtc_now;
4523 GDateTime *unix_datetime;
4524 GDateTime *result_datetime;
4525 gint64 utc_now_in_us;
4527 rtc_now = gst_clock_get_time (demux->realtime_clock);
4528 utc_now_in_us = demux->clock_offset + GST_TIME_AS_USECONDS (rtc_now);
4530 g_date_time_new_from_unix_utc (utc_now_in_us / G_TIME_SPAN_SECOND);
4532 g_date_time_add (unix_datetime, utc_now_in_us % G_TIME_SPAN_SECOND);
4533 g_date_time_unref (unix_datetime);
4534 return result_datetime;
4538 * gst_adaptive_demux_is_running
4539 * @demux: #GstAdaptiveDemux
4540 * Returns: whether the demuxer is processing data
4542 * Returns FALSE if shutdown has started (transitioning down from
4543 * PAUSED), otherwise TRUE.
4546 gst_adaptive_demux_is_running (GstAdaptiveDemux * demux)
4548 return g_atomic_int_get (&demux->running);
4551 static GstAdaptiveDemuxTimer *
4552 gst_adaptive_demux_timer_new (GCond * cond, GMutex * mutex)
4554 GstAdaptiveDemuxTimer *timer;
4556 timer = g_slice_new (GstAdaptiveDemuxTimer);
4557 timer->fired = FALSE;
4559 timer->mutex = mutex;
4560 timer->ref_count = 1;
4564 static GstAdaptiveDemuxTimer *
4565 gst_adaptive_demux_timer_ref (GstAdaptiveDemuxTimer * timer)
4567 g_return_val_if_fail (timer != NULL, NULL);
4568 g_atomic_int_inc (&timer->ref_count);
4573 gst_adaptive_demux_timer_unref (GstAdaptiveDemuxTimer * timer)
4575 g_return_if_fail (timer != NULL);
4576 if (g_atomic_int_dec_and_test (&timer->ref_count)) {
4577 g_slice_free (GstAdaptiveDemuxTimer, timer);
4581 /* gst_adaptive_demux_wait_until:
4582 * A replacement for g_cond_wait_until that uses the clock rather
4583 * than system time to control the duration of the sleep. Typically
4584 * clock is actually a #GstSystemClock, in which case this function
4585 * behaves exactly like g_cond_wait_until. Inside unit tests,
4586 * the clock is typically a #GstTestClock, which allows tests to run
4588 * This function must be called with mutex held.
4591 gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
4592 GstClockTime end_time)
4594 GstAdaptiveDemuxTimer *timer;
4598 if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (end_time))) {
4599 /* for an invalid time, gst_clock_id_wait_async will try to call
4600 * gst_adaptive_demux_clock_callback from the current thread.
4601 * It still holds the mutex while doing that, so it will deadlock.
4602 * g_cond_wait_until would return immediately with false, so we'll do the same.
4606 timer = gst_adaptive_demux_timer_new (cond, mutex);
4607 timer->clock_id = gst_clock_new_single_shot_id (clock, end_time);
4609 gst_clock_id_wait_async (timer->clock_id,
4610 gst_adaptive_demux_clock_callback, gst_adaptive_demux_timer_ref (timer),
4611 (GDestroyNotify) gst_adaptive_demux_timer_unref);
4612 /* clock does not support asynchronously wait. Assert and return */
4613 if (res == GST_CLOCK_UNSUPPORTED) {
4614 gst_clock_id_unref (timer->clock_id);
4615 gst_adaptive_demux_timer_unref (timer);
4616 g_return_val_if_reached (TRUE);
4618 g_assert (!timer->fired);
4619 /* the gst_adaptive_demux_clock_callback() will signal the
4620 * cond when the clock's single shot timer fires, or the cond will be
4621 * signalled by another thread that wants to cause this wait to finish
4622 * early (e.g. to terminate the waiting thread).
4623 * There is no need for a while loop here, because that logic is
4624 * implemented by the function calling gst_adaptive_demux_wait_until() */
4625 g_cond_wait (cond, mutex);
4626 fired = timer->fired;
4628 gst_clock_id_unschedule (timer->clock_id);
4629 gst_clock_id_unref (timer->clock_id);
4630 gst_adaptive_demux_timer_unref (timer);
4635 gst_adaptive_demux_clock_callback (GstClock * clock,
4636 GstClockTime time, GstClockID id, gpointer user_data)
4638 GstAdaptiveDemuxTimer *timer = (GstAdaptiveDemuxTimer *) user_data;
4639 g_return_val_if_fail (timer != NULL, FALSE);
4640 g_mutex_lock (timer->mutex);
4641 timer->fired = TRUE;
4642 g_cond_signal (timer->cond);
4643 g_mutex_unlock (timer->mutex);