3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative medias (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be download.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
112 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
122 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (10 * GST_SECOND)
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
139 PROP_CONNECTION_SPEED,
140 PROP_BANDWIDTH_TARGET_RATIO,
141 PROP_CONNECTION_BITRATE,
144 PROP_CURRENT_BANDWIDTH,
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
192 gboolean pushed_timed_data;
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212 GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224 gboolean first_and_live);
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238 gboolean stop_updates);
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244 GstAdaptiveDemux2Stream * stream);
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247 GstAdaptiveDemux2Stream * stream, GstClockTime duration);
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251 GstAdaptiveDemux2Stream * stream);
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258 * method to get to the padtemplates */
260 gst_adaptive_demux_ng_get_type (void)
262 static gsize type = 0;
264 if (g_once_init_enter (&type)) {
266 static const GTypeInfo info = {
267 sizeof (GstAdaptiveDemuxClass),
270 (GClassInitFunc) gst_adaptive_demux_class_init,
273 sizeof (GstAdaptiveDemux),
275 (GInstanceInitFunc) gst_adaptive_demux_init,
278 _type = g_type_register_static (GST_TYPE_BIN,
279 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
282 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
284 g_once_init_leave (&type, _type);
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
292 return (G_STRUCT_MEMBER_P (self, private_offset));
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297 const GValue * value, GParamSpec * pspec)
299 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
301 GST_OBJECT_LOCK (demux);
304 case PROP_CONNECTION_SPEED:
305 demux->connection_speed = g_value_get_uint (value) * 1000;
306 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307 demux->connection_speed);
309 case PROP_BANDWIDTH_TARGET_RATIO:
310 demux->bandwidth_target_ratio = g_value_get_float (value);
312 case PROP_MIN_BITRATE:
313 demux->min_bitrate = g_value_get_uint (value);
315 case PROP_MAX_BITRATE:
316 demux->max_bitrate = g_value_get_uint (value);
318 case PROP_CONNECTION_BITRATE:
319 demux->connection_speed = g_value_get_uint (value);
321 /* FIXME: Recalculate track and buffering levels
322 * when watermarks change? */
323 case PROP_MAX_BUFFERING_TIME:
324 demux->max_buffering_time = g_value_get_uint64 (value);
326 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
329 case PROP_BUFFERING_LOW_WATERMARK_TIME:
330 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
332 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333 demux->buffering_high_watermark_fragments = g_value_get_double (value);
335 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336 demux->buffering_low_watermark_fragments = g_value_get_double (value);
339 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
343 GST_OBJECT_UNLOCK (demux);
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348 GValue * value, GParamSpec * pspec)
350 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 GST_OBJECT_LOCK (demux);
355 case PROP_CONNECTION_SPEED:
356 g_value_set_uint (value, demux->connection_speed / 1000);
358 case PROP_BANDWIDTH_TARGET_RATIO:
359 g_value_set_float (value, demux->bandwidth_target_ratio);
361 case PROP_MIN_BITRATE:
362 g_value_set_uint (value, demux->min_bitrate);
364 case PROP_MAX_BITRATE:
365 g_value_set_uint (value, demux->max_bitrate);
367 case PROP_CONNECTION_BITRATE:
368 g_value_set_uint (value, demux->connection_speed);
370 case PROP_CURRENT_BANDWIDTH:
371 g_value_set_uint (value, demux->current_download_rate);
373 case PROP_MAX_BUFFERING_TIME:
374 g_value_set_uint64 (value, demux->max_buffering_time);
376 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
379 case PROP_BUFFERING_LOW_WATERMARK_TIME:
380 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
382 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383 g_value_set_double (value, demux->buffering_high_watermark_fragments);
385 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386 g_value_set_double (value, demux->buffering_low_watermark_fragments);
388 case PROP_CURRENT_LEVEL_TIME_VIDEO:
389 g_value_set_uint64 (value, demux->current_level_time_video);
391 case PROP_CURRENT_LEVEL_TIME_AUDIO:
392 g_value_set_uint64 (value, demux->current_level_time_audio);
395 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
399 GST_OBJECT_UNLOCK (demux);
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
405 GObjectClass *gobject_class;
406 GstElementClass *gstelement_class;
407 GstBinClass *gstbin_class;
409 gobject_class = G_OBJECT_CLASS (klass);
410 gstelement_class = GST_ELEMENT_CLASS (klass);
411 gstbin_class = GST_BIN_CLASS (klass);
413 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414 "Base Adaptive Demux (ng)");
416 parent_class = g_type_class_peek_parent (klass);
418 if (private_offset != 0)
419 g_type_class_adjust_private_offset (klass, &private_offset);
421 gobject_class->set_property = gst_adaptive_demux_set_property;
422 gobject_class->get_property = gst_adaptive_demux_get_property;
423 gobject_class->finalize = gst_adaptive_demux_finalize;
425 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426 g_param_spec_uint ("connection-speed", "Connection Speed",
427 "Network connection speed to use in kbps (0 = calculate from downloaded"
428 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432 g_param_spec_float ("bandwidth-target-ratio",
433 "Ratio of target bandwidth / available bandwidth",
434 "Limit of the available bitrate to use when switching to alternates",
435 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440 "Network connection speed to use (0 = automatic) (bits/s)",
441 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446 "Minimum bitrate to use when switching to alternates (bits/s)",
447 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452 "Maximum bitrate to use when switching to alternates (bits/s)",
453 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457 g_param_spec_uint ("current-bandwidth",
458 "Current download bandwidth (bits/s)",
459 "Report of current download bandwidth (based on arriving data) (bits/s)",
460 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463 g_param_spec_uint64 ("max-buffering-time",
464 "Buffering maximum size (ns)",
465 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_HIGH_WATERMARK_TIME,
472 g_param_spec_uint64 ("high-watermark-time",
473 "High buffering watermark size (ns)",
474 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_LOW_WATERMARK_TIME,
481 g_param_spec_uint64 ("low-watermark-time",
482 "Low buffering watermark size (ns)",
483 "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("high-watermark-fragments",
491 "High buffering watermark size (fragments)",
492 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
497 g_object_class_install_property (gobject_class,
498 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499 g_param_spec_double ("low-watermark-fragments",
500 "Low buffering watermark size (fragments)",
501 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504 G_PARAM_STATIC_STRINGS));
506 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507 g_param_spec_uint64 ("current-level-time-video",
508 "Currently buffered level of video (ns)",
509 "Currently buffered level of video track(s) (ns)",
510 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512 G_PARAM_STATIC_STRINGS));
514 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515 g_param_spec_uint64 ("current-level-time-audio",
516 "Currently buffered level of audio (ns)",
517 "Currently buffered level of audio track(s) (ns)",
518 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520 G_PARAM_STATIC_STRINGS));
522 gst_element_class_add_static_pad_template (gstelement_class,
523 &gst_adaptive_demux_audiosrc_template);
524 gst_element_class_add_static_pad_template (gstelement_class,
525 &gst_adaptive_demux_videosrc_template);
526 gst_element_class_add_static_pad_template (gstelement_class,
527 &gst_adaptive_demux_subtitlesrc_template);
530 gstelement_class->change_state = gst_adaptive_demux_change_state;
531 gstelement_class->query = gst_adaptive_demux_query;
533 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
535 klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536 klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538 klass->requires_periodical_playlist_update =
539 gst_adaptive_demux_requires_periodical_playlist_update_default;
540 klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546 GstAdaptiveDemuxClass * klass)
548 GstPadTemplate *pad_template;
550 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
552 demux->priv = gst_adaptive_demux_get_instance_private (demux);
553 demux->priv->input_adapter = gst_adapter_new ();
554 demux->realtime_clock = gst_adaptive_demux_clock_new ();
556 demux->download_helper = downloadhelper_new (demux->realtime_clock);
557 demux->priv->segment_seqnum = gst_util_seqnum_next ();
558 demux->have_group_id = FALSE;
559 demux->group_id = G_MAXUINT;
561 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562 demux->instant_rate_multiplier = 1.0;
564 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
567 g_rec_mutex_init (&demux->priv->manifest_lock);
569 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570 g_mutex_init (&demux->priv->scheduler_lock);
572 g_mutex_init (&demux->priv->api_lock);
573 g_mutex_init (&demux->priv->segment_lock);
575 g_mutex_init (&demux->priv->tracks_lock);
576 g_cond_init (&demux->priv->tracks_add);
578 g_mutex_init (&demux->priv->buffering_lock);
580 demux->priv->periods = g_queue_new ();
583 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584 g_return_if_fail (pad_template != NULL);
586 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587 gst_pad_set_event_function (demux->sinkpad,
588 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589 gst_pad_set_chain_function (demux->sinkpad,
590 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
593 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595 demux->min_bitrate = DEFAULT_MIN_BITRATE;
596 demux->max_bitrate = DEFAULT_MAX_BITRATE;
598 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601 demux->buffering_high_watermark_fragments =
602 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603 demux->buffering_low_watermark_fragments =
604 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
606 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
609 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
611 demux->priv->duration = GST_CLOCK_TIME_NONE;
613 /* Output combiner */
614 demux->priv->flowcombiner = gst_flow_combiner_new ();
617 g_rec_mutex_init (&demux->priv->output_lock);
618 demux->priv->output_task =
619 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
621 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
625 gst_adaptive_demux_finalize (GObject * object)
627 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628 GstAdaptiveDemuxPrivate *priv = demux->priv;
630 GST_DEBUG_OBJECT (object, "finalize");
632 g_object_unref (priv->input_adapter);
634 downloadhelper_free (demux->download_helper);
636 g_rec_mutex_clear (&demux->priv->manifest_lock);
637 g_mutex_clear (&demux->priv->api_lock);
638 g_mutex_clear (&demux->priv->segment_lock);
640 g_mutex_clear (&demux->priv->buffering_lock);
642 g_mutex_clear (&demux->priv->scheduler_lock);
643 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
645 /* The input period is present after a reset, clear it now */
646 if (demux->input_period)
647 gst_adaptive_demux_period_unref (demux->input_period);
649 if (demux->realtime_clock) {
650 gst_adaptive_demux_clock_unref (demux->realtime_clock);
651 demux->realtime_clock = NULL;
653 g_object_unref (priv->output_task);
654 g_rec_mutex_clear (&priv->output_lock);
656 gst_flow_combiner_free (priv->flowcombiner);
658 g_queue_free (priv->periods);
660 G_OBJECT_CLASS (parent_class)->finalize (object);
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
666 gboolean ret = FALSE;
667 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
669 ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676 GstStateChange transition)
678 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
681 switch (transition) {
682 case GST_STATE_CHANGE_NULL_TO_READY:
683 if (!gst_adaptive_demux_check_streams_aware (demux)) {
684 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685 (_("Element requires a streams-aware context.")), (NULL));
689 case GST_STATE_CHANGE_PAUSED_TO_READY:
690 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
693 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694 downloadhelper_stop (demux->download_helper);
697 demux->priv->flushing = TRUE;
698 g_cond_signal (&demux->priv->tracks_add);
699 gst_task_stop (demux->priv->output_task);
700 TRACKS_UNLOCK (demux);
702 gst_task_join (demux->priv->output_task);
704 GST_API_LOCK (demux);
705 gst_adaptive_demux_reset (demux);
706 GST_API_UNLOCK (demux);
708 case GST_STATE_CHANGE_READY_TO_PAUSED:
709 GST_API_LOCK (demux);
710 gst_adaptive_demux_reset (demux);
712 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713 if (g_atomic_int_get (&demux->priv->have_manifest))
714 gst_adaptive_demux_start_manifest_update_task (demux);
715 GST_API_UNLOCK (demux);
716 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717 GST_DEBUG_OBJECT (demux, "demuxer has started running");
718 /* gst_task_start (demux->priv->output_task); */
724 /* this must be run with the scheduler and output tasks stopped. */
725 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
727 switch (transition) {
728 case GST_STATE_CHANGE_READY_TO_PAUSED:
729 /* Start download task */
730 downloadhelper_start (demux->download_helper);
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
744 GstEvent *eos = gst_event_new_eos ();
745 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
747 /* FIXME: The slot might not have output any data, caps or segment yet */
748 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
749 gst_pad_push_event (slot->pad, eos);
750 gst_pad_set_active (slot->pad, FALSE);
751 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
752 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
754 gst_adaptive_demux_track_unref (slot->track);
755 if (slot->pending_track)
756 gst_adaptive_demux_track_unref (slot->pending_track);
758 g_slice_free (OutputSlot, slot);
762 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
763 GstStreamType streamtype)
766 GstPadTemplate *tmpl;
769 switch (streamtype) {
770 case GST_STREAM_TYPE_AUDIO:
771 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
773 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
775 case GST_STREAM_TYPE_VIDEO:
776 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
778 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
780 case GST_STREAM_TYPE_TEXT:
782 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
784 gst_static_pad_template_get
785 (&gst_adaptive_demux_subtitlesrc_template);
788 g_assert_not_reached ();
792 slot = g_slice_new0 (OutputSlot);
793 slot->type = streamtype;
794 slot->pushed_timed_data = FALSE;
796 /* Create and activate new pads */
797 slot->pad = gst_pad_new_from_template (tmpl, name);
799 gst_object_unref (tmpl);
801 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
802 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
803 gst_pad_set_active (slot->pad, TRUE);
805 gst_pad_set_query_function (slot->pad,
806 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
807 gst_pad_set_event_function (slot->pad,
808 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
810 gst_pad_set_element_private (slot->pad, slot);
812 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
813 GST_DEBUG_PAD_NAME (slot->pad));
818 * * After `process_manifest` or when a period starts
819 * * Or when all tracks have been created
821 * Goes over tracks and creates the collection
823 * Returns TRUE if the collection was fully created.
825 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
828 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
829 GstAdaptiveDemuxPeriod * period)
831 GstStreamCollection *collection;
834 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
836 if (!period->tracks_changed) {
837 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
841 if (!period->tracks) {
842 GST_WARNING_OBJECT (demux, "No tracks registered/present");
846 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
847 GST_DEBUG_OBJECT (demux,
848 "Streams still have pending tracks, not creating/updating collection");
852 /* Update collection */
853 collection = gst_stream_collection_new ("adaptivedemux");
855 for (iter = period->tracks; iter; iter = iter->next) {
856 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
858 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
859 gst_stream_collection_add_stream (collection,
860 gst_object_ref (track->stream_object));
863 if (period->collection)
864 gst_object_unref (period->collection);
865 period->collection = collection;
871 * Called for the output period:
872 * * after `update_collection()` if the input period is the same as the output period
873 * * When the output period changes
875 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
878 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
880 GstStreamCollection *collection;
881 GstAdaptiveDemuxPeriod *period = demux->output_period;
882 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
884 g_return_val_if_fail (period, FALSE);
885 if (!period->collection) {
886 GST_DEBUG_OBJECT (demux, "No collection available yet");
890 collection = period->collection;
892 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
895 /* Post collection */
896 TRACKS_UNLOCK (demux);
897 GST_MANIFEST_UNLOCK (demux);
899 gst_element_post_message (GST_ELEMENT_CAST (demux),
900 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
902 GST_MANIFEST_LOCK (demux);
905 /* If no stream selection was handled, make a default selection */
906 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
907 gst_adaptive_demux_period_select_default_tracks (demux,
908 demux->output_period);
911 /* Make sure the output task is running */
912 if (gst_adaptive_demux2_is_running (demux)) {
913 demux->priv->flushing = FALSE;
914 GST_DEBUG_OBJECT (demux, "Starting the output task");
915 gst_task_start (demux->priv->output_task);
922 handle_incoming_manifest (GstAdaptiveDemux * demux)
924 GstAdaptiveDemuxClass *demux_class;
929 GstBuffer *manifest_buffer;
931 GST_API_LOCK (demux);
932 GST_MANIFEST_LOCK (demux);
934 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
936 available = gst_adapter_available (demux->priv->input_adapter);
939 goto eos_without_data;
941 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
943 /* Need to get the URI to use it as a base to generate the fragment's
945 query = gst_query_new_uri ();
946 query_res = gst_pad_peer_query (demux->sinkpad, query);
948 gchar *uri, *redirect_uri;
951 gst_query_parse_uri (query, &uri);
952 gst_query_parse_uri_redirection (query, &redirect_uri);
953 gst_query_parse_uri_redirection_permanent (query, &permanent);
955 if (permanent && redirect_uri) {
956 demux->manifest_uri = redirect_uri;
957 demux->manifest_base_uri = NULL;
960 demux->manifest_uri = uri;
961 demux->manifest_base_uri = redirect_uri;
964 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
965 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
967 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
969 gst_query_unref (query);
971 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
972 if (!demux->have_group_id) {
973 demux->have_group_id = TRUE;
974 demux->group_id = gst_util_group_id_next ();
977 /* Let the subclass parse the manifest */
979 gst_adapter_take_buffer (demux->priv->input_adapter, available);
980 ret = demux_class->process_manifest (demux, manifest_buffer);
981 gst_buffer_unref (manifest_buffer);
983 gst_element_post_message (GST_ELEMENT_CAST (demux),
984 gst_message_new_element (GST_OBJECT_CAST (demux),
985 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
986 "manifest-uri", G_TYPE_STRING,
987 demux->manifest_uri, "uri", G_TYPE_STRING,
989 "manifest-download-start", GST_TYPE_CLOCK_TIME,
991 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
992 gst_util_get_timestamp (), NULL)));
995 goto invalid_manifest;
997 /* Streams should have been added to the input period if the manifest parsing
999 if (!demux->input_period->streams)
1002 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1004 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1005 /* Send duration message */
1006 if (!gst_adaptive_demux_is_live (demux)) {
1007 GstClockTime duration = demux_class->get_duration (demux);
1009 demux->priv->duration = duration;
1010 if (duration != GST_CLOCK_TIME_NONE) {
1011 GST_DEBUG_OBJECT (demux,
1012 "Sending duration message : %" GST_TIME_FORMAT,
1013 GST_TIME_ARGS (duration));
1014 gst_element_post_message (GST_ELEMENT (demux),
1015 gst_message_new_duration_changed (GST_OBJECT (demux)));
1017 GST_DEBUG_OBJECT (demux,
1018 "media duration unknown, can not send the duration message");
1022 TRACKS_LOCK (demux);
1023 /* New streams/tracks will have been added to the input period */
1024 /* The input period has streams, make it the active output period */
1025 /* FIXME : Factorize this into a function to make a period active */
1026 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1027 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1028 gst_adaptive_demux_post_collection (demux);
1029 TRACKS_UNLOCK (demux);
1031 gst_adaptive_demux_prepare_streams (demux,
1032 gst_adaptive_demux_is_live (demux));
1033 gst_adaptive_demux_start_tasks (demux);
1034 gst_adaptive_demux_start_manifest_update_task (demux);
1037 GST_MANIFEST_UNLOCK (demux);
1038 GST_API_UNLOCK (demux);
1045 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1053 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1054 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1055 (_("This file contains no playable streams.")),
1056 ("No known stream formats found at the Manifest"));
1063 GST_MANIFEST_UNLOCK (demux);
1064 GST_API_UNLOCK (demux);
1066 /* In most cases, this will happen if we set a wrong url in the
1067 * source element and we have received the 404 HTML response instead of
1069 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1074 struct http_headers_collector
1076 GstAdaptiveDemux *demux;
1081 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1082 const GValue * value, gpointer userdata)
1084 struct http_headers_collector *hdr_data = userdata;
1085 GstAdaptiveDemux *demux = hdr_data->demux;
1086 const gchar *field_name = g_quark_to_string (field_id);
1088 if (G_UNLIKELY (value == NULL))
1089 return TRUE; /* This should not happen */
1091 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1092 const gchar *user_agent = g_value_get_string (value);
1094 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1095 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1098 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1099 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1100 guint i = 0, prev_len = 0, total_len = 0;
1101 gchar **cookies = NULL;
1103 if (hdr_data->cookies != NULL)
1104 prev_len = g_strv_length (hdr_data->cookies);
1106 if (GST_VALUE_HOLDS_ARRAY (value)) {
1107 total_len = gst_value_array_get_size (value) + prev_len;
1108 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1110 for (i = 0; i < gst_value_array_get_size (value); i++) {
1111 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1112 g_value_get_string (gst_value_array_get_value (value, i)));
1113 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1115 } else if (G_VALUE_HOLDS_STRING (value)) {
1116 total_len = 1 + prev_len;
1117 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1119 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1120 g_value_get_string (value));
1121 cookies[0] = g_value_dup_string (value);
1123 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1124 g_quark_to_string (field_id));
1130 for (j = 0; j < prev_len; j++) {
1131 GST_DEBUG_OBJECT (demux,
1132 "Append existing cookie %s", hdr_data->cookies[j]);
1133 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1136 cookies[total_len] = NULL;
1138 g_strfreev (hdr_data->cookies);
1139 hdr_data->cookies = cookies;
1143 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1144 const gchar *referer = g_value_get_string (value);
1145 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1147 downloadhelper_set_referer (demux->download_helper, referer);
1150 /* Date header can be used to estimate server offset */
1151 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1152 const gchar *http_date = g_value_get_string (value);
1155 GstDateTime *datetime =
1156 gst_adaptive_demux_util_parse_http_head_date (http_date);
1159 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1160 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1162 GST_INFO_OBJECT (demux,
1163 "HTTP response Date %s", GST_STR_NULL (date_string));
1164 g_free (date_string);
1166 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1168 g_date_time_unref (utc_now);
1169 gst_date_time_unref (datetime);
1178 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1181 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1184 switch (event->type) {
1185 case GST_EVENT_FLUSH_STOP:{
1186 GST_API_LOCK (demux);
1187 GST_MANIFEST_LOCK (demux);
1189 gst_adaptive_demux_reset (demux);
1191 ret = gst_pad_event_default (pad, parent, event);
1193 GST_MANIFEST_UNLOCK (demux);
1194 GST_API_UNLOCK (demux);
1200 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1201 if (!handle_incoming_manifest (demux)) {
1202 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1203 return gst_pad_event_default (pad, parent, event);
1205 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1207 GST_ERROR_OBJECT (demux,
1208 "Failed to acquire scheduler to handle manifest");
1209 return gst_pad_event_default (pad, parent, event);
1211 gst_event_unref (event);
1214 case GST_EVENT_STREAM_START:
1215 if (gst_event_parse_group_id (event, &demux->group_id))
1216 demux->have_group_id = TRUE;
1218 demux->have_group_id = FALSE;
1219 /* Swallow stream-start, we'll push our own */
1220 gst_event_unref (event);
1222 case GST_EVENT_SEGMENT:
1223 /* Swallow newsegments, we'll push our own */
1224 gst_event_unref (event);
1226 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1227 const GstStructure *structure = gst_event_get_structure (event);
1228 struct http_headers_collector c = { demux, NULL };
1230 if (gst_structure_has_name (structure, "http-headers")) {
1231 if (gst_structure_has_field (structure, "request-headers")) {
1232 GstStructure *req_headers = NULL;
1233 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1234 &req_headers, NULL);
1236 gst_structure_foreach (req_headers,
1237 gst_adaptive_demux_handle_upstream_http_header, &c);
1238 gst_structure_free (req_headers);
1241 if (gst_structure_has_field (structure, "response-headers")) {
1242 GstStructure *res_headers = NULL;
1243 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1244 &res_headers, NULL);
1246 gst_structure_foreach (res_headers,
1247 gst_adaptive_demux_handle_upstream_http_header, &c);
1248 gst_structure_free (res_headers);
1253 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1261 return gst_pad_event_default (pad, parent, event);
1264 static GstFlowReturn
1265 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1268 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1270 GST_MANIFEST_LOCK (demux);
1272 gst_adapter_push (demux->priv->input_adapter, buffer);
1274 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1275 (gint) gst_adapter_available (demux->priv->input_adapter));
1277 GST_MANIFEST_UNLOCK (demux);
1282 /* Called with TRACKS_LOCK taken */
1284 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1288 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1289 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1291 gst_adaptive_demux_track_flush (track);
1292 if (gst_pad_is_active (track->sinkpad)) {
1293 gst_pad_set_active (track->sinkpad, FALSE);
1294 gst_pad_set_active (track->sinkpad, TRUE);
1299 /* Resets all tracks to their initial state, ready to receive new data. */
1301 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1303 TRACKS_LOCK (demux);
1304 g_queue_foreach (demux->priv->periods,
1305 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1306 TRACKS_UNLOCK (demux);
1309 /* Subclasses will call this function to ensure that a new input period is
1310 * available to receive new streams and tracks */
1312 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1314 if (demux->input_period && !demux->input_period->prepared) {
1315 GST_DEBUG_OBJECT (demux, "Using existing input period");
1319 if (demux->input_period) {
1320 GST_DEBUG_OBJECT (demux, "Marking that previous period has a next one");
1321 demux->input_period->has_next_period = TRUE;
1323 GST_DEBUG_OBJECT (demux, "Setting up new period");
1325 demux->input_period = gst_adaptive_demux_period_new (demux);
1330 /* must be called with manifest_lock taken */
1332 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1334 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1337 gst_adaptive_demux_stop_tasks (demux, TRUE);
1340 klass->reset (demux);
1342 /* Disable and remove all outputs */
1343 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1344 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1345 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1347 g_list_free (demux->priv->outputs);
1348 demux->priv->outputs = NULL;
1350 g_queue_clear_full (demux->priv->periods,
1351 (GDestroyNotify) gst_adaptive_demux_period_unref);
1353 /* The output period always has an extra ref taken on it */
1354 if (demux->output_period)
1355 gst_adaptive_demux_period_unref (demux->output_period);
1356 demux->output_period = NULL;
1357 /* The input period doesn't have an extra ref taken on it */
1358 demux->input_period = NULL;
1360 gst_adaptive_demux_start_new_period (demux);
1362 g_free (demux->manifest_uri);
1363 g_free (demux->manifest_base_uri);
1364 demux->manifest_uri = NULL;
1365 demux->manifest_base_uri = NULL;
1367 gst_adapter_clear (demux->priv->input_adapter);
1368 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1370 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1371 demux->instant_rate_multiplier = 1.0;
1373 demux->priv->duration = GST_CLOCK_TIME_NONE;
1375 demux->priv->percent = -1;
1376 demux->priv->is_buffering = TRUE;
1378 demux->have_group_id = FALSE;
1379 demux->group_id = G_MAXUINT;
1380 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1382 demux->priv->global_output_position = 0;
1384 demux->priv->n_audio_streams = 0;
1385 demux->priv->n_video_streams = 0;
1386 demux->priv->n_subtitle_streams = 0;
1388 gst_flow_combiner_reset (demux->priv->flowcombiner);
1392 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1394 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1396 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1398 switch (GST_QUERY_TYPE (query)) {
1399 case GST_QUERY_BUFFERING:
1402 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1404 if (!demux->output_period) {
1405 if (format != GST_FORMAT_TIME) {
1406 GST_DEBUG_OBJECT (demux,
1407 "No period setup yet, can't answer non-TIME buffering queries");
1411 GST_DEBUG_OBJECT (demux,
1412 "No period setup yet, but still answering buffering query");
1420 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1423 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1424 static GstAdaptiveDemux2Stream *
1425 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1429 /* We only look in the streams of the input period (i.e. with active streams) */
1430 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1431 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1432 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1440 /* TRACKS_LOCK held */
1441 static GstAdaptiveDemuxTrack *
1442 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1443 GstStreamType stream_type)
1447 for (iter = stream->tracks; iter; iter = iter->next) {
1448 GstAdaptiveDemuxTrack *track = iter->data;
1450 if (track->type == stream_type)
1457 /* MANIFEST and TRACKS lock held */
1459 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1460 GstAdaptiveDemux2Stream * stream)
1464 GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1466 for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1468 GstStream *gst_stream =
1469 gst_stream_collection_get_stream (stream->stream_collection, i);
1470 GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1471 GstAdaptiveDemuxTrack *track;
1473 if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1475 track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1477 GST_DEBUG_OBJECT (stream,
1478 "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
1483 if (track->upstream_stream_id)
1484 g_free (track->upstream_stream_id);
1485 track->upstream_stream_id =
1486 g_strdup (gst_stream_get_stream_id (gst_stream));
1492 tags_have_language_info (GstTagList * tags)
1494 const gchar *language = NULL;
1499 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1502 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
1510 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1511 GstStreamCollection * collection)
1514 guint nb_audio, nb_video, nb_text;
1515 gboolean have_audio_languages = TRUE;
1516 gboolean have_text_languages = TRUE;
1518 nb_audio = nb_video = nb_text = 0;
1520 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1521 GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1522 GstTagList *tags = gst_stream_get_tags (gst_stream);
1524 GST_DEBUG_OBJECT (stream,
1525 "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1526 switch (gst_stream_get_stream_type (gst_stream)) {
1527 case GST_STREAM_TYPE_AUDIO:
1528 have_audio_languages &= tags_have_language_info (tags);
1531 case GST_STREAM_TYPE_VIDEO:
1534 case GST_STREAM_TYPE_TEXT:
1535 have_text_languages &= tags_have_language_info (tags);
1543 /* Check that we either have at most 1 of each track type, or that
1544 * we have language tags for each to tell which is which */
1546 (nb_audio > 1 && !have_audio_languages) ||
1547 (nb_text > 1 && !have_text_languages)) {
1549 ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1550 nb_audio, nb_video, nb_text);
1558 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1561 GstAdaptiveDemux2Stream *stream;
1562 GstStreamCollection *collection = NULL;
1563 gboolean pending_tracks_activated = FALSE;
1565 GST_MANIFEST_LOCK (demux);
1567 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1568 if (stream == NULL) {
1569 GST_WARNING_OBJECT (demux,
1570 "Failed to locate stream for collection message");
1574 gst_message_parse_stream_collection (msg, &collection);
1578 /* Check whether the collection is "sane" or not.
1580 * In the context of adaptive streaming, we can only handle multiplexed
1581 * content that provides at most one stream of valid types (audio, video,
1582 * text). Without this we cannot reliably match the output of this multiplex
1583 * to the various tracks.
1585 * FIXME : In the future and *IF* we encounter such streams, we could envision
1586 * supporting multiple streams of the same type if, and only if, they have
1587 * tags that allow differentiating them (ex: languages).
1589 if (!can_handle_collection (stream, collection)) {
1590 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1591 (_("Stream format can't be handled")),
1592 ("The streams provided by the multiplex are ambiguous"));
1596 /* Store the collection on the stream */
1597 gst_object_replace ((GstObject **) & stream->stream_collection,
1598 (GstObject *) collection);
1600 /* IF there are pending tracks, ask the subclass to handle that */
1601 if (stream->pending_tracks) {
1602 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1603 g_assert (demux_class->stream_update_tracks);
1604 demux_class->stream_update_tracks (demux, stream);
1605 TRACKS_LOCK (demux);
1606 stream->pending_tracks = FALSE;
1607 pending_tracks_activated = TRUE;
1608 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1609 demux->input_period == demux->output_period)
1610 gst_adaptive_demux_post_collection (demux);
1612 g_assert (stream->tracks);
1613 TRACKS_LOCK (demux);
1614 /* If we already have assigned tracks, update the pending upstream stream_id
1615 * for each of them based on the collection information. */
1616 gst_adaptive_demux2_stream_update_tracks (demux, stream);
1619 /* If we discovered pending tracks and we no longer have any, we can ensure
1620 * selected tracks are started */
1621 if (pending_tracks_activated
1622 && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1623 GList *iter = demux->input_period->streams;
1624 for (; iter; iter = iter->next) {
1625 GstAdaptiveDemux2Stream *new_stream = iter->data;
1627 /* The stream that posted this collection was already started. If a
1628 * different stream is now selected, start it */
1629 if (stream != new_stream
1630 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1631 gst_adaptive_demux2_stream_start (new_stream);
1634 TRACKS_UNLOCK (demux);
1637 GST_MANIFEST_UNLOCK (demux);
1639 gst_message_unref (msg);
1644 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1646 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1648 switch (GST_MESSAGE_TYPE (msg)) {
1649 case GST_MESSAGE_STREAM_COLLECTION:
1651 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1654 case GST_MESSAGE_ERROR:{
1655 GstAdaptiveDemux2Stream *stream = NULL;
1657 gchar *debug = NULL;
1658 gchar *new_error = NULL;
1659 const GstStructure *details = NULL;
1661 GST_MANIFEST_LOCK (demux);
1663 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1664 if (stream == NULL) {
1665 GST_WARNING_OBJECT (demux,
1666 "Failed to locate stream for errored element");
1667 GST_MANIFEST_UNLOCK (demux);
1671 gst_message_parse_error (msg, &err, &debug);
1673 GST_WARNING_OBJECT (demux,
1674 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1675 err->message, debug);
1678 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1680 g_free (err->message);
1681 err->message = new_error;
1684 gst_message_parse_error_details (msg, &details);
1686 gst_structure_get_uint (details, "http-status-code",
1687 &stream->last_status_code);
1690 /* error, but ask to retry */
1691 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1692 gst_adaptive_demux2_stream_parse_error (stream, err);
1693 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1699 GST_MANIFEST_UNLOCK (demux);
1701 gst_message_unref (msg);
1710 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1713 /* must be called with manifest_lock taken */
1715 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1716 GstAdaptiveDemux2Stream * stream)
1718 GstAdaptiveDemuxClass *klass;
1720 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1722 if (klass->get_presentation_offset == NULL)
1725 return klass->get_presentation_offset (demux, stream);
1728 /* must be called with manifest_lock taken */
1730 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1732 GstAdaptiveDemuxClass *klass;
1734 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1736 if (klass->get_period_start_time == NULL)
1739 return klass->get_period_start_time (demux);
1742 /* must be called with manifest_lock taken */
1744 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1745 gboolean first_and_live)
1748 GstClockTime period_start;
1749 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1752 g_return_val_if_fail (demux->input_period->streams, FALSE);
1753 g_assert (demux->input_period->prepared == FALSE);
1755 new_streams = demux->input_period->streams;
1757 if (!gst_adaptive_demux2_is_running (demux)) {
1758 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1762 GST_DEBUG_OBJECT (demux,
1763 "Preparing %d streams for period %d , first_and_live:%d",
1764 g_list_length (new_streams), demux->input_period->period_num,
1767 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1768 GstAdaptiveDemux2Stream *stream = iter->data;
1770 GST_DEBUG_OBJECT (stream, "Preparing stream");
1772 stream->need_header = TRUE;
1773 stream->discont = TRUE;
1775 /* Grab the first stream time for live streams
1776 * * If the stream is selected
1777 * * Or it provides dynamic tracks (in which case we need to force an update)
1780 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1781 || stream->pending_tracks)) {
1782 /* TODO we only need the first timestamp, maybe create a simple function to
1783 * get the current PTS of a fragment ? */
1784 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1785 gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1787 GST_DEBUG_OBJECT (stream,
1788 "Got stream time %" GST_STIME_FORMAT,
1789 GST_STIME_ARGS (stream->fragment.stream_time));
1791 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1792 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1794 min_stream_time = stream->fragment.stream_time;
1799 period_start = gst_adaptive_demux_get_period_start_time (demux);
1801 /* For live streams, the subclass is supposed to seek to the current fragment
1802 * and then tell us its stream time in stream->fragment.stream_time. We now
1803 * also have to seek our demuxer segment to reflect this.
1805 * FIXME: This needs some refactoring at some point.
1807 if (first_and_live) {
1808 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1809 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1810 GST_SEEK_TYPE_NONE, -1, NULL);
1813 GST_DEBUG_OBJECT (demux,
1814 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1815 " demux segment %" GST_SEGMENT_FORMAT,
1816 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1819 /* Synchronize stream start/current positions */
1820 if (min_stream_time == GST_CLOCK_STIME_NONE)
1821 min_stream_time = period_start;
1823 min_stream_time += period_start;
1824 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1825 GstAdaptiveDemux2Stream *stream = iter->data;
1826 stream->start_position = stream->current_position = min_stream_time;
1829 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1830 GstAdaptiveDemux2Stream *stream = iter->data;
1831 stream->compute_segment = TRUE;
1832 stream->first_and_live = first_and_live;
1834 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1835 demux->input_period->prepared = TRUE;
1840 static GstAdaptiveDemuxTrack *
1841 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1845 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1846 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1847 if (!g_strcmp0 (track->stream_id, stream_id))
1854 /* TRACKS_LOCK hold */
1856 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1858 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1859 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1860 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1862 gint min_percent = -1, percent;
1863 gboolean all_eos = TRUE;
1865 /* Go over all active tracks of the output period and update level */
1867 /* Check that all tracks are above their respective low thresholds (different
1868 * tracks may have different fragment durations yielding different buffering
1869 * percentages) Overall buffering percent is the lowest. */
1870 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1871 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1873 GST_LOG_OBJECT (demux,
1874 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1875 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1876 track->stream_id, track->period_num, track->active, track->selected,
1877 track->eos, GST_TIME_ARGS (track->level_time),
1878 GST_TIME_ARGS (track->buffering_threshold));
1880 if (track->active && track->selected) {
1885 if (min_level_time == GST_CLOCK_TIME_NONE) {
1886 min_level_time = track->level_time;
1887 } else if (track->level_time < min_level_time) {
1888 min_level_time = track->level_time;
1891 if (track->type & GST_STREAM_TYPE_VIDEO
1892 && video_level_time > track->level_time)
1893 video_level_time = track->level_time;
1895 if (track->type & GST_STREAM_TYPE_AUDIO
1896 && audio_level_time > track->level_time)
1897 audio_level_time = track->level_time;
1899 if (track->level_time != GST_CLOCK_TIME_NONE
1900 && track->buffering_threshold != 0) {
1902 gst_util_uint64_scale (track->level_time, 100,
1903 track->buffering_threshold);
1904 if (min_percent < 0 || cur_percent < min_percent)
1905 min_percent = cur_percent;
1911 GST_DEBUG_OBJECT (demux,
1912 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1913 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1915 /* Update demuxer video/audio level properties */
1916 GST_OBJECT_LOCK (demux);
1917 demux->current_level_time_video = video_level_time;
1918 demux->current_level_time_audio = audio_level_time;
1919 GST_OBJECT_UNLOCK (demux);
1921 if (min_percent < 0 && !all_eos)
1924 if (min_percent > 100 || all_eos)
1927 percent = MAX (0, min_percent);
1929 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1931 if (demux->priv->is_buffering) {
1933 demux->priv->is_buffering = FALSE;
1934 if (demux->priv->percent != percent) {
1935 demux->priv->percent = percent;
1936 demux->priv->percent_changed = TRUE;
1938 } else if (percent < 1) {
1939 demux->priv->is_buffering = TRUE;
1940 if (demux->priv->percent != percent) {
1941 demux->priv->percent = percent;
1942 demux->priv->percent_changed = TRUE;
1946 if (demux->priv->percent_changed)
1947 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1948 demux->priv->is_buffering);
1951 /* With TRACKS_LOCK held */
1953 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1958 if (!demux->priv->percent_changed)
1961 BUFFERING_LOCK (demux);
1962 percent = demux->priv->percent;
1963 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1964 TRACKS_UNLOCK (demux);
1965 gst_element_post_message ((GstElement *) demux, msg);
1967 BUFFERING_UNLOCK (demux);
1968 TRACKS_LOCK (demux);
1969 if (percent == demux->priv->percent)
1970 demux->priv->percent_changed = FALSE;
1973 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1974 GstAdaptiveDemux2Stream *
1975 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1976 GstAdaptiveDemuxTrack * track)
1980 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1981 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1982 if (g_list_find (stream->tracks, track))
1989 /* Called from seek handler
1991 * This function is used when a (flushing) seek caused a new period to be activated.
1993 * This will ensure that:
1994 * * the current output period is marked as finished (EOS)
1995 * * Any potential intermediate (non-input/non-output) periods are removed
1996 * * That the new input period is prepared and ready
1999 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
2003 GST_DEBUG_OBJECT (demux,
2004 "Preparing new input period %u", demux->input_period->period_num);
2006 /* Prepare the new input period */
2007 gst_adaptive_demux_update_collection (demux, demux->input_period);
2009 /* Transfer the previous selection to the new input period */
2010 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2011 demux->output_period);
2012 gst_adaptive_demux_prepare_streams (demux, FALSE);
2014 /* Remove all periods except for the input (last) and output (first) period */
2015 while (demux->priv->periods->length > 2) {
2016 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2017 /* Mark all tracks of the removed period as not selected and EOS so they
2018 * will be skipped / ignored */
2019 for (iter = period->tracks; iter; iter = iter->next) {
2020 GstAdaptiveDemuxTrack *track = iter->data;
2021 track->selected = FALSE;
2024 gst_adaptive_demux_period_unref (period);
2027 /* Mark all tracks of the output period as EOS so that the output loop
2028 * will immediately move to the new period */
2029 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2030 GstAdaptiveDemuxTrack *track = iter->data;
2034 /* Go over all slots, and clear any pending track */
2035 for (iter = demux->priv->outputs; iter; iter = iter->next) {
2036 OutputSlot *slot = (OutputSlot *) iter->data;
2038 if (slot->pending_track != NULL) {
2039 GST_DEBUG_OBJECT (demux,
2040 "Removing track '%s' as pending from output of current track '%s'",
2041 slot->pending_track->stream_id, slot->track->stream_id);
2042 gst_adaptive_demux_track_unref (slot->pending_track);
2043 slot->pending_track = NULL;
2048 /* must be called with manifest_lock taken */
2050 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2051 gint64 * range_start, gint64 * range_stop)
2053 GstAdaptiveDemuxClass *klass;
2055 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2057 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2059 return klass->get_live_seek_range (demux, range_start, range_stop);
2062 /* must be called with manifest_lock taken */
2064 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2065 GstAdaptiveDemux2Stream * stream)
2067 gint64 range_start, range_stop;
2068 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2069 GST_LOG_OBJECT (stream,
2070 "stream position %" GST_TIME_FORMAT " live seek range %"
2071 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2072 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2073 GST_STIME_ARGS (range_stop));
2074 return (stream->current_position >= range_start
2075 && stream->current_position <= range_stop);
2081 /* must be called with manifest_lock taken */
2083 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2085 GstAdaptiveDemuxClass *klass;
2087 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2088 if (gst_adaptive_demux_is_live (demux)) {
2089 return klass->get_live_seek_range != NULL;
2092 return klass->seek != NULL;
2096 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2097 GstSeekType start_type, GstSeekType stop_type)
2101 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2102 GstAdaptiveDemux2Stream *stream = iter->data;
2104 /* Make sure the download loop clears and restarts on the next start,
2105 * which will recompute the stream segment */
2106 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2107 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2108 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2109 stream->start_position = 0;
2111 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2112 stream->start_position = demux->segment.start;
2113 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2114 stream->start_position = demux->segment.stop;
2118 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
2119 GST_SEEK_FLAG_SNAP_AFTER | \
2120 GST_SEEK_FLAG_SNAP_NEAREST | \
2121 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2122 GST_SEEK_FLAG_KEY_UNIT))
2123 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2124 GST_SEEK_FLAG_SNAP_AFTER | \
2125 GST_SEEK_FLAG_SNAP_NEAREST))
2128 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2131 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2135 GstSeekType start_type, stop_type;
2140 GstSegment oldsegment;
2141 GstEvent *flush_event;
2143 GST_INFO_OBJECT (demux, "Received seek event");
2145 GST_API_LOCK (demux);
2147 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2150 if (format != GST_FORMAT_TIME) {
2151 GST_API_UNLOCK (demux);
2152 GST_WARNING_OBJECT (demux,
2153 "Adaptive demuxers only support TIME-based seeking");
2154 gst_event_unref (event);
2158 if (flags & GST_SEEK_FLAG_SEGMENT) {
2159 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2160 GST_API_UNLOCK (demux);
2161 gst_event_unref (event);
2165 seqnum = gst_event_get_seqnum (event);
2167 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2168 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2172 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2173 /* For instant rate seeks, reply directly and update
2174 * our segment so the new rate is reflected in any future
2177 gdouble rate_multiplier;
2179 /* instant rate change only supported if direction does not change. All
2180 * other requirements are already checked before creating the seek event
2181 * but let's double-check here to be sure */
2182 if ((demux->segment.rate > 0 && rate < 0) ||
2183 (demux->segment.rate < 0 && rate > 0) ||
2184 start_type != GST_SEEK_TYPE_NONE ||
2185 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2186 GST_ERROR_OBJECT (demux,
2187 "Instant rate change seeks only supported in the "
2188 "same direction, without flushing and position change");
2189 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2190 GST_API_UNLOCK (demux);
2194 rate_multiplier = rate / demux->segment.rate;
2196 ev = gst_event_new_instant_rate_change (rate_multiplier,
2197 (GstSegmentFlags) flags);
2198 gst_event_set_seqnum (ev, seqnum);
2200 ret = gst_adaptive_demux_push_src_event (demux, ev);
2203 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2204 demux->instant_rate_multiplier = rate_multiplier;
2205 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2208 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2209 GST_API_UNLOCK (demux);
2210 gst_event_unref (event);
2215 if (!gst_adaptive_demux_can_seek (demux)) {
2216 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2218 GST_API_UNLOCK (demux);
2219 gst_event_unref (event);
2223 /* We can only accept flushing seeks from this point onward */
2224 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2225 GST_ERROR_OBJECT (demux,
2226 "Non-flushing non-instant-rate seeks are not possible");
2228 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2230 GST_API_UNLOCK (demux);
2231 gst_event_unref (event);
2235 if (gst_adaptive_demux_is_live (demux)) {
2236 gint64 range_start, range_stop;
2237 gboolean changed = FALSE;
2238 gboolean start_valid = TRUE, stop_valid = TRUE;
2240 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2242 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2243 GST_API_UNLOCK (demux);
2244 gst_event_unref (event);
2245 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2249 GST_DEBUG_OBJECT (demux,
2250 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2251 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2253 /* Handle relative positioning for live streams (relative to the range_stop) */
2254 if (start_type == GST_SEEK_TYPE_END) {
2255 start = range_stop + start;
2256 start_type = GST_SEEK_TYPE_SET;
2259 if (stop_type == GST_SEEK_TYPE_END) {
2260 stop = range_stop + stop;
2261 stop_type = GST_SEEK_TYPE_SET;
2265 /* Adjust the requested start/stop position if it falls beyond the live
2267 * The only case where we don't adjust is for the starting point of
2268 * an accurate seek (start if forward and stop if backwards)
2270 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2271 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2272 GST_DEBUG_OBJECT (demux,
2273 "seek before live stream start, setting to range start: %"
2274 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2275 start = range_start;
2278 /* truncate stop position also if set */
2279 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2280 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2281 GST_DEBUG_OBJECT (demux,
2282 "seek ending after live start, adjusting to: %"
2283 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2288 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2289 (start < range_start || start > range_stop)) {
2290 GST_WARNING_OBJECT (demux,
2291 "Seek to invalid position start:%" GST_STIME_FORMAT
2292 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2293 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2294 GST_STIME_ARGS (range_stop));
2295 start_valid = FALSE;
2297 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2298 (stop < range_start || stop > range_stop)) {
2299 GST_WARNING_OBJECT (demux,
2300 "Seek to invalid position stop:%" GST_STIME_FORMAT
2301 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2302 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2303 GST_STIME_ARGS (range_stop));
2307 /* If the seek position is still outside of the seekable range, refuse the seek */
2308 if (!start_valid || !stop_valid) {
2309 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2310 GST_API_UNLOCK (demux);
2311 gst_event_unref (event);
2315 /* Re-create seek event with changed/updated values */
2317 gst_event_unref (event);
2319 gst_event_new_seek (rate, format, flags,
2320 start_type, start, stop_type, stop);
2321 gst_event_set_seqnum (event, seqnum);
2325 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2327 /* have a backup in case seek fails */
2328 gst_segment_copy_into (&demux->segment, &oldsegment);
2330 GST_DEBUG_OBJECT (demux, "sending flush start");
2331 flush_event = gst_event_new_flush_start ();
2332 gst_event_set_seqnum (flush_event, seqnum);
2334 gst_adaptive_demux_push_src_event (demux, flush_event);
2336 gst_adaptive_demux_stop_tasks (demux, FALSE);
2337 gst_adaptive_demux_reset_tracks (demux);
2339 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2342 * Handle snap seeks as follows:
2343 * 1) do the snap seeking a (random) active stream
2344 * 2) use the final position on this stream to seek
2345 * on the other streams to the same position
2347 * We can't snap at all streams at the same time as they might end in
2348 * different positions, so just pick one and align all others to that
2351 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek) {
2352 GstAdaptiveDemux2Stream *stream = NULL;
2353 GstClockTimeDiff ts;
2354 GstSeekFlags stream_seek_flags = flags;
2357 /* snap-seek on the stream that received the event and then
2358 * use the resulting position to seek on all streams */
2361 if (start_type != GST_SEEK_TYPE_NONE)
2364 ts = gst_segment_position_from_running_time (&demux->segment,
2365 GST_FORMAT_TIME, demux->priv->global_output_position);
2366 start_type = GST_SEEK_TYPE_SET;
2369 if (stop_type != GST_SEEK_TYPE_NONE)
2372 stop_type = GST_SEEK_TYPE_SET;
2373 ts = gst_segment_position_from_running_time (&demux->segment,
2374 GST_FORMAT_TIME, demux->priv->global_output_position);
2378 /* Pick a random active stream on which to do the stream seek */
2379 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2380 GstAdaptiveDemux2Stream *cand = iter->data;
2381 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2387 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2390 /* replace event with a new one without snapping to seek on all streams */
2391 gst_event_unref (event);
2398 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2399 start_type, start, stop_type, stop);
2400 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2403 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2404 start, stop_type, stop, &update);
2407 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2409 ret = demux_class->seek (demux, event);
2413 /* Is there anything else we can do if it fails? */
2414 gst_segment_copy_into (&oldsegment, &demux->segment);
2416 demux->priv->segment_seqnum = seqnum;
2418 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2420 /* Resetting flow combiner */
2421 gst_flow_combiner_reset (demux->priv->flowcombiner);
2423 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2424 flush_event = gst_event_new_flush_stop (TRUE);
2425 gst_event_set_seqnum (flush_event, seqnum);
2426 gst_adaptive_demux_push_src_event (demux, flush_event);
2428 /* If the seek generated a new period, prepare it */
2429 if (!demux->input_period->prepared) {
2430 /* This can only happen on flushing seeks */
2431 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2432 gst_adaptive_demux_seek_to_input_period (demux);
2435 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2436 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2438 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2439 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2441 /* Reset the global output position (running time) for when the output loop restarts */
2442 demux->priv->global_output_position = 0;
2444 /* After a flushing seek, any instant-rate override is undone */
2445 demux->instant_rate_multiplier = 1.0;
2447 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2449 /* Restart the demux */
2450 gst_adaptive_demux_start_tasks (demux);
2452 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2453 GST_API_UNLOCK (demux);
2454 gst_event_unref (event);
2459 /* Returns TRUE if the stream has at least one selected track */
2461 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2466 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2467 GstAdaptiveDemuxTrack *track = tmp->data;
2469 if (track->selected)
2477 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2480 gboolean selection_handled = TRUE;
2482 GList *tracks = NULL;
2484 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2487 TRACKS_LOCK (demux);
2488 /* Validate the streams and fill:
2489 * tracks : list of tracks corresponding to requested streams
2491 for (iter = streams; iter; iter = iter->next) {
2492 gchar *stream_id = (gchar *) iter->data;
2493 GstAdaptiveDemuxTrack *track;
2495 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2496 track = find_track_for_stream_id (demux->output_period, stream_id);
2498 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2499 selection_handled = FALSE;
2500 goto select_streams_done;
2502 tracks = g_list_append (tracks, track);
2503 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2506 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2507 * SCHEDULING THREAD */
2509 /* FIXME: We want to iterate all streams, mark them as deselected,
2510 * then iterate tracks and mark any streams that have at least 1
2511 * active output track, then loop over all streams again and start/stop
2514 /* Go over all tracks present and (de)select based on current selection */
2515 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2516 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2518 if (track->selected && !g_list_find (tracks, track)) {
2519 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2520 track->stream_id, track->active);
2521 track->selected = FALSE;
2522 track->draining = TRUE;
2523 } else if (!track->selected && g_list_find (tracks, track)) {
2524 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2526 track->selected = TRUE;
2530 /* Start or stop streams based on the updated track selection */
2531 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2532 GstAdaptiveDemux2Stream *stream = iter->data;
2535 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2536 gboolean should_be_running =
2537 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2539 if (!is_running && should_be_running) {
2540 GstClockTime output_running_ts = demux->priv->global_output_position;
2541 GstClockTime start_position;
2543 /* Calculate where we should start the stream, and then
2545 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2547 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2548 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2549 GST_TIME_ARGS (output_running_ts), &demux->segment);
2552 gst_segment_position_from_running_time (&demux->segment,
2553 GST_FORMAT_TIME, output_running_ts);
2555 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2557 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2558 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2560 stream->current_position = stream->start_position = start_position;
2561 stream->compute_segment = TRUE;
2563 /* If output has already begun, ensure we seek this segment
2564 * to the correct restart position when the download loop begins */
2565 if (output_running_ts != 0)
2566 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2568 /* Activate track pads for this stream */
2569 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2570 GstAdaptiveDemuxTrack *track =
2571 (GstAdaptiveDemuxTrack *) trackiter->data;
2572 gst_pad_set_active (track->sinkpad, TRUE);
2575 gst_adaptive_demux2_stream_start (stream);
2576 } else if (is_running && !should_be_running) {
2577 /* Stream should not be running and needs stopping */
2578 gst_adaptive_demux2_stream_stop (stream);
2580 /* Set all track sinkpads to inactive for this stream */
2581 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2582 GstAdaptiveDemuxTrack *track =
2583 (GstAdaptiveDemuxTrack *) trackiter->data;
2584 gst_pad_set_active (track->sinkpad, FALSE);
2589 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2591 select_streams_done:
2592 demux_update_buffering_locked (demux);
2593 demux_post_buffering_locked (demux);
2595 TRACKS_UNLOCK (demux);
2596 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2599 g_list_free (tracks);
2600 return selection_handled;
2604 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2607 GstAdaptiveDemux *demux;
2609 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2611 switch (event->type) {
2612 case GST_EVENT_SEEK:
2614 guint32 seqnum = gst_event_get_seqnum (event);
2615 if (seqnum == demux->priv->segment_seqnum) {
2616 GST_LOG_OBJECT (pad,
2617 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2618 gst_event_unref (event);
2621 return gst_adaptive_demux_handle_seek_event (demux, event);
2623 case GST_EVENT_LATENCY:{
2624 /* Upstream and our internal source are irrelevant
2625 * for latency, and we should not fail here to
2626 * configure the latency */
2627 gst_event_unref (event);
2630 case GST_EVENT_QOS:{
2631 GstClockTimeDiff diff;
2632 GstClockTime timestamp;
2633 GstClockTime earliest_time;
2635 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2636 /* Only take into account lateness if late */
2638 earliest_time = timestamp + 2 * diff;
2640 earliest_time = timestamp;
2642 GST_OBJECT_LOCK (demux);
2643 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2644 earliest_time > demux->priv->qos_earliest_time) {
2645 demux->priv->qos_earliest_time = earliest_time;
2646 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2647 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2649 GST_OBJECT_UNLOCK (demux);
2652 case GST_EVENT_SELECT_STREAMS:
2655 gboolean selection_handled;
2657 if (GST_EVENT_SEQNUM (event) ==
2658 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2659 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2660 GST_EVENT_SEQNUM (event));
2664 gst_event_parse_select_streams (event, &streams);
2666 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2667 g_list_free_full (streams, g_free);
2668 return selection_handled;
2674 return gst_pad_event_default (pad, parent, event);
2678 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2681 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2682 gboolean ret = FALSE;
2688 switch (query->type) {
2689 case GST_QUERY_DURATION:{
2691 GstClockTime duration = GST_CLOCK_TIME_NONE;
2693 gst_query_parse_duration (query, &fmt, NULL);
2695 if (gst_adaptive_demux_is_live (demux)) {
2696 /* We are able to answer this query: the duration is unknown */
2697 gst_query_set_duration (query, fmt, -1);
2702 if (fmt == GST_FORMAT_TIME
2703 && g_atomic_int_get (&demux->priv->have_manifest)) {
2705 GST_MANIFEST_LOCK (demux);
2706 duration = demux->priv->duration;
2707 GST_MANIFEST_UNLOCK (demux);
2709 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2710 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2715 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2716 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2719 case GST_QUERY_LATENCY:{
2720 gst_query_set_latency (query, FALSE, 0, -1);
2724 case GST_QUERY_SEEKING:{
2729 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2730 GST_INFO_OBJECT (demux,
2731 "Don't have manifest yet, can't answer seeking query");
2732 return FALSE; /* can't answer without manifest */
2735 GST_MANIFEST_LOCK (demux);
2737 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2738 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2739 if (fmt == GST_FORMAT_TIME) {
2740 GstClockTime duration;
2741 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2745 if (gst_adaptive_demux_is_live (demux)) {
2746 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2748 GST_MANIFEST_UNLOCK (demux);
2749 GST_INFO_OBJECT (demux, "can't answer seeking query");
2753 duration = demux->priv->duration;
2754 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2758 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2759 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2760 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2761 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2763 GST_MANIFEST_UNLOCK (demux);
2768 GST_MANIFEST_LOCK (demux);
2770 /* TODO HLS can answer this differently it seems */
2771 if (demux->manifest_uri) {
2772 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2773 * playlist or the the uri of the last downlowaded fragment? */
2774 gst_query_set_uri (query, demux->manifest_uri);
2778 GST_MANIFEST_UNLOCK (demux);
2780 case GST_QUERY_SELECTABLE:
2781 gst_query_set_selectable (query, TRUE);
2785 /* Don't forward queries upstream because of the special nature of this
2786 * "demuxer", which relies on the upstream element only to be fed
2796 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2800 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2803 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2804 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2805 GST_SEEK_TYPE_NONE, 0);
2806 gst_adaptive_demux_handle_seek_event (demux, seek);
2811 /* Called when the scheduler starts, to kick off manifest updates
2812 * and stream downloads */
2814 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2818 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2820 iter = demux->input_period->streams;
2822 for (; iter; iter = g_list_next (iter)) {
2823 GstAdaptiveDemux2Stream *stream = iter->data;
2825 /* If we need to process this stream to discover tracks *OR* it has any
2826 * tracks which are selected, start it now */
2827 if ((stream->pending_tracks == TRUE)
2828 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2829 gst_adaptive_demux2_stream_start (stream);
2835 /* must be called with manifest_lock taken */
2837 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2839 if (!gst_adaptive_demux2_is_running (demux)) {
2840 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2844 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2845 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2846 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2848 TRACKS_LOCK (demux);
2849 demux->priv->flushing = FALSE;
2850 GST_DEBUG_OBJECT (demux, "Starting the output task");
2851 gst_task_start (demux->priv->output_task);
2852 TRACKS_UNLOCK (demux);
2855 /* must be called with manifest_lock taken */
2857 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2859 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2860 if (demux->priv->manifest_updates_cb != 0) {
2861 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2862 demux->priv->manifest_updates_cb);
2863 demux->priv->manifest_updates_cb = 0;
2867 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2869 /* must be called with manifest_lock taken */
2871 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2873 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2875 if (gst_adaptive_demux_is_live (demux)) {
2876 /* Task to periodically update the manifest */
2877 if (demux_class->requires_periodical_playlist_update (demux)) {
2878 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2879 if (demux->priv->manifest_updates_cb == 0) {
2880 demux->priv->manifest_updates_cb =
2881 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2882 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2888 /* must be called with manifest_lock taken
2889 * This function will temporarily release manifest_lock in order to join the
2891 * The api_lock will still protect it against other threads trying to modify
2892 * the demux element.
2895 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2897 GST_LOG_OBJECT (demux, "Stopping tasks");
2900 gst_adaptive_demux_stop_manifest_update_task (demux);
2902 TRACKS_LOCK (demux);
2903 demux->priv->flushing = TRUE;
2904 g_cond_signal (&demux->priv->tracks_add);
2905 gst_task_stop (demux->priv->output_task);
2906 TRACKS_UNLOCK (demux);
2908 gst_task_join (demux->priv->output_task);
2910 if (demux->input_period)
2911 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2913 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2916 /* must be called with manifest_lock taken */
2918 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2921 gboolean ret = TRUE;
2923 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2925 TRACKS_LOCK (demux);
2926 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2927 OutputSlot *slot = (OutputSlot *) iter->data;
2928 gst_event_ref (event);
2929 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2930 ret = ret & gst_pad_push_event (slot->pad, event);
2931 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2932 slot->pushed_timed_data = FALSE;
2934 TRACKS_UNLOCK (demux);
2935 gst_event_unref (event);
2939 /* must be called with manifest_lock taken */
2941 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2944 GST_DEBUG_OBJECT (stream,
2945 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2946 gst_caps_replace (&stream->pending_caps, caps);
2947 gst_caps_unref (caps);
2950 /* must be called with manifest_lock taken */
2952 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2955 GST_DEBUG_OBJECT (stream,
2956 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2957 if (stream->pending_tags) {
2958 gst_tag_list_unref (stream->pending_tags);
2960 stream->pending_tags = tags;
2963 /* must be called with manifest_lock taken */
2965 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2968 stream->pending_events = g_list_append (stream->pending_events, event);
2972 _update_average_bitrate (GstAdaptiveDemux * demux,
2973 GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2975 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2977 stream->moving_bitrate -= stream->fragment_bitrates[index];
2978 stream->fragment_bitrates[index] = new_bitrate;
2979 stream->moving_bitrate += new_bitrate;
2981 stream->moving_index += 1;
2983 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2984 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2985 return stream->moving_bitrate / stream->moving_index;
2989 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2990 GstAdaptiveDemux2Stream * stream)
2992 guint64 average_bitrate;
2993 guint64 fragment_bitrate;
2994 guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2996 fragment_bitrate = stream->last_bitrate;
2997 GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
3000 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
3002 GST_INFO_OBJECT (stream,
3003 "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
3004 GST_INFO_OBJECT (stream,
3005 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
3006 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
3008 /* Conservative approach, make sure we don't upgrade too fast */
3009 GST_OBJECT_LOCK (demux);
3010 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3012 /* If this is the/a video stream update the overall demuxer
3013 * reported bitrate and notify, to give the application a
3014 * chance to choose a new connection-bitrate */
3015 if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3016 demux->current_download_rate = stream->current_download_rate;
3017 GST_OBJECT_UNLOCK (demux);
3018 g_object_notify (G_OBJECT (demux), "current-bandwidth");
3019 GST_OBJECT_LOCK (demux);
3022 connection_speed = demux->connection_speed;
3023 min_bitrate = demux->min_bitrate;
3024 max_bitrate = demux->max_bitrate;
3025 GST_OBJECT_UNLOCK (demux);
3027 if (connection_speed) {
3028 GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3029 connection_speed / 1000);
3030 return connection_speed;
3033 /* No explicit connection_speed, so choose the new variant to use as a
3034 * fraction of the measured download rate */
3035 target_download_rate =
3036 CLAMP (stream->current_download_rate, 0,
3037 G_MAXUINT) * demux->bandwidth_target_ratio;
3039 GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3040 demux->bandwidth_target_ratio, target_download_rate);
3043 /* Debugging code, modulate the bitrate every few fragments */
3045 static guint ctr = 0;
3047 GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3048 target_download_rate /= 2;
3054 if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3055 target_download_rate = min_bitrate;
3056 GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3060 if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3061 target_download_rate = max_bitrate;
3062 GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3066 GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3067 target_download_rate);
3069 return target_download_rate;
3072 /* must be called with manifest_lock taken */
3073 static GstFlowReturn
3074 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3075 GstAdaptiveDemux2Stream * stream)
3077 /* No need to advance, this isn't a real fragment */
3078 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3081 return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3082 stream->fragment.duration);
3085 /* must be called with manifest_lock taken.
3086 * Can temporarily release manifest_lock
3088 static GstFlowReturn
3089 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3090 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3092 return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
3096 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3102 /* Called when a stream needs waking after the manifest is updated */
3104 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3106 demux->priv->stream_waiting_for_manifest = TRUE;
3110 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3112 GstFlowReturn ret = GST_FLOW_OK;
3113 gboolean schedule_again = TRUE;
3115 GST_MANIFEST_LOCK (demux);
3116 demux->priv->manifest_updates_cb = 0;
3118 /* Updating playlist only needed for live playlists */
3119 if (!gst_adaptive_demux_is_live (demux)) {
3120 GST_MANIFEST_UNLOCK (demux);
3121 return G_SOURCE_REMOVE;
3124 GST_DEBUG_OBJECT (demux, "Updating playlist");
3125 ret = gst_adaptive_demux_update_manifest (demux);
3127 if (ret == GST_FLOW_EOS) {
3128 GST_MANIFEST_UNLOCK (demux);
3129 return G_SOURCE_REMOVE;
3132 if (ret == GST_FLOW_OK) {
3133 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3134 demux->priv->update_failed_count = 0;
3136 /* Wake up download tasks */
3137 if (demux->priv->stream_waiting_for_manifest) {
3140 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3141 GstAdaptiveDemux2Stream *stream = iter->data;
3142 gst_adaptive_demux2_stream_on_manifest_update (stream);
3144 demux->priv->stream_waiting_for_manifest = FALSE;
3147 demux->priv->update_failed_count++;
3149 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3150 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3151 gst_flow_get_name (ret));
3153 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3154 (_("Internal data stream error.")), ("Could not update playlist"));
3155 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3156 schedule_again = FALSE;
3160 if (schedule_again) {
3161 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3163 demux->priv->manifest_updates_cb =
3164 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3165 klass->get_manifest_update_interval (demux) * GST_USECOND,
3166 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3169 GST_MANIFEST_UNLOCK (demux);
3171 return G_SOURCE_REMOVE;
3175 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3177 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3179 /* Loop for updating of the playlist. This periodically checks if
3180 * the playlist is updated and does so, then signals the streaming
3181 * thread in case it can continue downloading now. */
3183 /* block until the next scheduled update or the signal to quit this thread */
3184 GST_DEBUG_OBJECT (demux, "Started updates task");
3185 demux->priv->manifest_updates_cb =
3186 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3187 klass->get_manifest_update_interval (demux) * GST_USECOND,
3188 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3190 return G_SOURCE_REMOVE;
3194 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3195 GstAdaptiveDemuxTrack * track)
3199 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3200 OutputSlot *slot = (OutputSlot *) tmp->data;
3201 /* Incompatible output type */
3202 if (slot->type != track->type)
3205 /* Slot which is already assigned to this pending track */
3206 if (slot->pending_track == track)
3209 /* slot already used for another pending track */
3210 if (slot->pending_track != NULL)
3213 /* Current output track is of the same type and is draining */
3214 if (slot->track && slot->track->draining)
3221 /* TRACKS_LOCK taken */
3223 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3227 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3228 OutputSlot *slot = (OutputSlot *) tmp->data;
3230 if (slot->track == track)
3237 /* TRACKS_LOCK held */
3239 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3244 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3245 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3247 if (track->selected && !track->active)
3251 /* All selected tracks are active, created message */
3253 gst_message_new_streams_selected (GST_OBJECT (demux),
3254 demux->output_period->collection);
3255 GST_MESSAGE_SEQNUM (msg) = seqnum;
3256 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3257 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3258 if (track->active) {
3259 gst_message_streams_selected_add (msg, track->stream_object);
3267 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3270 GstAdaptiveDemuxTrack *track = slot->track;
3273 /* Send EVENT_STREAM_START */
3274 event = gst_event_new_stream_start (track->stream_id);
3275 if (demux->have_group_id)
3276 gst_event_set_group_id (event, demux->group_id);
3277 gst_event_set_stream_flags (event, track->flags);
3278 gst_event_set_stream (event, track->stream_object);
3279 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3281 gst_pad_push_event (slot->pad, event);
3283 /* Send EVENT_STREAM_COLLECTION */
3284 event = gst_event_new_stream_collection (demux->output_period->collection);
3285 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3287 gst_pad_push_event (slot->pad, event);
3289 /* Mark all sticky events for re-sending */
3290 gst_event_store_mark_all_undelivered (&track->sticky_events);
3294 * Called with TRACKS_LOCK taken
3297 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3300 guint requested_selection_seqnum;
3303 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3304 output slots vs active/draining tracks */
3305 requested_selection_seqnum =
3306 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3308 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3311 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3313 /* Go over all slots, and if they have a pending track that's no longer
3314 * selected, clear it so the slot can be reused */
3315 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3316 OutputSlot *slot = (OutputSlot *) tmp->data;
3318 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3319 GST_DEBUG_OBJECT (demux,
3320 "Removing deselected track '%s' as pending from output of current track '%s'",
3321 slot->pending_track->stream_id, slot->track->stream_id);
3322 gst_adaptive_demux_track_unref (slot->pending_track);
3323 slot->pending_track = NULL;
3327 /* Go over all tracks and create/re-assign/remove slots */
3328 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3329 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3331 if (track->selected) {
3332 OutputSlot *slot = find_slot_for_track (demux, track);
3334 /* 0. Track is selected and has a slot. Nothing to do */
3336 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3341 slot = find_replacement_slot_for_track (demux, track);
3343 /* 1. There is an existing slot of the same type which is currently
3344 * draining, assign this track as a replacement for it */
3345 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3346 if (slot->pending_track == NULL) {
3347 slot->pending_track = gst_adaptive_demux_track_ref (track);
3348 GST_DEBUG_OBJECT (demux,
3349 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3350 track->stream_id, track->period_num,
3351 slot->track->stream_id, slot->track->period_num);
3354 /* 2. There is no compatible replacement slot, create a new one */
3355 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3356 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3358 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3360 track->update_next_segment = TRUE;
3362 slot->track = gst_adaptive_demux_track_ref (track);
3363 track->active = TRUE;
3364 gst_adaptive_demux_send_initial_events (demux, slot);
3367 /* If we were draining this track, we no longer are */
3368 track->draining = FALSE;
3372 /* Finally check all slots have a current/pending track. If not remove it */
3373 for (tmp = demux->priv->outputs; tmp;) {
3374 OutputSlot *slot = (OutputSlot *) tmp->data;
3375 /* We should never has slots without target tracks */
3376 g_assert (slot->track);
3377 if (slot->track->draining && !slot->pending_track) {
3378 GstAdaptiveDemux2Stream *stream;
3380 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3381 slot->track->stream_id);
3382 slot->track->active = FALSE;
3384 /* If the stream feeding this track is stopped, flush and clear
3385 * the track now that it's going inactive. If the stream was not
3386 * found, it means we advanced past that period already (and the
3387 * stream was stopped and discarded) */
3388 stream = find_stream_for_track_locked (demux, slot->track);
3389 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3390 gst_adaptive_demux_track_flush (slot->track);
3392 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3393 gst_adaptive_demux_output_slot_free (demux, slot);
3398 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3399 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3401 TRACKS_UNLOCK (demux);
3402 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3403 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3404 TRACKS_LOCK (demux);
3408 /* TRACKS_LOCK held */
3410 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3413 GstAdaptiveDemuxPeriod *previous_period;
3414 GstStreamCollection *collection;
3416 /* Grab the next period, should be demux->periods->next->data */
3417 previous_period = g_queue_pop_head (demux->priv->periods);
3419 /* Remove ref held by demux->output_period */
3420 gst_adaptive_demux_period_unref (previous_period);
3421 demux->output_period =
3422 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3424 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3425 demux->output_period->period_num);
3427 /* We can now post the collection of the new period */
3428 collection = demux->output_period->collection;
3429 TRACKS_UNLOCK (demux);
3430 gst_element_post_message (GST_ELEMENT_CAST (demux),
3431 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3432 TRACKS_LOCK (demux);
3434 /* Unselect all tracks of the previous period */
3435 for (iter = previous_period->tracks; iter; iter = iter->next) {
3436 GstAdaptiveDemuxTrack *track = iter->data;
3437 if (track->selected) {
3438 track->selected = FALSE;
3439 track->draining = TRUE;
3443 /* Force a selection re-check */
3444 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3445 check_and_handle_selection_update_locked (demux);
3447 /* Remove the final ref on the previous period now that we have done the switch */
3448 gst_adaptive_demux_period_unref (previous_period);
3453 /* Called with TRACKS_LOCK taken */
3455 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3458 GstAdaptiveDemuxTrack *track = slot->track;
3460 gboolean pending_is_ready;
3461 GstAdaptiveDemux2Stream *stream;
3463 /* If we have a pending track for this slot, the current track should be
3464 * draining and no longer selected */
3465 g_assert (track->draining && !track->selected);
3467 /* If we're draining, check if the pending track has enough data *or* that
3468 we've already drained out entirely */
3470 (slot->pending_track->level_time >=
3471 slot->pending_track->buffering_threshold);
3472 pending_is_ready |= slot->pending_track->eos;
3474 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3475 GST_DEBUG_OBJECT (demux,
3476 "Replacement track '%s' doesn't have enough data for switching yet",
3477 slot->pending_track->stream_id);
3481 GST_DEBUG_OBJECT (demux,
3482 "Pending replacement track has enough data, switching");
3483 track->active = FALSE;
3484 track->draining = FALSE;
3486 /* If the stream feeding this track is stopped, flush and clear
3487 * the track now that it's going inactive. If the stream was not
3488 * found, it means we advanced past that period already (and the
3489 * stream was stopped and discarded) */
3490 stream = find_stream_for_track_locked (demux, track);
3491 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3492 gst_adaptive_demux_track_flush (track);
3494 gst_adaptive_demux_track_unref (track);
3495 /* We steal the reference of pending_track */
3496 track = slot->track = slot->pending_track;
3497 slot->pending_track = NULL;
3498 slot->track->active = TRUE;
3500 /* Make sure the track segment will start at the current position */
3501 track->update_next_segment = TRUE;
3503 /* Send stream start and collection, and schedule sticky events */
3504 gst_adaptive_demux_send_initial_events (demux, slot);
3506 /* Can we emit the streams-selected message now ? */
3508 all_selected_tracks_are_active (demux,
3509 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3511 TRACKS_UNLOCK (demux);
3512 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3513 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3514 TRACKS_LOCK (demux);
3520 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3523 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3524 gboolean wait_for_data = FALSE;
3525 gboolean all_tracks_empty;
3528 GST_DEBUG_OBJECT (demux, "enter");
3530 TRACKS_LOCK (demux);
3532 /* Check if stopping */
3533 if (demux->priv->flushing) {
3534 ret = GST_FLOW_FLUSHING;
3538 /* If the selection changed, handle it */
3539 check_and_handle_selection_update_locked (demux);
3543 global_output_position = GST_CLOCK_STIME_NONE;
3544 all_tracks_empty = TRUE;
3546 if (wait_for_data) {
3547 GST_DEBUG_OBJECT (demux, "Waiting for data");
3548 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3549 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3550 if (demux->priv->flushing) {
3551 ret = GST_FLOW_FLUSHING;
3554 wait_for_data = FALSE;
3557 /* Grab/Recalculate current global output position
3558 * This is the minimum pending output position of all tracks used for output
3560 * If there is a track which is empty and not EOS, wait for it to receive data
3561 * then recalculate global output position.
3563 * This also pushes downstream all non-timed data that might be present.
3565 * IF all tracks are EOS : stop task
3567 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3568 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3569 OutputSlot *slot = (OutputSlot *) tmp->data;
3570 GstAdaptiveDemuxTrack *track;
3572 /* If there is a pending track, Check if it's time to switch to it */
3573 if (slot->pending_track)
3574 handle_slot_pending_track_switch_locked (demux, slot);
3576 track = slot->track;
3578 if (!track->active) {
3579 /* Note: Edward: I can't see in what cases we would end up with inactive
3580 tracks assigned to slots. */
3581 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3582 g_assert (track->active);
3586 if (track->next_position == GST_CLOCK_STIME_NONE) {
3587 gst_adaptive_demux_track_update_next_position (track);
3590 GST_TRACE_OBJECT (demux,
3591 "Looking at track %s (period %u). next_position %" GST_STIME_FORMAT,
3592 track->stream_id, track->period_num,
3593 GST_STIME_ARGS (track->next_position));
3595 if (track->next_position != GST_CLOCK_STIME_NONE) {
3596 if (global_output_position == GST_CLOCK_STIME_NONE)
3597 global_output_position = track->next_position;
3599 global_output_position =
3600 MIN (global_output_position, track->next_position);
3601 track->waiting_add = FALSE;
3602 all_tracks_empty = FALSE;
3603 } else if (!track->eos) {
3604 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3605 track->stream_id, track->period_num);
3606 all_tracks_empty = FALSE;
3607 wait_for_data = track->waiting_add = TRUE;
3609 GST_DEBUG_OBJECT (demux,
3610 "Track %s (period %u) is EOS, not waiting for timed data",
3611 track->stream_id, track->period_num);
3613 if (gst_queue_array_get_length (track->queue) > 0) {
3614 all_tracks_empty = FALSE;
3622 if (all_tracks_empty && demux->output_period->has_next_period) {
3623 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3624 demux->output_period->period_num);
3625 if (!gst_adaptive_demux_advance_output_period (demux)) {
3626 /* Failed to move to next period, error out */
3627 ret = GST_FLOW_ERROR;
3630 /* Restart the loop */
3634 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3635 GST_STIME_ARGS (global_output_position));
3639 * We know all active tracks have pending timed data
3640 * * while track next_position <= global output position
3641 * * push pending data
3642 * * Update track next_position
3643 * * recalculate global output position
3644 * * Pop next pending data from track and update pending position
3647 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3648 OutputSlot *slot = (OutputSlot *) tmp->data;
3649 GstAdaptiveDemuxTrack *track = slot->track;
3651 GST_LOG_OBJECT (track->element,
3652 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3653 " global_output_position:%" GST_STIME_FORMAT, track->active,
3654 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3655 GST_STIME_ARGS (global_output_position));
3660 while (global_output_position == GST_CLOCK_STIME_NONE
3661 || !slot->pushed_timed_data
3662 || ((track->next_position != GST_CLOCK_STIME_NONE)
3663 && track->next_position <= global_output_position)
3664 || ((track->next_position == GST_CLOCK_STIME_NONE) && track->eos)) {
3665 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3668 GST_DEBUG_OBJECT (demux,
3669 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3670 track->stream_id, track->period_num, track->eos,
3671 slot->pushed_timed_data);
3672 /* This should only happen if the track is EOS, or exactly in between
3673 * the parser outputting segment/caps before buffers. */
3674 g_assert (track->eos || !slot->pushed_timed_data);
3676 /* If we drained the track, but there's a pending track on the slot
3677 * loop again to activate it */
3678 if (slot->pending_track) {
3679 GST_DEBUG_OBJECT (demux,
3680 "Track '%s' (period %u) drained, but has a pending track to activate",
3681 track->stream_id, track->period_num);
3687 demux_update_buffering_locked (demux);
3688 demux_post_buffering_locked (demux);
3689 TRACKS_UNLOCK (demux);
3691 GST_DEBUG_OBJECT (demux,
3692 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3693 track->period_num, mo);
3695 if (GST_IS_EVENT (mo)) {
3696 GstEvent *event = (GstEvent *) mo;
3697 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP) {
3698 slot->pushed_timed_data = TRUE;
3699 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
3700 /* If there is a pending next period, don't send the EOS */
3701 if (demux->output_period->has_next_period) {
3702 GST_LOG_OBJECT (demux,
3703 "Dropping EOS on track '%s' (period %u) before next period",
3704 track->stream_id, track->period_num);
3705 gst_event_store_mark_delivered (&track->sticky_events, event);
3706 gst_event_unref (event);
3711 if (event != NULL) {
3712 gst_pad_push_event (slot->pad, gst_event_ref (event));
3714 if (GST_EVENT_IS_STICKY (event))
3715 gst_event_store_mark_delivered (&track->sticky_events, event);
3716 gst_event_unref (event);
3718 } else if (GST_IS_BUFFER (mo)) {
3719 GstBuffer *buffer = (GstBuffer *) mo;
3721 if (track->output_discont) {
3722 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3723 buffer = gst_buffer_make_writable (buffer);
3724 GST_DEBUG_OBJECT (slot->pad,
3725 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3727 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3729 track->output_discont = FALSE;
3731 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3733 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3734 slot->pad, slot->flow_ret);
3735 GST_DEBUG_OBJECT (slot->pad,
3736 "track %s (period %u) push returned %s (combined %s)",
3737 track->stream_id, track->period_num,
3738 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3739 slot->pushed_timed_data = TRUE;
3741 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3744 TRACKS_LOCK (demux);
3745 gst_adaptive_demux_track_update_next_position (track);
3747 if (ret != GST_FLOW_OK)
3752 /* Store global output position */
3753 if (global_output_position != GST_CLOCK_STIME_NONE)
3754 demux->priv->global_output_position = global_output_position;
3756 if (global_output_position == GST_CLOCK_STIME_NONE) {
3757 if (!demux->priv->flushing) {
3758 GST_DEBUG_OBJECT (demux,
3759 "Pausing output task after reaching NONE global_output_position");
3760 gst_task_pause (demux->priv->output_task);
3764 TRACKS_UNLOCK (demux);
3765 GST_DEBUG_OBJECT (demux, "leave");
3770 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3771 /* If the flushing flag is set, then the task is being
3772 * externally stopped, so don't go to pause(), otherwise we
3773 * should so we don't keep spinning */
3774 if (!demux->priv->flushing) {
3775 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3776 gst_flow_get_name (ret));
3777 gst_task_pause (demux->priv->output_task);
3780 TRACKS_UNLOCK (demux);
3782 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3783 GstEvent *eos = gst_event_new_eos ();
3785 if (ret != GST_FLOW_EOS) {
3786 GST_ELEMENT_FLOW_ERROR (demux, ret);
3789 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3790 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3791 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3792 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3794 gst_adaptive_demux_push_src_event (demux, eos);
3801 /* must be called from the scheduler */
3803 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3805 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3808 return klass->is_live (demux);
3812 /* must be called from the scheduler */
3814 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3815 GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3816 GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3818 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3820 if (klass->stream_seek)
3821 return klass->stream_seek (stream, forward, flags, ts, final_ts);
3822 return GST_FLOW_ERROR;
3825 /* must be called from the scheduler */
3827 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3828 GstAdaptiveDemux2Stream * stream)
3830 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3831 gboolean ret = TRUE;
3833 if (klass->stream_has_next_fragment)
3834 ret = klass->stream_has_next_fragment (stream);
3839 /* must be called from the scheduler */
3841 * the ::finish_fragment() handlers when an *actual* fragment is done
3844 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3845 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3847 if (stream->last_ret != GST_FLOW_OK)
3848 return stream->last_ret;
3851 gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3854 return stream->last_ret;
3857 /* must be called with manifest_lock taken */
3858 static GstFlowReturn
3859 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3860 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3862 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3865 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3867 GST_LOG_OBJECT (stream,
3868 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3869 GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
3871 stream->download_error_count = 0;
3872 g_clear_error (&stream->last_error);
3875 /* FIXME - url has no indication of byte ranges for subsegments */
3876 /* FIXME: Reenable statistics sending? */
3877 gst_element_post_message (GST_ELEMENT_CAST (demux),
3878 gst_message_new_element (GST_OBJECT_CAST (demux),
3879 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3880 "manifest-uri", G_TYPE_STRING,
3881 demux->manifest_uri, "uri", G_TYPE_STRING,
3882 stream->fragment.uri, "fragment-start-time",
3883 GST_TYPE_CLOCK_TIME, stream->download_start_time,
3884 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3885 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3886 stream->download_total_bytes, "fragment-download-time",
3887 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3890 /* Don't update to the end of the segment if in reverse playback */
3891 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3892 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3893 stream->parse_segment.position += duration;
3894 stream->current_position += duration;
3896 GST_DEBUG_OBJECT (stream,
3897 "stream position now %" GST_TIME_FORMAT,
3898 GST_TIME_ARGS (stream->current_position));
3900 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3902 /* When advancing with a non 1.0 rate on live streams, we need to check
3903 * the live seeking range again to make sure we can still advance to
3905 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3906 if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3909 ret = klass->stream_advance_fragment (stream);
3910 } else if (gst_adaptive_demux_is_live (demux)
3911 || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3912 ret = klass->stream_advance_fragment (stream);
3917 stream->download_start_time =
3918 GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3920 if (ret == GST_FLOW_OK) {
3921 GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3922 if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3923 gst_adaptive_demux2_stream_update_current_bitrate (demux,
3925 GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3926 stream->need_header = TRUE;
3927 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3934 /* must be called with manifest_lock taken */
3936 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3937 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3939 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3941 if (klass->stream_select_bitrate)
3942 return klass->stream_select_bitrate (stream, bitrate);
3946 /* must be called with manifest_lock taken */
3948 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3949 GstAdaptiveDemux2Stream * stream)
3951 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3954 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3957 /* Make sure the sub-class will update bitrate, or else
3959 stream->fragment.finished = FALSE;
3961 GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3962 GST_TIME_ARGS (stream->current_position));
3964 ret = klass->stream_update_fragment_info (stream);
3966 GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3967 gst_flow_get_name (ret), stream->fragment.uri);
3968 if (ret == GST_FLOW_OK) {
3969 GST_LOG_OBJECT (stream,
3970 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3971 GST_STIME_ARGS (stream->fragment.stream_time),
3972 GST_TIME_ARGS (stream->fragment.duration));
3973 GST_LOG_OBJECT (stream,
3974 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3975 stream->fragment.range_start, stream->fragment.range_end);
3981 /* must be called with manifest_lock taken */
3983 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3984 demux, GstAdaptiveDemux2Stream * stream)
3986 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3988 if (klass->stream_get_fragment_waiting_time)
3989 return klass->stream_get_fragment_waiting_time (stream);
3994 handle_manifest_download_complete (DownloadRequest * request,
3995 DownloadRequestState state, GstAdaptiveDemux * demux)
3997 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3999 GstFlowReturn result;
4001 g_free (demux->manifest_base_uri);
4002 g_free (demux->manifest_uri);
4004 if (request->redirect_permanent && request->redirect_uri) {
4005 demux->manifest_uri = g_strdup (request->redirect_uri);
4006 demux->manifest_base_uri = NULL;
4008 demux->manifest_uri = g_strdup (request->uri);
4009 demux->manifest_base_uri = g_strdup (request->redirect_uri);
4012 buffer = download_request_take_buffer (request);
4014 /* We should always have a buffer since this function is the non-error
4015 * callback for the download */
4018 result = klass->update_manifest_data (demux, buffer);
4019 gst_buffer_unref (buffer);
4021 /* FIXME: Should the manifest uri vars be reverted to original
4022 * values if updating fails? */
4024 if (result == GST_FLOW_OK) {
4025 GstClockTime duration;
4026 /* Send an updated duration message */
4027 duration = klass->get_duration (demux);
4028 if (duration != GST_CLOCK_TIME_NONE) {
4029 GST_DEBUG_OBJECT (demux,
4030 "Sending duration message : %" GST_TIME_FORMAT,
4031 GST_TIME_ARGS (duration));
4032 gst_element_post_message (GST_ELEMENT (demux),
4033 gst_message_new_duration_changed (GST_OBJECT (demux)));
4035 GST_DEBUG_OBJECT (demux,
4036 "Duration unknown, can not send the duration message");
4039 /* If a manifest changes it's liveness or periodic updateness, we need
4040 * to start/stop the manifest update task appropriately */
4041 /* Keep this condition in sync with the one in
4042 * gst_adaptive_demux_start_manifest_update_task()
4044 if (gst_adaptive_demux_is_live (demux) &&
4045 klass->requires_periodical_playlist_update (demux)) {
4046 gst_adaptive_demux_start_manifest_update_task (demux);
4048 gst_adaptive_demux_stop_manifest_update_task (demux);
4054 handle_manifest_download_failure (DownloadRequest * request,
4055 DownloadRequestState state, GstAdaptiveDemux * demux)
4057 GST_FIXME_OBJECT (demux, "Manifest download failed.");
4058 /* Retry or error out here */
4061 static GstFlowReturn
4062 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4064 DownloadRequest *request;
4065 GstFlowReturn ret = GST_FLOW_OK;
4066 GError *error = NULL;
4068 request = download_request_new_uri (demux->manifest_uri);
4070 download_request_set_callbacks (request,
4071 (DownloadRequestEventCallback) handle_manifest_download_complete,
4072 (DownloadRequestEventCallback) handle_manifest_download_failure,
4075 if (!downloadhelper_submit_request (demux->download_helper, NULL,
4076 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4079 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4080 ("Failed to download manifest: %s", error->message), (NULL));
4081 g_clear_error (&error);
4083 ret = GST_FLOW_NOT_LINKED;
4089 /* must be called with manifest_lock taken */
4091 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4093 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4096 ret = klass->update_manifest (demux);
4102 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4109 g_free (f->header_uri);
4110 f->header_uri = NULL;
4111 f->header_range_start = 0;
4112 f->header_range_end = -1;
4114 g_free (f->index_uri);
4115 f->index_uri = NULL;
4116 f->index_range_start = 0;
4117 f->index_range_end = -1;
4119 f->stream_time = GST_CLOCK_STIME_NONE;
4120 f->duration = GST_CLOCK_TIME_NONE;
4121 f->finished = FALSE;
4124 /* must be called with manifest_lock taken */
4126 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4128 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4129 gboolean ret = FALSE;
4131 if (klass->has_next_period)
4132 ret = klass->has_next_period (demux);
4133 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4137 /* must be called with manifest_lock taken */
4139 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4141 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4142 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4144 g_return_if_fail (klass->advance_period != NULL);
4146 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4147 /* FIXME : no return value ? What if it fails ? */
4148 klass->advance_period (demux);
4150 if (previous_period == demux->input_period) {
4151 GST_ERROR_OBJECT (demux, "Advancing period failed");
4155 /* Stop the previous period stream tasks */
4156 gst_adaptive_demux_period_stop_tasks (previous_period);
4158 gst_adaptive_demux_update_collection (demux, demux->input_period);
4159 /* Figure out a pre-emptive selection based on the output period selection */
4160 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4161 demux->output_period);
4163 gst_adaptive_demux_prepare_streams (demux, FALSE);
4164 gst_adaptive_demux_start_tasks (demux);
4168 * gst_adaptive_demux_get_monotonic_time:
4169 * Returns: a monotonically increasing time, using the system realtime clock
4172 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4174 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4175 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
4179 * gst_adaptive_demux_get_client_now_utc:
4180 * @demux: #GstAdaptiveDemux
4181 * Returns: the client's estimate of UTC
4183 * Used to find the client's estimate of UTC, using the system realtime clock.
4186 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4188 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
4192 * gst_adaptive_demux_is_running
4193 * @demux: #GstAdaptiveDemux
4194 * Returns: whether the demuxer is processing data
4196 * Returns FALSE if shutdown has started (transitioning down from
4197 * PAUSED), otherwise TRUE.
4200 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4202 return g_atomic_int_get (&demux->running);
4206 * gst_adaptive_demux_get_qos_earliest_time:
4208 * Returns: The QOS earliest time
4213 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4215 GstClockTime earliest;
4217 GST_OBJECT_LOCK (demux);
4218 earliest = demux->priv->qos_earliest_time;
4219 GST_OBJECT_UNLOCK (demux);
4225 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4226 GstAdaptiveDemux2Stream * stream)
4228 g_return_val_if_fail (demux && stream, FALSE);
4230 /* FIXME : Migrate to parent */
4231 g_return_val_if_fail (stream->demux == NULL, FALSE);
4233 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4235 TRACKS_LOCK (demux);
4236 if (demux->input_period->prepared) {
4237 GST_ERROR_OBJECT (demux,
4238 "Attempted to add streams but no new period was created");
4239 TRACKS_UNLOCK (demux);
4242 stream->demux = demux;
4243 stream->period = demux->input_period;
4244 demux->input_period->streams =
4245 g_list_append (demux->input_period->streams, stream);
4247 if (stream->tracks) {
4249 for (iter = stream->tracks; iter; iter = iter->next)
4250 if (!gst_adaptive_demux_period_add_track (demux->input_period,
4251 (GstAdaptiveDemuxTrack *) iter->data)) {
4252 GST_ERROR_OBJECT (demux, "Failed to add track elements");
4253 TRACKS_UNLOCK (demux);
4257 TRACKS_UNLOCK (demux);
4261 /* Return the current playback rate including any instant rate multiplier */
4263 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4266 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4267 rate = demux->segment.rate * demux->instant_rate_multiplier;
4268 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);