3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative medias (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be download.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all the accesses needs to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. More, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include "gst/gst-i18n-plugin.h"
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
112 GST_DEBUG_CATEGORY (adaptivedemux2_debug);
113 #define GST_CAT_DEFAULT adaptivedemux2_debug
115 #define DEFAULT_FAILED_COUNT 3
116 #define DEFAULT_CONNECTION_BITRATE 0
117 #define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f
119 #define DEFAULT_MIN_BITRATE 0
120 #define DEFAULT_MAX_BITRATE 0
122 #define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)
124 #define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
125 #define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (3 * GST_SECOND)
126 #define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
127 #define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0
129 #define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
130 #define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0
132 #define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
133 #define GST_API_LOCK(d) g_mutex_lock (GST_API_GET_LOCK (d));
134 #define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));
139 PROP_CONNECTION_SPEED,
140 PROP_BANDWIDTH_TARGET_RATIO,
141 PROP_CONNECTION_BITRATE,
144 PROP_CURRENT_BANDWIDTH,
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
173 /* Private structure for a track being outputted */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
192 gboolean pushed_timed_data;
195 static GstBinClass *parent_class = NULL;
196 static gint private_offset = 0;
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212 GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224 gboolean first_and_live);
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238 gboolean stop_updates);
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244 GstAdaptiveDemux2Stream * stream);
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247 GstAdaptiveDemux2Stream * stream, GstClockTime duration);
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251 GstAdaptiveDemux2Stream * stream);
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258 * method to get to the padtemplates */
260 gst_adaptive_demux_ng_get_type (void)
262 static gsize type = 0;
264 if (g_once_init_enter (&type)) {
266 static const GTypeInfo info = {
267 sizeof (GstAdaptiveDemuxClass),
270 (GClassInitFunc) gst_adaptive_demux_class_init,
273 sizeof (GstAdaptiveDemux),
275 (GInstanceInitFunc) gst_adaptive_demux_init,
278 _type = g_type_register_static (GST_TYPE_BIN,
279 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
282 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
284 g_once_init_leave (&type, _type);
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
292 return (G_STRUCT_MEMBER_P (self, private_offset));
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297 const GValue * value, GParamSpec * pspec)
299 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
301 GST_OBJECT_LOCK (demux);
304 case PROP_CONNECTION_SPEED:
305 demux->connection_speed = g_value_get_uint (value) * 1000;
306 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307 demux->connection_speed);
309 case PROP_BANDWIDTH_TARGET_RATIO:
310 demux->bandwidth_target_ratio = g_value_get_float (value);
312 case PROP_MIN_BITRATE:
313 demux->min_bitrate = g_value_get_uint (value);
315 case PROP_MAX_BITRATE:
316 demux->max_bitrate = g_value_get_uint (value);
318 case PROP_CONNECTION_BITRATE:
319 demux->connection_speed = g_value_get_uint (value);
321 /* FIXME: Recalculate track and buffering levels
322 * when watermarks change? */
323 case PROP_MAX_BUFFERING_TIME:
324 demux->max_buffering_time = g_value_get_uint64 (value);
326 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
329 case PROP_BUFFERING_LOW_WATERMARK_TIME:
330 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
332 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333 demux->buffering_high_watermark_fragments = g_value_get_double (value);
335 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336 demux->buffering_low_watermark_fragments = g_value_get_double (value);
339 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
343 GST_OBJECT_UNLOCK (demux);
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348 GValue * value, GParamSpec * pspec)
350 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 GST_OBJECT_LOCK (demux);
355 case PROP_CONNECTION_SPEED:
356 g_value_set_uint (value, demux->connection_speed / 1000);
358 case PROP_BANDWIDTH_TARGET_RATIO:
359 g_value_set_float (value, demux->bandwidth_target_ratio);
361 case PROP_MIN_BITRATE:
362 g_value_set_uint (value, demux->min_bitrate);
364 case PROP_MAX_BITRATE:
365 g_value_set_uint (value, demux->max_bitrate);
367 case PROP_CONNECTION_BITRATE:
368 g_value_set_uint (value, demux->connection_speed);
370 case PROP_CURRENT_BANDWIDTH:
371 g_value_set_uint (value, demux->current_download_rate);
373 case PROP_MAX_BUFFERING_TIME:
374 g_value_set_uint64 (value, demux->max_buffering_time);
376 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
379 case PROP_BUFFERING_LOW_WATERMARK_TIME:
380 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
382 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383 g_value_set_double (value, demux->buffering_high_watermark_fragments);
385 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386 g_value_set_double (value, demux->buffering_low_watermark_fragments);
388 case PROP_CURRENT_LEVEL_TIME_VIDEO:
389 g_value_set_uint64 (value, demux->current_level_time_video);
391 case PROP_CURRENT_LEVEL_TIME_AUDIO:
392 g_value_set_uint64 (value, demux->current_level_time_audio);
395 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
399 GST_OBJECT_UNLOCK (demux);
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
405 GObjectClass *gobject_class;
406 GstElementClass *gstelement_class;
407 GstBinClass *gstbin_class;
409 gobject_class = G_OBJECT_CLASS (klass);
410 gstelement_class = GST_ELEMENT_CLASS (klass);
411 gstbin_class = GST_BIN_CLASS (klass);
413 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414 "Base Adaptive Demux (ng)");
416 parent_class = g_type_class_peek_parent (klass);
418 if (private_offset != 0)
419 g_type_class_adjust_private_offset (klass, &private_offset);
421 gobject_class->set_property = gst_adaptive_demux_set_property;
422 gobject_class->get_property = gst_adaptive_demux_get_property;
423 gobject_class->finalize = gst_adaptive_demux_finalize;
425 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426 g_param_spec_uint ("connection-speed", "Connection Speed",
427 "Network connection speed to use in kbps (0 = calculate from downloaded"
428 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432 g_param_spec_float ("bandwidth-target-ratio",
433 "Ratio of target bandwidth / available bandwidth",
434 "Limit of the available bitrate to use when switching to alternates",
435 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440 "Network connection speed to use (0 = automatic) (bits/s)",
441 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446 "Minimum bitrate to use when switching to alternates (bits/s)",
447 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452 "Maximum bitrate to use when switching to alternates (bits/s)",
453 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457 g_param_spec_uint ("current-bandwidth",
458 "Current download bandwidth (bits/s)",
459 "Report of current download bandwidth (based on arriving data) (bits/s)",
460 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463 g_param_spec_uint64 ("max-buffering-time",
464 "Buffering maximum size (ns)",
465 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_HIGH_WATERMARK_TIME,
472 g_param_spec_uint64 ("high-watermark-time",
473 "High buffering watermark size (ns)",
474 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_LOW_WATERMARK_TIME,
481 g_param_spec_uint64 ("low-watermark-time",
482 "Low buffering watermark size (ns)",
483 "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("high-watermark-fragments",
491 "High buffering watermark size (fragments)",
492 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
497 g_object_class_install_property (gobject_class,
498 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499 g_param_spec_double ("low-watermark-fragments",
500 "Low buffering watermark size (fragments)",
501 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504 G_PARAM_STATIC_STRINGS));
506 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507 g_param_spec_uint64 ("current-level-time-video",
508 "Currently buffered level of video (ns)",
509 "Currently buffered level of video track(s) (ns)",
510 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512 G_PARAM_STATIC_STRINGS));
514 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515 g_param_spec_uint64 ("current-level-time-audio",
516 "Currently buffered level of audio (ns)",
517 "Currently buffered level of audio track(s) (ns)",
518 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520 G_PARAM_STATIC_STRINGS));
522 gst_element_class_add_static_pad_template (gstelement_class,
523 &gst_adaptive_demux_audiosrc_template);
524 gst_element_class_add_static_pad_template (gstelement_class,
525 &gst_adaptive_demux_videosrc_template);
526 gst_element_class_add_static_pad_template (gstelement_class,
527 &gst_adaptive_demux_subtitlesrc_template);
530 gstelement_class->change_state = gst_adaptive_demux_change_state;
531 gstelement_class->query = gst_adaptive_demux_query;
533 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
535 klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536 klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538 klass->requires_periodical_playlist_update =
539 gst_adaptive_demux_requires_periodical_playlist_update_default;
540 klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546 GstAdaptiveDemuxClass * klass)
548 GstPadTemplate *pad_template;
550 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
552 demux->priv = gst_adaptive_demux_get_instance_private (demux);
553 demux->priv->input_adapter = gst_adapter_new ();
554 demux->realtime_clock = gst_adaptive_demux_clock_new ();
556 demux->download_helper = downloadhelper_new (demux->realtime_clock);
557 demux->priv->segment_seqnum = gst_util_seqnum_next ();
558 demux->have_group_id = FALSE;
559 demux->group_id = G_MAXUINT;
561 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562 demux->instant_rate_multiplier = 1.0;
564 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
567 g_rec_mutex_init (&demux->priv->manifest_lock);
569 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570 g_mutex_init (&demux->priv->scheduler_lock);
572 g_mutex_init (&demux->priv->api_lock);
573 g_mutex_init (&demux->priv->segment_lock);
575 g_mutex_init (&demux->priv->tracks_lock);
576 g_cond_init (&demux->priv->tracks_add);
578 g_mutex_init (&demux->priv->buffering_lock);
580 demux->priv->periods = g_queue_new ();
583 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584 g_return_if_fail (pad_template != NULL);
586 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587 gst_pad_set_event_function (demux->sinkpad,
588 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589 gst_pad_set_chain_function (demux->sinkpad,
590 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
593 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595 demux->min_bitrate = DEFAULT_MIN_BITRATE;
596 demux->max_bitrate = DEFAULT_MAX_BITRATE;
598 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601 demux->buffering_high_watermark_fragments =
602 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603 demux->buffering_low_watermark_fragments =
604 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
606 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
609 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
611 demux->priv->duration = GST_CLOCK_TIME_NONE;
613 /* Output combiner */
614 demux->priv->flowcombiner = gst_flow_combiner_new ();
617 g_rec_mutex_init (&demux->priv->output_lock);
618 demux->priv->output_task =
619 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
621 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
625 gst_adaptive_demux_finalize (GObject * object)
627 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628 GstAdaptiveDemuxPrivate *priv = demux->priv;
630 GST_DEBUG_OBJECT (object, "finalize");
632 g_object_unref (priv->input_adapter);
634 downloadhelper_free (demux->download_helper);
636 g_rec_mutex_clear (&demux->priv->manifest_lock);
637 g_mutex_clear (&demux->priv->api_lock);
638 g_mutex_clear (&demux->priv->segment_lock);
640 g_mutex_clear (&demux->priv->buffering_lock);
642 g_mutex_clear (&demux->priv->scheduler_lock);
643 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
645 /* The input period is present after a reset, clear it now */
646 if (demux->input_period)
647 gst_adaptive_demux_period_unref (demux->input_period);
649 if (demux->realtime_clock) {
650 gst_adaptive_demux_clock_unref (demux->realtime_clock);
651 demux->realtime_clock = NULL;
653 g_object_unref (priv->output_task);
654 g_rec_mutex_clear (&priv->output_lock);
656 gst_flow_combiner_free (priv->flowcombiner);
658 g_queue_free (priv->periods);
660 G_OBJECT_CLASS (parent_class)->finalize (object);
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
666 gboolean ret = FALSE;
667 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
669 ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676 GstStateChange transition)
678 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
681 switch (transition) {
682 case GST_STATE_CHANGE_NULL_TO_READY:
683 if (!gst_adaptive_demux_check_streams_aware (demux)) {
684 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685 (_("Element requires a streams-aware context.")), (NULL));
689 case GST_STATE_CHANGE_PAUSED_TO_READY:
690 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
693 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694 downloadhelper_stop (demux->download_helper);
697 demux->priv->flushing = TRUE;
698 g_cond_signal (&demux->priv->tracks_add);
699 gst_task_stop (demux->priv->output_task);
700 TRACKS_UNLOCK (demux);
702 gst_task_join (demux->priv->output_task);
704 GST_API_LOCK (demux);
705 gst_adaptive_demux_reset (demux);
706 GST_API_UNLOCK (demux);
708 case GST_STATE_CHANGE_READY_TO_PAUSED:
709 GST_API_LOCK (demux);
710 gst_adaptive_demux_reset (demux);
712 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713 if (g_atomic_int_get (&demux->priv->have_manifest))
714 gst_adaptive_demux_start_manifest_update_task (demux);
715 GST_API_UNLOCK (demux);
716 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717 GST_DEBUG_OBJECT (demux, "demuxer has started running");
718 /* gst_task_start (demux->priv->output_task); */
724 /* this must be run with the scheduler and output tasks stopped. */
725 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
727 switch (transition) {
728 case GST_STATE_CHANGE_READY_TO_PAUSED:
729 /* Start download task */
730 downloadhelper_start (demux->download_helper);
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
744 GstEvent *eos = gst_event_new_eos ();
745 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
747 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
748 gst_pad_push_event (slot->pad, eos);
749 gst_pad_set_active (slot->pad, FALSE);
750 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
751 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
753 gst_adaptive_demux_track_unref (slot->track);
754 if (slot->pending_track)
755 gst_adaptive_demux_track_unref (slot->pending_track);
757 g_slice_free (OutputSlot, slot);
761 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
762 GstStreamType streamtype)
765 GstPadTemplate *tmpl;
768 switch (streamtype) {
769 case GST_STREAM_TYPE_AUDIO:
770 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
772 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
774 case GST_STREAM_TYPE_VIDEO:
775 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
777 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
779 case GST_STREAM_TYPE_TEXT:
781 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
783 gst_static_pad_template_get
784 (&gst_adaptive_demux_subtitlesrc_template);
787 g_assert_not_reached ();
791 slot = g_slice_new0 (OutputSlot);
792 slot->type = streamtype;
793 slot->pushed_timed_data = FALSE;
795 /* Create and activate new pads */
796 slot->pad = gst_pad_new_from_template (tmpl, name);
798 gst_object_unref (tmpl);
800 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
801 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
802 gst_pad_set_active (slot->pad, TRUE);
804 gst_pad_set_query_function (slot->pad,
805 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
806 gst_pad_set_event_function (slot->pad,
807 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
809 gst_pad_set_element_private (slot->pad, slot);
811 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
812 GST_DEBUG_PAD_NAME (slot->pad));
817 * * After `process_manifest` or when a period starts
818 * * Or when all tracks have been created
820 * Goes over tracks and creates the collection
822 * Returns TRUE if the collection was fully created.
824 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
827 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
828 GstAdaptiveDemuxPeriod * period)
830 GstStreamCollection *collection;
833 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
835 if (!period->tracks_changed) {
836 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
840 if (!period->tracks) {
841 GST_WARNING_OBJECT (demux, "No tracks registered/present");
845 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
846 GST_DEBUG_OBJECT (demux,
847 "Streams still have pending tracks, not creating/updating collection");
851 /* Update collection */
852 collection = gst_stream_collection_new ("adaptivedemux");
854 for (iter = period->tracks; iter; iter = iter->next) {
855 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
857 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
858 gst_stream_collection_add_stream (collection,
859 gst_object_ref (track->stream_object));
862 if (period->collection)
863 gst_object_unref (period->collection);
864 period->collection = collection;
870 * Called for the output period:
871 * * after `update_collection()` if the input period is the same as the output period
872 * * When the output period changes
874 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
877 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
879 GstStreamCollection *collection;
880 GstAdaptiveDemuxPeriod *period = demux->output_period;
881 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
883 g_return_val_if_fail (period, FALSE);
884 if (!period->collection) {
885 GST_DEBUG_OBJECT (demux, "No collection available yet");
889 collection = period->collection;
891 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
894 /* Post collection */
895 TRACKS_UNLOCK (demux);
896 GST_MANIFEST_UNLOCK (demux);
898 gst_element_post_message (GST_ELEMENT_CAST (demux),
899 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
901 GST_MANIFEST_LOCK (demux);
904 /* If no stream selection was handled, make a default selection */
905 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
906 gst_adaptive_demux_period_select_default_tracks (demux,
907 demux->output_period);
910 /* Make sure the output task is running */
911 if (gst_adaptive_demux2_is_running (demux)) {
912 demux->priv->flushing = FALSE;
913 GST_DEBUG_OBJECT (demux, "Starting the output task");
914 gst_task_start (demux->priv->output_task);
921 handle_incoming_manifest (GstAdaptiveDemux * demux)
923 GstAdaptiveDemuxClass *demux_class;
928 GstBuffer *manifest_buffer;
930 GST_API_LOCK (demux);
931 GST_MANIFEST_LOCK (demux);
933 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
935 available = gst_adapter_available (demux->priv->input_adapter);
938 goto eos_without_data;
940 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
942 /* Need to get the URI to use it as a base to generate the fragment's
944 query = gst_query_new_uri ();
945 query_res = gst_pad_peer_query (demux->sinkpad, query);
947 gchar *uri, *redirect_uri;
950 gst_query_parse_uri (query, &uri);
951 gst_query_parse_uri_redirection (query, &redirect_uri);
952 gst_query_parse_uri_redirection_permanent (query, &permanent);
954 if (permanent && redirect_uri) {
955 demux->manifest_uri = redirect_uri;
956 demux->manifest_base_uri = NULL;
959 demux->manifest_uri = uri;
960 demux->manifest_base_uri = redirect_uri;
963 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
964 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
966 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
968 gst_query_unref (query);
970 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
971 if (!demux->have_group_id) {
972 demux->have_group_id = TRUE;
973 demux->group_id = gst_util_group_id_next ();
976 /* Let the subclass parse the manifest */
978 gst_adapter_take_buffer (demux->priv->input_adapter, available);
979 ret = demux_class->process_manifest (demux, manifest_buffer);
980 gst_buffer_unref (manifest_buffer);
982 gst_element_post_message (GST_ELEMENT_CAST (demux),
983 gst_message_new_element (GST_OBJECT_CAST (demux),
984 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
985 "manifest-uri", G_TYPE_STRING,
986 demux->manifest_uri, "uri", G_TYPE_STRING,
988 "manifest-download-start", GST_TYPE_CLOCK_TIME,
990 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
991 gst_util_get_timestamp (), NULL)));
994 goto invalid_manifest;
996 /* Streams should have been added to the input period if the manifest parsing
998 if (!demux->input_period->streams)
1001 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1003 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1004 /* Send duration message */
1005 if (!gst_adaptive_demux_is_live (demux)) {
1006 GstClockTime duration = demux_class->get_duration (demux);
1008 demux->priv->duration = duration;
1009 if (duration != GST_CLOCK_TIME_NONE) {
1010 GST_DEBUG_OBJECT (demux,
1011 "Sending duration message : %" GST_TIME_FORMAT,
1012 GST_TIME_ARGS (duration));
1013 gst_element_post_message (GST_ELEMENT (demux),
1014 gst_message_new_duration_changed (GST_OBJECT (demux)));
1016 GST_DEBUG_OBJECT (demux,
1017 "media duration unknown, can not send the duration message");
1021 TRACKS_LOCK (demux);
1022 /* New streams/tracks will have been added to the input period */
1023 /* The input period has streams, make it the active output period */
1024 /* FIXME : Factorize this into a function to make a period active */
1025 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
1026 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1027 gst_adaptive_demux_post_collection (demux);
1028 TRACKS_UNLOCK (demux);
1030 gst_adaptive_demux_prepare_streams (demux,
1031 gst_adaptive_demux_is_live (demux));
1032 gst_adaptive_demux_start_tasks (demux);
1033 gst_adaptive_demux_start_manifest_update_task (demux);
1036 GST_MANIFEST_UNLOCK (demux);
1037 GST_API_UNLOCK (demux);
1044 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1052 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1053 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1054 (_("This file contains no playable streams.")),
1055 ("No known stream formats found at the Manifest"));
1062 GST_MANIFEST_UNLOCK (demux);
1063 GST_API_UNLOCK (demux);
1065 /* In most cases, this will happen if we set a wrong url in the
1066 * source element and we have received the 404 HTML response instead of
1068 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
1073 struct http_headers_collector
1075 GstAdaptiveDemux *demux;
1080 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1081 const GValue * value, gpointer userdata)
1083 struct http_headers_collector *hdr_data = userdata;
1084 GstAdaptiveDemux *demux = hdr_data->demux;
1085 const gchar *field_name = g_quark_to_string (field_id);
1087 if (G_UNLIKELY (value == NULL))
1088 return TRUE; /* This should not happen */
1090 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1091 const gchar *user_agent = g_value_get_string (value);
1093 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1094 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1097 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1098 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
1099 guint i = 0, prev_len = 0, total_len = 0;
1100 gchar **cookies = NULL;
1102 if (hdr_data->cookies != NULL)
1103 prev_len = g_strv_length (hdr_data->cookies);
1105 if (GST_VALUE_HOLDS_ARRAY (value)) {
1106 total_len = gst_value_array_get_size (value) + prev_len;
1107 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1109 for (i = 0; i < gst_value_array_get_size (value); i++) {
1110 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1111 g_value_get_string (gst_value_array_get_value (value, i)));
1112 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1114 } else if (G_VALUE_HOLDS_STRING (value)) {
1115 total_len = 1 + prev_len;
1116 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1118 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1119 g_value_get_string (value));
1120 cookies[0] = g_value_dup_string (value);
1122 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1123 g_quark_to_string (field_id));
1129 for (j = 0; j < prev_len; j++) {
1130 GST_DEBUG_OBJECT (demux,
1131 "Append existing cookie %s", hdr_data->cookies[j]);
1132 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1135 cookies[total_len] = NULL;
1137 g_strfreev (hdr_data->cookies);
1138 hdr_data->cookies = cookies;
1142 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1143 const gchar *referer = g_value_get_string (value);
1144 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1146 downloadhelper_set_referer (demux->download_helper, referer);
1149 /* Date header can be used to estimate server offset */
1150 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1151 const gchar *http_date = g_value_get_string (value);
1154 GstDateTime *datetime =
1155 gst_adaptive_demux_util_parse_http_head_date (http_date);
1158 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1159 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1161 GST_INFO_OBJECT (demux,
1162 "HTTP response Date %s", GST_STR_NULL (date_string));
1163 g_free (date_string);
1165 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
1174 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1177 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1180 switch (event->type) {
1181 case GST_EVENT_FLUSH_STOP:{
1182 GST_API_LOCK (demux);
1183 GST_MANIFEST_LOCK (demux);
1185 gst_adaptive_demux_reset (demux);
1187 ret = gst_pad_event_default (pad, parent, event);
1189 GST_MANIFEST_UNLOCK (demux);
1190 GST_API_UNLOCK (demux);
1196 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1197 if (!handle_incoming_manifest (demux)) {
1198 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1199 return gst_pad_event_default (pad, parent, event);
1201 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1203 GST_ERROR_OBJECT (demux,
1204 "Failed to acquire scheduler to handle manifest");
1205 return gst_pad_event_default (pad, parent, event);
1207 gst_event_unref (event);
1210 case GST_EVENT_STREAM_START:
1211 if (gst_event_parse_group_id (event, &demux->group_id))
1212 demux->have_group_id = TRUE;
1214 demux->have_group_id = FALSE;
1215 /* Swallow stream-start, we'll push our own */
1216 gst_event_unref (event);
1218 case GST_EVENT_SEGMENT:
1219 /* Swallow newsegments, we'll push our own */
1220 gst_event_unref (event);
1222 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1223 const GstStructure *structure = gst_event_get_structure (event);
1224 struct http_headers_collector c = { demux, NULL };
1226 if (gst_structure_has_name (structure, "http-headers")) {
1227 if (gst_structure_has_field (structure, "request-headers")) {
1228 GstStructure *req_headers = NULL;
1229 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1230 &req_headers, NULL);
1232 gst_structure_foreach (req_headers,
1233 gst_adaptive_demux_handle_upstream_http_header, &c);
1234 gst_structure_free (req_headers);
1237 if (gst_structure_has_field (structure, "response-headers")) {
1238 GstStructure *res_headers = NULL;
1239 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1240 &res_headers, NULL);
1242 gst_structure_foreach (res_headers,
1243 gst_adaptive_demux_handle_upstream_http_header, &c);
1244 gst_structure_free (res_headers);
1249 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1257 return gst_pad_event_default (pad, parent, event);
1260 static GstFlowReturn
1261 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1264 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1266 GST_MANIFEST_LOCK (demux);
1268 gst_adapter_push (demux->priv->input_adapter, buffer);
1270 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1271 (gint) gst_adapter_available (demux->priv->input_adapter));
1273 GST_MANIFEST_UNLOCK (demux);
1278 /* Called with TRACKS_LOCK taken */
1280 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1284 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1285 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1287 gst_adaptive_demux_track_flush (track);
1288 if (gst_pad_is_active (track->sinkpad)) {
1289 gst_pad_set_active (track->sinkpad, FALSE);
1290 gst_pad_set_active (track->sinkpad, TRUE);
1295 /* Resets all tracks to their initial state, ready to receive new data. */
1297 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1299 TRACKS_LOCK (demux);
1300 g_queue_foreach (demux->priv->periods,
1301 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1302 TRACKS_UNLOCK (demux);
1305 /* Subclasses will call this function to ensure that a new input period is
1306 * available to receive new streams and tracks */
1308 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1310 if (demux->input_period && !demux->input_period->prepared) {
1311 GST_DEBUG_OBJECT (demux, "Using existing input period");
1315 if (demux->input_period) {
1316 GST_DEBUG_OBJECT (demux, "Closing previous period");
1317 demux->input_period->closed = TRUE;
1319 GST_DEBUG_OBJECT (demux, "Setting up new period");
1321 demux->input_period = gst_adaptive_demux_period_new (demux);
1326 /* must be called with manifest_lock taken */
1328 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1330 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1333 gst_adaptive_demux_stop_tasks (demux, TRUE);
1336 klass->reset (demux);
1338 /* Disable and remove all outputs */
1339 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1340 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1341 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
1343 g_list_free (demux->priv->outputs);
1344 demux->priv->outputs = NULL;
1346 g_queue_clear_full (demux->priv->periods,
1347 (GDestroyNotify) gst_adaptive_demux_period_unref);
1349 /* The output period always has an extra ref taken on it */
1350 if (demux->output_period)
1351 gst_adaptive_demux_period_unref (demux->output_period);
1352 demux->output_period = NULL;
1353 /* The input period doesn't have an extra ref taken on it */
1354 demux->input_period = NULL;
1356 gst_adaptive_demux_start_new_period (demux);
1358 g_free (demux->manifest_uri);
1359 g_free (demux->manifest_base_uri);
1360 demux->manifest_uri = NULL;
1361 demux->manifest_base_uri = NULL;
1363 gst_adapter_clear (demux->priv->input_adapter);
1364 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1366 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1367 demux->instant_rate_multiplier = 1.0;
1369 demux->priv->duration = GST_CLOCK_TIME_NONE;
1371 demux->priv->percent = -1;
1372 demux->priv->is_buffering = TRUE;
1374 demux->have_group_id = FALSE;
1375 demux->group_id = G_MAXUINT;
1376 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1378 demux->priv->global_output_position = 0;
1380 demux->priv->n_audio_streams = 0;
1381 demux->priv->n_video_streams = 0;
1382 demux->priv->n_subtitle_streams = 0;
1384 gst_flow_combiner_reset (demux->priv->flowcombiner);
1388 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1390 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1392 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1394 switch (GST_QUERY_TYPE (query)) {
1395 case GST_QUERY_BUFFERING:
1398 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1400 if (!demux->output_period) {
1401 if (format != GST_FORMAT_TIME) {
1402 GST_DEBUG_OBJECT (demux,
1403 "No period setup yet, can't answer non-TIME buffering queries");
1407 GST_DEBUG_OBJECT (demux,
1408 "No period setup yet, but still answering buffering query");
1416 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1419 /* MANIFEST_LOCK held. Find the stream that owns the given element */
1420 static GstAdaptiveDemux2Stream *
1421 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1425 /* We only look in the streams of the input period (i.e. with active streams) */
1426 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1427 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1428 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1436 /* TRACKS_LOCK held */
1437 static GstAdaptiveDemuxTrack *
1438 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1439 GstStreamType stream_type)
1443 for (iter = stream->tracks; iter; iter = iter->next) {
1444 GstAdaptiveDemuxTrack *track = iter->data;
1446 if (track->type == stream_type)
1453 /* MANIFEST and TRACKS lock held */
1455 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1456 GstAdaptiveDemux2Stream * stream)
1460 GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1462 for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1464 GstStream *gst_stream =
1465 gst_stream_collection_get_stream (stream->stream_collection, i);
1466 GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1467 GstAdaptiveDemuxTrack *track;
1469 if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1471 track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1473 GST_DEBUG_OBJECT (stream,
1474 "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
1479 if (track->upstream_stream_id)
1480 g_free (track->upstream_stream_id);
1481 track->upstream_stream_id =
1482 g_strdup (gst_stream_get_stream_id (gst_stream));
1488 tags_have_language_info (GstTagList * tags)
1490 const gchar *language = NULL;
1495 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1498 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
1506 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1507 GstStreamCollection * collection)
1510 guint nb_audio, nb_video, nb_text;
1511 gboolean have_audio_languages = TRUE;
1512 gboolean have_text_languages = TRUE;
1514 nb_audio = nb_video = nb_text = 0;
1516 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1517 GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1518 GstTagList *tags = gst_stream_get_tags (gst_stream);
1520 GST_DEBUG_OBJECT (stream,
1521 "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1522 switch (gst_stream_get_stream_type (gst_stream)) {
1523 case GST_STREAM_TYPE_AUDIO:
1524 have_audio_languages &= tags_have_language_info (tags);
1527 case GST_STREAM_TYPE_VIDEO:
1530 case GST_STREAM_TYPE_TEXT:
1531 have_text_languages &= tags_have_language_info (tags);
1539 /* Check that we either have at most 1 of each track type, or that
1540 * we have language tags for each to tell which is which */
1542 (nb_audio > 1 && !have_audio_languages) ||
1543 (nb_text > 1 && !have_text_languages)) {
1545 ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1546 nb_audio, nb_video, nb_text);
1554 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1557 GstAdaptiveDemux2Stream *stream;
1558 GstStreamCollection *collection = NULL;
1559 gboolean pending_tracks_activated = FALSE;
1561 GST_MANIFEST_LOCK (demux);
1563 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1564 if (stream == NULL) {
1565 GST_WARNING_OBJECT (demux,
1566 "Failed to locate stream for collection message");
1570 gst_message_parse_stream_collection (msg, &collection);
1574 /* Check whether the collection is "sane" or not.
1576 * In the context of adaptive streaming, we can only handle multiplexed
1577 * content that provides at most one stream of valid types (audio, video,
1578 * text). Without this we cannot reliably match the output of this multiplex
1579 * to the various tracks.
1581 * FIXME : In the future and *IF* we encounter such streams, we could envision
1582 * supporting multiple streams of the same type if, and only if, they have
1583 * tags that allow differentiating them (ex: languages).
1585 if (!can_handle_collection (stream, collection)) {
1586 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1587 (_("Stream format can't be handled")),
1588 ("The streams provided by the multiplex are ambiguous"));
1592 /* Store the collection on the stream */
1593 gst_object_replace ((GstObject **) & stream->stream_collection,
1594 (GstObject *) collection);
1596 /* IF there are pending tracks, ask the subclass to handle that */
1597 if (stream->pending_tracks) {
1598 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1599 g_assert (demux_class->stream_update_tracks);
1600 demux_class->stream_update_tracks (demux, stream);
1601 TRACKS_LOCK (demux);
1602 stream->pending_tracks = FALSE;
1603 pending_tracks_activated = TRUE;
1604 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1605 demux->input_period == demux->output_period)
1606 gst_adaptive_demux_post_collection (demux);
1608 g_assert (stream->tracks);
1609 TRACKS_LOCK (demux);
1610 /* If we already have assigned tracks, update the pending upstream stream_id
1611 * for each of them based on the collection information. */
1612 gst_adaptive_demux2_stream_update_tracks (demux, stream);
1615 /* If we discovered pending tracks and we no longer have any, we can ensure
1616 * selected tracks are started */
1617 if (pending_tracks_activated
1618 && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1619 GList *iter = demux->input_period->streams;
1620 for (; iter; iter = iter->next) {
1621 GstAdaptiveDemux2Stream *new_stream = iter->data;
1623 /* The stream that posted this collection was already started. If a
1624 * different stream is now selected, start it */
1625 if (stream != new_stream
1626 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1627 gst_adaptive_demux2_stream_start (new_stream);
1630 TRACKS_UNLOCK (demux);
1633 GST_MANIFEST_UNLOCK (demux);
1635 gst_message_unref (msg);
1640 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1642 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1644 switch (GST_MESSAGE_TYPE (msg)) {
1645 case GST_MESSAGE_STREAM_COLLECTION:
1647 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1650 case GST_MESSAGE_ERROR:{
1651 GstAdaptiveDemux2Stream *stream = NULL;
1653 gchar *debug = NULL;
1654 gchar *new_error = NULL;
1655 const GstStructure *details = NULL;
1657 GST_MANIFEST_LOCK (demux);
1659 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1660 if (stream == NULL) {
1661 GST_WARNING_OBJECT (demux,
1662 "Failed to locate stream for errored element");
1663 GST_MANIFEST_UNLOCK (demux);
1667 gst_message_parse_error (msg, &err, &debug);
1669 GST_WARNING_OBJECT (demux,
1670 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1671 err->message, debug);
1674 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1676 g_free (err->message);
1677 err->message = new_error;
1680 gst_message_parse_error_details (msg, &details);
1682 gst_structure_get_uint (details, "http-status-code",
1683 &stream->last_status_code);
1686 /* error, but ask to retry */
1687 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1688 gst_adaptive_demux2_stream_parse_error (stream, err);
1689 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1695 GST_MANIFEST_UNLOCK (demux);
1697 gst_message_unref (msg);
1706 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1709 /* must be called with manifest_lock taken */
1711 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1712 GstAdaptiveDemux2Stream * stream)
1714 GstAdaptiveDemuxClass *klass;
1716 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1718 if (klass->get_presentation_offset == NULL)
1721 return klass->get_presentation_offset (demux, stream);
1724 /* must be called with manifest_lock taken */
1726 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1728 GstAdaptiveDemuxClass *klass;
1730 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1732 if (klass->get_period_start_time == NULL)
1735 return klass->get_period_start_time (demux);
1738 /* must be called with manifest_lock taken */
1740 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1741 gboolean first_and_live)
1744 GstClockTime period_start;
1745 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1748 g_return_val_if_fail (demux->input_period->streams, FALSE);
1749 g_assert (demux->input_period->prepared == FALSE);
1751 new_streams = demux->input_period->streams;
1753 if (!gst_adaptive_demux2_is_running (demux)) {
1754 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1758 GST_DEBUG_OBJECT (demux,
1759 "Preparing %d streams for period %d , first_and_live:%d",
1760 g_list_length (new_streams), demux->input_period->period_num,
1763 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1764 GstAdaptiveDemux2Stream *stream = iter->data;
1766 GST_DEBUG_OBJECT (stream, "Preparing stream");
1768 stream->need_header = TRUE;
1769 stream->discont = TRUE;
1771 /* Grab the first stream time for live streams
1772 * * If the stream is selected
1773 * * Or it provides dynamic tracks (in which case we need to force an update)
1776 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1777 || stream->pending_tracks)) {
1778 /* TODO we only need the first timestamp, maybe create a simple function to
1779 * get the current PTS of a fragment ? */
1780 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1781 gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1783 GST_DEBUG_OBJECT (stream,
1784 "Got stream time %" GST_STIME_FORMAT,
1785 GST_STIME_ARGS (stream->fragment.stream_time));
1787 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1788 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1790 min_stream_time = stream->fragment.stream_time;
1795 period_start = gst_adaptive_demux_get_period_start_time (demux);
1797 /* For live streams, the subclass is supposed to seek to the current fragment
1798 * and then tell us its stream time in stream->fragment.stream_time. We now
1799 * also have to seek our demuxer segment to reflect this.
1801 * FIXME: This needs some refactoring at some point.
1803 if (first_and_live) {
1804 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1805 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1806 GST_SEEK_TYPE_NONE, -1, NULL);
1809 GST_DEBUG_OBJECT (demux,
1810 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1811 " demux segment %" GST_SEGMENT_FORMAT,
1812 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1815 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1816 GstAdaptiveDemux2Stream *stream = iter->data;
1817 stream->compute_segment = TRUE;
1818 stream->first_and_live = first_and_live;
1820 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1821 demux->input_period->prepared = TRUE;
1826 static GstAdaptiveDemuxTrack *
1827 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1831 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1832 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1833 if (!g_strcmp0 (track->stream_id, stream_id))
1840 /* TRACKS_LOCK hold */
1842 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1844 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1845 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1846 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1848 gint min_percent = -1, percent;
1849 gboolean all_eos = TRUE;
1851 /* Go over all active tracks of the output period and update level */
1853 /* Check that all tracks are above their respective low thresholds (different
1854 * tracks may have different fragment durations yielding different buffering
1855 * percentages) Overall buffering percent is the lowest. */
1856 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1857 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1859 GST_LOG_OBJECT (demux,
1860 "Checking track '%s' active:%d selected:%d eos:%d level:%"
1861 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1862 track->stream_id, track->active, track->selected, track->eos,
1863 GST_TIME_ARGS (track->level_time),
1864 GST_TIME_ARGS (track->buffering_threshold));
1866 if (track->active && track->selected) {
1871 if (min_level_time == GST_CLOCK_TIME_NONE) {
1872 min_level_time = track->level_time;
1873 } else if (track->level_time < min_level_time) {
1874 min_level_time = track->level_time;
1877 if (track->type & GST_STREAM_TYPE_VIDEO
1878 && video_level_time > track->level_time)
1879 video_level_time = track->level_time;
1881 if (track->type & GST_STREAM_TYPE_AUDIO
1882 && audio_level_time > track->level_time)
1883 audio_level_time = track->level_time;
1885 if (track->level_time != GST_CLOCK_TIME_NONE
1886 && track->buffering_threshold != 0) {
1888 gst_util_uint64_scale (track->level_time, 100,
1889 track->buffering_threshold);
1890 if (min_percent < 0 || cur_percent < min_percent)
1891 min_percent = cur_percent;
1897 GST_DEBUG_OBJECT (demux,
1898 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1899 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1901 /* Update demuxer video/audio level properties */
1902 GST_OBJECT_LOCK (demux);
1903 demux->current_level_time_video = video_level_time;
1904 demux->current_level_time_audio = audio_level_time;
1905 GST_OBJECT_UNLOCK (demux);
1907 if (min_percent < 0 && !all_eos)
1910 if (min_percent > 100 || all_eos)
1913 percent = MAX (0, min_percent);
1915 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
1917 if (demux->priv->is_buffering) {
1919 demux->priv->is_buffering = FALSE;
1920 if (demux->priv->percent != percent) {
1921 demux->priv->percent = percent;
1922 demux->priv->percent_changed = TRUE;
1924 } else if (percent < 1) {
1925 demux->priv->is_buffering = TRUE;
1926 if (demux->priv->percent != percent) {
1927 demux->priv->percent = percent;
1928 demux->priv->percent_changed = TRUE;
1932 if (demux->priv->percent_changed)
1933 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1934 demux->priv->is_buffering);
1937 /* With TRACKS_LOCK held */
1939 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1944 if (!demux->priv->percent_changed)
1947 BUFFERING_LOCK (demux);
1948 percent = demux->priv->percent;
1949 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1950 TRACKS_UNLOCK (demux);
1951 gst_element_post_message ((GstElement *) demux, msg);
1953 BUFFERING_UNLOCK (demux);
1954 TRACKS_LOCK (demux);
1955 if (percent == demux->priv->percent)
1956 demux->priv->percent_changed = FALSE;
1959 /* MANIFEST_LOCK and TRACKS_LOCK hold */
1960 GstAdaptiveDemux2Stream *
1961 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1962 GstAdaptiveDemuxTrack * track)
1966 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1967 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1968 if (g_list_find (stream->tracks, track))
1975 /* Scheduler context held, takes TRACKS_LOCK */
1976 static GstAdaptiveDemux2Stream *
1977 gst_adaptive_demux_find_stream_for_pad (GstAdaptiveDemux * demux, GstPad * pad)
1980 GstAdaptiveDemuxTrack *track = NULL;
1981 GstAdaptiveDemux2Stream *stream = NULL;
1983 TRACKS_LOCK (demux);
1984 for (iter = demux->output_period->tracks; iter; iter = g_list_next (iter)) {
1985 OutputSlot *cand = iter->data;
1986 if (cand->pad == pad) {
1987 track = cand->track;
1993 stream = find_stream_for_track_locked (demux, track);
1995 TRACKS_UNLOCK (demux);
2000 /* Called from seek handler
2002 * This function is used when a (flushing) seek caused a new period to be activated.
2004 * This will ensure that:
2005 * * the current output period is marked as finished (EOS)
2006 * * Any potential intermediate (non-input/non-output) periods are removed
2007 * * That the new input period is prepared and ready
2010 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
2014 GST_DEBUG_OBJECT (demux,
2015 "Preparing new input period %u", demux->input_period->period_num);
2017 /* Prepare the new input period */
2018 gst_adaptive_demux_update_collection (demux, demux->input_period);
2020 /* Transfer the previous selection to the new input period */
2021 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2022 demux->output_period);
2023 gst_adaptive_demux_prepare_streams (demux, FALSE);
2025 /* Remove all periods except for the input (last) and output (first) period */
2026 while (demux->priv->periods->length > 2) {
2027 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2028 /* Mark all tracks of the removed period as not selected and EOS so they
2029 * will be skipped / ignored */
2030 for (iter = period->tracks; iter; iter = iter->next) {
2031 GstAdaptiveDemuxTrack *track = iter->data;
2032 track->selected = FALSE;
2035 gst_adaptive_demux_period_unref (period);
2038 /* Mark all tracks of the output period as EOS so that the output loop
2039 * will immediately move to the new period */
2040 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2041 GstAdaptiveDemuxTrack *track = iter->data;
2045 /* Go over all slots, and clear any pending track */
2046 for (iter = demux->priv->outputs; iter; iter = iter->next) {
2047 OutputSlot *slot = (OutputSlot *) iter->data;
2049 if (slot->pending_track != NULL) {
2050 GST_DEBUG_OBJECT (demux,
2051 "Removing track '%s' as pending from output of current track '%s'",
2052 slot->pending_track->stream_id, slot->track->stream_id);
2053 gst_adaptive_demux_track_unref (slot->pending_track);
2054 slot->pending_track = NULL;
2059 /* must be called with manifest_lock taken */
2061 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2062 gint64 * range_start, gint64 * range_stop)
2064 GstAdaptiveDemuxClass *klass;
2066 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2068 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2070 return klass->get_live_seek_range (demux, range_start, range_stop);
2073 /* must be called with manifest_lock taken */
2075 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2076 GstAdaptiveDemux2Stream * stream)
2078 gint64 range_start, range_stop;
2079 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2080 GST_LOG_OBJECT (stream,
2081 "stream position %" GST_TIME_FORMAT " live seek range %"
2082 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2083 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2084 GST_STIME_ARGS (range_stop));
2085 return (stream->current_position >= range_start
2086 && stream->current_position <= range_stop);
2092 /* must be called with manifest_lock taken */
2094 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2096 GstAdaptiveDemuxClass *klass;
2098 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2099 if (gst_adaptive_demux_is_live (demux)) {
2100 return klass->get_live_seek_range != NULL;
2103 return klass->seek != NULL;
2107 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2108 GstSeekType start_type, GstSeekType stop_type)
2112 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2113 GstAdaptiveDemux2Stream *stream = iter->data;
2115 /* Make sure the download loop clears and restarts on the next start,
2116 * which will recompute the stream segment */
2117 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2118 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2119 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2120 stream->start_position = 0;
2122 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2123 stream->start_position = demux->segment.start;
2124 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2125 stream->start_position = demux->segment.stop;
2129 #define IS_SNAP_SEEK(f) (f & (GST_SEEK_FLAG_SNAP_BEFORE | \
2130 GST_SEEK_FLAG_SNAP_AFTER | \
2131 GST_SEEK_FLAG_SNAP_NEAREST | \
2132 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2133 GST_SEEK_FLAG_KEY_UNIT))
2134 #define REMOVE_SNAP_FLAGS(f) (f & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2135 GST_SEEK_FLAG_SNAP_AFTER | \
2136 GST_SEEK_FLAG_SNAP_NEAREST))
2139 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux, GstPad * pad,
2142 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2146 GstSeekType start_type, stop_type;
2151 GstSegment oldsegment;
2152 GstAdaptiveDemux2Stream *stream = NULL;
2153 GstEvent *flush_event;
2155 GST_INFO_OBJECT (demux, "Received seek event");
2157 GST_API_LOCK (demux);
2159 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2162 if (format != GST_FORMAT_TIME) {
2163 GST_API_UNLOCK (demux);
2164 GST_WARNING_OBJECT (demux,
2165 "Adaptive demuxers only support TIME-based seeking");
2166 gst_event_unref (event);
2170 if (flags & GST_SEEK_FLAG_SEGMENT) {
2171 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2172 GST_API_UNLOCK (demux);
2173 gst_event_unref (event);
2177 seqnum = gst_event_get_seqnum (event);
2179 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2180 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
2184 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2185 /* For instant rate seeks, reply directly and update
2186 * our segment so the new rate is reflected in any future
2189 gdouble rate_multiplier;
2191 /* instant rate change only supported if direction does not change. All
2192 * other requirements are already checked before creating the seek event
2193 * but let's double-check here to be sure */
2194 if ((demux->segment.rate > 0 && rate < 0) ||
2195 (demux->segment.rate < 0 && rate > 0) ||
2196 start_type != GST_SEEK_TYPE_NONE ||
2197 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2198 GST_ERROR_OBJECT (demux,
2199 "Instant rate change seeks only supported in the "
2200 "same direction, without flushing and position change");
2201 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2202 GST_API_UNLOCK (demux);
2206 rate_multiplier = rate / demux->segment.rate;
2208 ev = gst_event_new_instant_rate_change (rate_multiplier,
2209 (GstSegmentFlags) flags);
2210 gst_event_set_seqnum (ev, seqnum);
2212 ret = gst_adaptive_demux_push_src_event (demux, ev);
2215 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2216 demux->instant_rate_multiplier = rate_multiplier;
2217 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2220 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2221 GST_API_UNLOCK (demux);
2222 gst_event_unref (event);
2227 if (!gst_adaptive_demux_can_seek (demux)) {
2228 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2230 GST_API_UNLOCK (demux);
2231 gst_event_unref (event);
2235 /* We can only accept flushing seeks from this point onward */
2236 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2237 GST_ERROR_OBJECT (demux,
2238 "Non-flushing non-instant-rate seeks are not possible");
2240 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2242 GST_API_UNLOCK (demux);
2243 gst_event_unref (event);
2247 if (gst_adaptive_demux_is_live (demux)) {
2248 gint64 range_start, range_stop;
2249 gboolean changed = FALSE;
2250 gboolean start_valid = TRUE, stop_valid = TRUE;
2252 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2254 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2255 GST_API_UNLOCK (demux);
2256 gst_event_unref (event);
2257 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2261 GST_DEBUG_OBJECT (demux,
2262 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2263 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2265 /* Handle relative positioning for live streams (relative to the range_stop) */
2266 if (start_type == GST_SEEK_TYPE_END) {
2267 start = range_stop + start;
2268 start_type = GST_SEEK_TYPE_SET;
2271 if (stop_type == GST_SEEK_TYPE_END) {
2272 stop = range_stop + stop;
2273 stop_type = GST_SEEK_TYPE_SET;
2277 /* Adjust the requested start/stop position if it falls beyond the live
2279 * The only case where we don't adjust is for the starting point of
2280 * an accurate seek (start if forward and stop if backwards)
2282 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2283 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2284 GST_DEBUG_OBJECT (demux,
2285 "seek before live stream start, setting to range start: %"
2286 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2287 start = range_start;
2290 /* truncate stop position also if set */
2291 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2292 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2293 GST_DEBUG_OBJECT (demux,
2294 "seek ending after live start, adjusting to: %"
2295 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2300 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2301 (start < range_start || start > range_stop)) {
2302 GST_WARNING_OBJECT (demux,
2303 "Seek to invalid position start:%" GST_STIME_FORMAT
2304 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2305 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2306 GST_STIME_ARGS (range_stop));
2307 start_valid = FALSE;
2309 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2310 (stop < range_start || stop > range_stop)) {
2311 GST_WARNING_OBJECT (demux,
2312 "Seek to invalid position stop:%" GST_STIME_FORMAT
2313 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2314 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2315 GST_STIME_ARGS (range_stop));
2319 /* If the seek position is still outside of the seekable range, refuse the seek */
2320 if (!start_valid || !stop_valid) {
2321 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2322 GST_API_UNLOCK (demux);
2323 gst_event_unref (event);
2327 /* Re-create seek event with changed/updated values */
2329 gst_event_unref (event);
2331 gst_event_new_seek (rate, format, flags,
2332 start_type, start, stop_type, stop);
2333 gst_event_set_seqnum (event, seqnum);
2337 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2339 /* have a backup in case seek fails */
2340 gst_segment_copy_into (&demux->segment, &oldsegment);
2342 GST_DEBUG_OBJECT (demux, "sending flush start");
2343 flush_event = gst_event_new_flush_start ();
2344 gst_event_set_seqnum (flush_event, seqnum);
2346 gst_adaptive_demux_push_src_event (demux, flush_event);
2348 gst_adaptive_demux_stop_tasks (demux, FALSE);
2349 gst_adaptive_demux_reset_tracks (demux);
2351 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2354 * Handle snap seeks as follows:
2355 * 1) do the snap seeking on the stream that received
2357 * 2) use the final position on this stream to seek
2358 * on the other streams to the same position
2360 * We can't snap at all streams at the same time as
2361 * they might end in different positions, so just
2362 * use the one that received the event as the 'leading'
2363 * one to do the snap seek.
2365 * FIXME: Could use the global_output_position (running time)
2366 * as the snap reference
2368 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek && (stream =
2369 gst_adaptive_demux_find_stream_for_pad (demux, pad))) {
2370 GstClockTimeDiff ts;
2371 GstSeekFlags stream_seek_flags = flags;
2373 /* snap-seek on the stream that received the event and then
2374 * use the resulting position to seek on all streams */
2377 if (start_type != GST_SEEK_TYPE_NONE)
2380 ts = stream->current_position;
2381 start_type = GST_SEEK_TYPE_SET;
2384 if (stop_type != GST_SEEK_TYPE_NONE)
2387 stop_type = GST_SEEK_TYPE_SET;
2388 ts = stream->current_position;
2393 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2396 /* replace event with a new one without snapping to seek on all streams */
2397 gst_event_unref (event);
2404 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2405 start_type, start, stop_type, stop);
2406 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2410 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2411 start, stop_type, stop, &update);
2414 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2416 ret = demux_class->seek (demux, event);
2420 /* Is there anything else we can do if it fails? */
2421 gst_segment_copy_into (&oldsegment, &demux->segment);
2423 demux->priv->segment_seqnum = seqnum;
2425 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2427 /* Resetting flow combiner */
2428 gst_flow_combiner_reset (demux->priv->flowcombiner);
2430 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2431 flush_event = gst_event_new_flush_stop (TRUE);
2432 gst_event_set_seqnum (flush_event, seqnum);
2433 gst_adaptive_demux_push_src_event (demux, flush_event);
2435 /* If the seek generated a new period, prepare it */
2436 if (!demux->input_period->prepared) {
2437 /* This can only happen on flushing seeks */
2438 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2439 gst_adaptive_demux_seek_to_input_period (demux);
2442 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2443 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2445 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2446 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2448 /* Reset the global output position (running time) for when the output loop restarts */
2449 demux->priv->global_output_position = 0;
2451 /* After a flushing seek, any instant-rate override is undone */
2452 demux->instant_rate_multiplier = 1.0;
2454 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2456 /* Restart the demux */
2457 gst_adaptive_demux_start_tasks (demux);
2459 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2460 GST_API_UNLOCK (demux);
2461 gst_event_unref (event);
2466 /* Returns TRUE if the stream has at least one selected track */
2468 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2473 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2474 GstAdaptiveDemuxTrack *track = tmp->data;
2476 if (track->selected)
2484 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2487 gboolean selection_handled = TRUE;
2489 GList *tracks = NULL;
2491 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2494 TRACKS_LOCK (demux);
2495 /* Validate the streams and fill:
2496 * tracks : list of tracks corresponding to requested streams
2498 for (iter = streams; iter; iter = iter->next) {
2499 gchar *stream_id = (gchar *) iter->data;
2500 GstAdaptiveDemuxTrack *track;
2502 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2503 track = find_track_for_stream_id (demux->output_period, stream_id);
2505 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2506 selection_handled = FALSE;
2507 goto select_streams_done;
2509 tracks = g_list_append (tracks, track);
2510 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2513 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2514 * SCHEDULING THREAD */
2516 /* FIXME: We want to iterate all streams, mark them as deselected,
2517 * then iterate tracks and mark any streams that have at least 1
2518 * active output track, then loop over all streams again and start/stop
2521 /* Go over all tracks present and (de)select based on current selection */
2522 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2523 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2525 if (track->selected && !g_list_find (tracks, track)) {
2526 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2527 track->stream_id, track->active);
2528 track->selected = FALSE;
2529 track->draining = TRUE;
2530 } else if (!track->selected && g_list_find (tracks, track)) {
2531 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2533 track->selected = TRUE;
2537 /* Start or stop streams based on the updated track selection */
2538 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2539 GstAdaptiveDemux2Stream *stream = iter->data;
2542 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2543 gboolean should_be_running =
2544 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2546 if (!is_running && should_be_running) {
2547 GstClockTime output_running_ts = demux->priv->global_output_position;
2548 GstClockTime start_position;
2550 /* Calculate where we should start the stream, and then
2552 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2554 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2555 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2556 GST_TIME_ARGS (output_running_ts), &demux->segment);
2559 gst_segment_position_from_running_time (&demux->segment,
2560 GST_FORMAT_TIME, output_running_ts);
2562 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2564 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2565 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2567 stream->current_position = stream->start_position = start_position;
2568 stream->compute_segment = TRUE;
2570 /* If output has already begun, ensure we seek this segment
2571 * to the correct restart position when the download loop begins */
2572 if (output_running_ts != 0)
2573 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2575 /* Activate track pads for this stream */
2576 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2577 GstAdaptiveDemuxTrack *track =
2578 (GstAdaptiveDemuxTrack *) trackiter->data;
2579 gst_pad_set_active (track->sinkpad, TRUE);
2582 gst_adaptive_demux2_stream_start (stream);
2583 } else if (is_running && !should_be_running) {
2584 /* Stream should not be running and needs stopping */
2585 gst_adaptive_demux2_stream_stop (stream);
2587 /* Set all track sinkpads to inactive for this stream */
2588 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2589 GstAdaptiveDemuxTrack *track =
2590 (GstAdaptiveDemuxTrack *) trackiter->data;
2591 gst_pad_set_active (track->sinkpad, FALSE);
2596 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2598 select_streams_done:
2599 demux_update_buffering_locked (demux);
2600 demux_post_buffering_locked (demux);
2602 TRACKS_UNLOCK (demux);
2603 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2606 g_list_free (tracks);
2607 return selection_handled;
2611 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2614 GstAdaptiveDemux *demux;
2616 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2618 switch (event->type) {
2619 case GST_EVENT_SEEK:
2621 guint32 seqnum = gst_event_get_seqnum (event);
2622 if (seqnum == demux->priv->segment_seqnum) {
2623 GST_LOG_OBJECT (pad,
2624 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2625 gst_event_unref (event);
2628 return gst_adaptive_demux_handle_seek_event (demux, pad, event);
2630 case GST_EVENT_LATENCY:{
2631 /* Upstream and our internal source are irrelevant
2632 * for latency, and we should not fail here to
2633 * configure the latency */
2634 gst_event_unref (event);
2637 case GST_EVENT_QOS:{
2638 GstClockTimeDiff diff;
2639 GstClockTime timestamp;
2640 GstClockTime earliest_time;
2642 gst_event_parse_qos (event, NULL, NULL, &diff, ×tamp);
2643 /* Only take into account lateness if late */
2645 earliest_time = timestamp + 2 * diff;
2647 earliest_time = timestamp;
2649 GST_OBJECT_LOCK (demux);
2650 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2651 earliest_time > demux->priv->qos_earliest_time) {
2652 demux->priv->qos_earliest_time = earliest_time;
2653 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2654 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2656 GST_OBJECT_UNLOCK (demux);
2659 case GST_EVENT_SELECT_STREAMS:
2662 gboolean selection_handled;
2664 if (GST_EVENT_SEQNUM (event) ==
2665 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2666 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2667 GST_EVENT_SEQNUM (event));
2671 gst_event_parse_select_streams (event, &streams);
2673 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2674 g_list_free_full (streams, g_free);
2675 return selection_handled;
2681 return gst_pad_event_default (pad, parent, event);
2685 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2688 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2689 gboolean ret = FALSE;
2695 switch (query->type) {
2696 case GST_QUERY_DURATION:{
2698 GstClockTime duration = GST_CLOCK_TIME_NONE;
2700 gst_query_parse_duration (query, &fmt, NULL);
2702 if (gst_adaptive_demux_is_live (demux)) {
2703 /* We are able to answer this query: the duration is unknown */
2704 gst_query_set_duration (query, fmt, -1);
2709 if (fmt == GST_FORMAT_TIME
2710 && g_atomic_int_get (&demux->priv->have_manifest)) {
2712 GST_MANIFEST_LOCK (demux);
2713 duration = demux->priv->duration;
2714 GST_MANIFEST_UNLOCK (demux);
2716 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2717 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2722 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2723 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2726 case GST_QUERY_LATENCY:{
2727 gst_query_set_latency (query, FALSE, 0, -1);
2731 case GST_QUERY_SEEKING:{
2736 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2737 GST_INFO_OBJECT (demux,
2738 "Don't have manifest yet, can't answer seeking query");
2739 return FALSE; /* can't answer without manifest */
2742 GST_MANIFEST_LOCK (demux);
2744 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2745 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2746 if (fmt == GST_FORMAT_TIME) {
2747 GstClockTime duration;
2748 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2752 if (gst_adaptive_demux_is_live (demux)) {
2753 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2755 GST_MANIFEST_UNLOCK (demux);
2756 GST_INFO_OBJECT (demux, "can't answer seeking query");
2760 duration = demux->priv->duration;
2761 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2765 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2766 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2767 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2768 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2770 GST_MANIFEST_UNLOCK (demux);
2775 GST_MANIFEST_LOCK (demux);
2777 /* TODO HLS can answer this differently it seems */
2778 if (demux->manifest_uri) {
2779 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2780 * playlist or the the uri of the last downlowaded fragment? */
2781 gst_query_set_uri (query, demux->manifest_uri);
2785 GST_MANIFEST_UNLOCK (demux);
2787 case GST_QUERY_SELECTABLE:
2788 gst_query_set_selectable (query, TRUE);
2792 /* Don't forward queries upstream because of the special nature of this
2793 * "demuxer", which relies on the upstream element only to be fed
2802 /* Called when the scheduler starts, to kick off manifest updates
2803 * and stream downloads */
2805 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2809 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2811 iter = demux->input_period->streams;
2813 for (; iter; iter = g_list_next (iter)) {
2814 GstAdaptiveDemux2Stream *stream = iter->data;
2816 /* If we need to process this stream to discover tracks *OR* it has any
2817 * tracks which are selected, start it now */
2818 if ((stream->pending_tracks == TRUE)
2819 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2820 gst_adaptive_demux2_stream_start (stream);
2826 /* must be called with manifest_lock taken */
2828 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2830 if (!gst_adaptive_demux2_is_running (demux)) {
2831 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2835 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2836 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2837 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2839 TRACKS_LOCK (demux);
2840 demux->priv->flushing = FALSE;
2841 GST_DEBUG_OBJECT (demux, "Starting the output task");
2842 gst_task_start (demux->priv->output_task);
2843 TRACKS_UNLOCK (demux);
2846 /* must be called with manifest_lock taken */
2848 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2850 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2851 if (demux->priv->manifest_updates_cb != 0) {
2852 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2853 demux->priv->manifest_updates_cb);
2854 demux->priv->manifest_updates_cb = 0;
2858 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
2860 /* must be called with manifest_lock taken */
2862 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2864 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2866 if (gst_adaptive_demux_is_live (demux)) {
2867 /* Task to periodically update the manifest */
2868 if (demux_class->requires_periodical_playlist_update (demux)) {
2869 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2870 if (demux->priv->manifest_updates_cb == 0) {
2871 demux->priv->manifest_updates_cb =
2872 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2873 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
2879 /* must be called with manifest_lock taken
2880 * This function will temporarily release manifest_lock in order to join the
2882 * The api_lock will still protect it against other threads trying to modify
2883 * the demux element.
2886 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2888 GST_LOG_OBJECT (demux, "Stopping tasks");
2891 gst_adaptive_demux_stop_manifest_update_task (demux);
2893 TRACKS_LOCK (demux);
2894 demux->priv->flushing = TRUE;
2895 g_cond_signal (&demux->priv->tracks_add);
2896 gst_task_stop (demux->priv->output_task);
2897 TRACKS_UNLOCK (demux);
2899 gst_task_join (demux->priv->output_task);
2901 if (demux->input_period)
2902 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2904 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2907 /* must be called with manifest_lock taken */
2909 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2912 gboolean ret = TRUE;
2914 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2916 TRACKS_LOCK (demux);
2917 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2918 OutputSlot *slot = (OutputSlot *) iter->data;
2919 gst_event_ref (event);
2920 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2921 ret = ret & gst_pad_push_event (slot->pad, event);
2922 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2923 slot->pushed_timed_data = FALSE;
2925 TRACKS_UNLOCK (demux);
2926 gst_event_unref (event);
2930 /* must be called with manifest_lock taken */
2932 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2935 GST_DEBUG_OBJECT (stream,
2936 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2937 gst_caps_replace (&stream->pending_caps, caps);
2938 gst_caps_unref (caps);
2941 /* must be called with manifest_lock taken */
2943 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2946 GST_DEBUG_OBJECT (stream,
2947 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2948 if (stream->pending_tags) {
2949 gst_tag_list_unref (stream->pending_tags);
2951 stream->pending_tags = tags;
2954 /* must be called with manifest_lock taken */
2956 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2959 stream->pending_events = g_list_append (stream->pending_events, event);
2963 _update_average_bitrate (GstAdaptiveDemux * demux,
2964 GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2966 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2968 stream->moving_bitrate -= stream->fragment_bitrates[index];
2969 stream->fragment_bitrates[index] = new_bitrate;
2970 stream->moving_bitrate += new_bitrate;
2972 stream->moving_index += 1;
2974 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2975 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2976 return stream->moving_bitrate / stream->moving_index;
2980 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2981 GstAdaptiveDemux2Stream * stream)
2983 guint64 average_bitrate;
2984 guint64 fragment_bitrate;
2985 guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2987 fragment_bitrate = stream->last_bitrate;
2988 GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2991 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2993 GST_INFO_OBJECT (stream,
2994 "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
2995 GST_INFO_OBJECT (stream,
2996 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
2997 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
2999 /* Conservative approach, make sure we don't upgrade too fast */
3000 GST_OBJECT_LOCK (demux);
3001 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3003 /* If this is the/a video stream update the overall demuxer
3004 * reported bitrate and notify, to give the application a
3005 * chance to choose a new connection-bitrate */
3006 if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3007 demux->current_download_rate = stream->current_download_rate;
3008 GST_OBJECT_UNLOCK (demux);
3009 g_object_notify (G_OBJECT (demux), "current-bandwidth");
3010 GST_OBJECT_LOCK (demux);
3013 connection_speed = demux->connection_speed;
3014 min_bitrate = demux->min_bitrate;
3015 max_bitrate = demux->max_bitrate;
3016 GST_OBJECT_UNLOCK (demux);
3018 if (connection_speed) {
3019 GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3020 connection_speed / 1000);
3021 return connection_speed;
3024 /* No explicit connection_speed, so choose the new variant to use as a
3025 * fraction of the measured download rate */
3026 target_download_rate =
3027 CLAMP (stream->current_download_rate, 0,
3028 G_MAXUINT) * demux->bandwidth_target_ratio;
3030 GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3031 demux->bandwidth_target_ratio, target_download_rate);
3034 /* Debugging code, modulate the bitrate every few fragments */
3036 static guint ctr = 0;
3038 GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3039 target_download_rate /= 2;
3045 if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3046 target_download_rate = min_bitrate;
3047 GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3051 if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3052 target_download_rate = max_bitrate;
3053 GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3057 GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3058 target_download_rate);
3060 return target_download_rate;
3063 /* must be called with manifest_lock taken */
3064 static GstFlowReturn
3065 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3066 GstAdaptiveDemux2Stream * stream)
3068 /* No need to advance, this isn't a real fragment */
3069 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3072 return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3073 stream->fragment.duration);
3076 /* must be called with manifest_lock taken.
3077 * Can temporarily release manifest_lock
3079 static GstFlowReturn
3080 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3081 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3083 return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
3087 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
3093 /* Called when a stream needs waking after the manifest is updated */
3095 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3097 demux->priv->stream_waiting_for_manifest = TRUE;
3101 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3103 GstFlowReturn ret = GST_FLOW_OK;
3104 gboolean schedule_again = TRUE;
3106 GST_MANIFEST_LOCK (demux);
3107 demux->priv->manifest_updates_cb = 0;
3109 /* Updating playlist only needed for live playlists */
3110 if (!gst_adaptive_demux_is_live (demux)) {
3111 GST_MANIFEST_UNLOCK (demux);
3112 return G_SOURCE_REMOVE;
3115 GST_DEBUG_OBJECT (demux, "Updating playlist");
3116 ret = gst_adaptive_demux_update_manifest (demux);
3118 if (ret == GST_FLOW_EOS) {
3119 GST_MANIFEST_UNLOCK (demux);
3120 return G_SOURCE_REMOVE;
3123 if (ret == GST_FLOW_OK) {
3124 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3125 demux->priv->update_failed_count = 0;
3127 /* Wake up download tasks */
3128 if (demux->priv->stream_waiting_for_manifest) {
3131 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3132 GstAdaptiveDemux2Stream *stream = iter->data;
3133 gst_adaptive_demux2_stream_on_manifest_update (stream);
3135 demux->priv->stream_waiting_for_manifest = FALSE;
3138 demux->priv->update_failed_count++;
3140 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3141 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3142 gst_flow_get_name (ret));
3144 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3145 (_("Internal data stream error.")), ("Could not update playlist"));
3146 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3147 schedule_again = FALSE;
3151 if (schedule_again) {
3152 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3154 demux->priv->manifest_updates_cb =
3155 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3156 klass->get_manifest_update_interval (demux) * GST_USECOND,
3157 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3160 GST_MANIFEST_UNLOCK (demux);
3162 return G_SOURCE_REMOVE;
3166 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3168 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3170 /* Loop for updating of the playlist. This periodically checks if
3171 * the playlist is updated and does so, then signals the streaming
3172 * thread in case it can continue downloading now. */
3174 /* block until the next scheduled update or the signal to quit this thread */
3175 GST_DEBUG_OBJECT (demux, "Started updates task");
3176 demux->priv->manifest_updates_cb =
3177 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3178 klass->get_manifest_update_interval (demux) * GST_USECOND,
3179 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3181 return G_SOURCE_REMOVE;
3185 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3186 GstAdaptiveDemuxTrack * track)
3190 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3191 OutputSlot *slot = (OutputSlot *) tmp->data;
3192 /* Incompatible output type */
3193 if (slot->type != track->type)
3196 /* Slot which is already assigned to this pending track */
3197 if (slot->pending_track == track)
3200 /* slot already used for another pending track */
3201 if (slot->pending_track != NULL)
3204 /* Current output track is of the same type and is draining */
3205 if (slot->track && slot->track->draining)
3212 /* TRACKS_LOCK taken */
3214 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3218 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3219 OutputSlot *slot = (OutputSlot *) tmp->data;
3221 if (slot->track == track)
3228 /* TRACKS_LOCK held */
3230 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3235 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3236 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3238 if (track->selected && !track->active)
3242 /* All selected tracks are active, created message */
3244 gst_message_new_streams_selected (GST_OBJECT (demux),
3245 demux->output_period->collection);
3246 GST_MESSAGE_SEQNUM (msg) = seqnum;
3247 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3248 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3249 if (track->active) {
3250 gst_message_streams_selected_add (msg, track->stream_object);
3258 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3261 GstAdaptiveDemuxTrack *track = slot->track;
3264 /* Send EVENT_STREAM_START */
3265 event = gst_event_new_stream_start (track->stream_id);
3266 if (demux->have_group_id)
3267 gst_event_set_group_id (event, demux->group_id);
3268 gst_event_set_stream_flags (event, track->flags);
3269 gst_event_set_stream (event, track->stream_object);
3270 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3272 gst_pad_push_event (slot->pad, event);
3274 /* Send EVENT_STREAM_COLLECTION */
3275 event = gst_event_new_stream_collection (demux->output_period->collection);
3276 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3278 gst_pad_push_event (slot->pad, event);
3280 /* Mark all sticky events for re-sending */
3281 gst_event_store_mark_all_undelivered (&track->sticky_events);
3285 * Called with TRACKS_LOCK taken
3288 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3291 guint requested_selection_seqnum;
3294 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3295 output slots vs active/draining tracks */
3296 requested_selection_seqnum =
3297 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3299 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3302 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3304 /* Go over all slots, and if they have a pending track that's no longer
3305 * selected, clear it so the slot can be reused */
3306 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3307 OutputSlot *slot = (OutputSlot *) tmp->data;
3309 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3310 GST_DEBUG_OBJECT (demux,
3311 "Removing deselected track '%s' as pending from output of current track '%s'",
3312 slot->pending_track->stream_id, slot->track->stream_id);
3313 gst_adaptive_demux_track_unref (slot->pending_track);
3314 slot->pending_track = NULL;
3318 /* Go over all tracks and create/re-assign/remove slots */
3319 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3320 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3322 if (track->selected) {
3323 OutputSlot *slot = find_slot_for_track (demux, track);
3325 /* 0. Track is selected and has a slot. Nothing to do */
3327 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3332 slot = find_replacement_slot_for_track (demux, track);
3334 /* 1. There is an existing slot of the same type which is currently
3335 * draining, assign this track as a replacement for it */
3336 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3337 if (slot->pending_track == NULL) {
3338 slot->pending_track = gst_adaptive_demux_track_ref (track);
3339 GST_DEBUG_OBJECT (demux,
3340 "Track '%s' will be used on output of track '%s'",
3341 track->stream_id, slot->track->stream_id);
3344 /* 2. There is no compatible replacement slot, create a new one */
3345 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3346 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3348 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3350 track->update_next_segment = TRUE;
3352 slot->track = gst_adaptive_demux_track_ref (track);
3353 track->active = TRUE;
3354 gst_adaptive_demux_send_initial_events (demux, slot);
3357 /* If we were draining this track, we no longer are */
3358 track->draining = FALSE;
3362 /* Finally check all slots have a current/pending track. If not remove it */
3363 for (tmp = demux->priv->outputs; tmp;) {
3364 OutputSlot *slot = (OutputSlot *) tmp->data;
3365 /* We should never has slots without target tracks */
3366 g_assert (slot->track);
3367 if (slot->track->draining && !slot->pending_track) {
3368 GstAdaptiveDemux2Stream *stream;
3370 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3371 slot->track->stream_id);
3372 slot->track->active = FALSE;
3374 /* If the stream feeding this track is stopped, flush and clear
3375 * the track now that it's going inactive. If the stream was not
3376 * found, it means we advanced past that period already (and the
3377 * stream was stopped and discarded) */
3378 stream = find_stream_for_track_locked (demux, slot->track);
3379 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3380 gst_adaptive_demux_track_flush (slot->track);
3382 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3383 gst_adaptive_demux_output_slot_free (demux, slot);
3388 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3389 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
3391 TRACKS_UNLOCK (demux);
3392 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3393 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3394 TRACKS_LOCK (demux);
3398 /* TRACKS_LOCK held */
3400 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3403 GstAdaptiveDemuxPeriod *previous_period;
3404 GstStreamCollection *collection;
3406 /* Grab the next period, should be demux->periods->next->data */
3407 previous_period = g_queue_pop_head (demux->priv->periods);
3409 /* Remove ref held by demux->output_period */
3410 gst_adaptive_demux_period_unref (previous_period);
3411 demux->output_period =
3412 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3414 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3415 demux->output_period->period_num);
3417 /* We can now post the collection of the new period */
3418 collection = demux->output_period->collection;
3419 TRACKS_UNLOCK (demux);
3420 gst_element_post_message (GST_ELEMENT_CAST (demux),
3421 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3422 TRACKS_LOCK (demux);
3424 /* Unselect all tracks of the previous period */
3425 for (iter = previous_period->tracks; iter; iter = iter->next) {
3426 GstAdaptiveDemuxTrack *track = iter->data;
3427 if (track->selected) {
3428 track->selected = FALSE;
3429 track->draining = TRUE;
3433 /* Force a selection re-check */
3434 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3435 check_and_handle_selection_update_locked (demux);
3437 /* Remove the final ref on the previous period now that we have done the switch */
3438 gst_adaptive_demux_period_unref (previous_period);
3443 /* Called with TRACKS_LOCK taken */
3445 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3448 GstAdaptiveDemuxTrack *track = slot->track;
3450 gboolean pending_is_ready;
3451 GstAdaptiveDemux2Stream *stream;
3453 /* If we have a pending track for this slot, the current track should be
3454 * draining and no longer selected */
3455 g_assert (track->draining && !track->selected);
3457 /* If we're draining, check if the pending track has enough data *or* that
3458 we've already drained out entirely */
3460 (slot->pending_track->level_time >=
3461 slot->pending_track->buffering_threshold);
3462 pending_is_ready |= slot->pending_track->eos;
3464 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3465 GST_DEBUG_OBJECT (demux,
3466 "Replacement track '%s' doesn't have enough data for switching yet",
3467 slot->pending_track->stream_id);
3471 GST_DEBUG_OBJECT (demux,
3472 "Pending replacement track has enough data, switching");
3473 track->active = FALSE;
3474 track->draining = FALSE;
3476 /* If the stream feeding this track is stopped, flush and clear
3477 * the track now that it's going inactive. If the stream was not
3478 * found, it means we advanced past that period already (and the
3479 * stream was stopped and discarded) */
3480 stream = find_stream_for_track_locked (demux, track);
3481 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3482 gst_adaptive_demux_track_flush (track);
3484 gst_adaptive_demux_track_unref (track);
3485 /* We steal the reference of pending_track */
3486 track = slot->track = slot->pending_track;
3487 slot->pending_track = NULL;
3488 slot->track->active = TRUE;
3490 /* Make sure the track segment will start at the current position */
3491 track->update_next_segment = TRUE;
3493 /* Send stream start and collection, and schedule sticky events */
3494 gst_adaptive_demux_send_initial_events (demux, slot);
3496 /* Can we emit the streams-selected message now ? */
3498 all_selected_tracks_are_active (demux,
3499 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
3501 TRACKS_UNLOCK (demux);
3502 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3503 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3504 TRACKS_LOCK (demux);
3510 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3513 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3514 gboolean wait_for_data = FALSE;
3517 GST_DEBUG_OBJECT (demux, "enter");
3519 TRACKS_LOCK (demux);
3521 /* Check if stopping */
3522 if (demux->priv->flushing) {
3523 ret = GST_FLOW_FLUSHING;
3527 /* If the selection changed, handle it */
3528 check_and_handle_selection_update_locked (demux);
3532 global_output_position = GST_CLOCK_STIME_NONE;
3533 if (wait_for_data) {
3534 GST_DEBUG_OBJECT (demux, "Waiting for data");
3535 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3536 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3537 if (demux->priv->flushing) {
3538 ret = GST_FLOW_FLUSHING;
3541 wait_for_data = FALSE;
3544 /* Grab/Recalculate current global output position
3545 * This is the minimum pending output position of all tracks used for output
3547 * If there is a track which is empty and not EOS, wait for it to receive data
3548 * then recalculate global output position.
3550 * This also pushes downstream all non-timed data that might be present.
3552 * IF all tracks are EOS : stop task
3554 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3555 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3556 OutputSlot *slot = (OutputSlot *) tmp->data;
3557 GstAdaptiveDemuxTrack *track;
3559 /* If there is a pending track, Check if it's time to switch to it */
3560 if (slot->pending_track)
3561 handle_slot_pending_track_switch_locked (demux, slot);
3563 track = slot->track;
3565 if (!track->active) {
3566 /* Note: Edward: I can't see in what cases we would end up with inactive
3567 tracks assigned to slots. */
3568 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3569 g_assert (track->active);
3573 if (track->next_position == GST_CLOCK_STIME_NONE) {
3574 gst_adaptive_demux_track_update_next_position (track);
3577 if (track->next_position != GST_CLOCK_STIME_NONE) {
3578 if (global_output_position == GST_CLOCK_STIME_NONE)
3579 global_output_position = track->next_position;
3581 global_output_position =
3582 MIN (global_output_position, track->next_position);
3583 track->waiting_add = FALSE;
3584 } else if (!track->eos) {
3585 GST_DEBUG_OBJECT (demux, "Need timed data on track %s", track->stream_id);
3586 wait_for_data = track->waiting_add = TRUE;
3588 GST_DEBUG_OBJECT (demux, "Track %s is EOS, not waiting for timed data",
3596 if (global_output_position == GST_CLOCK_STIME_NONE
3597 && demux->output_period->closed) {
3598 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3599 demux->output_period->period_num);
3600 if (!gst_adaptive_demux_advance_output_period (demux)) {
3601 /* Failed to move to next period, error out */
3602 ret = GST_FLOW_ERROR;
3605 /* Restart the loop */
3609 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3610 GST_STIME_ARGS (global_output_position));
3614 * We know all active tracks have pending timed data
3615 * * while track next_position <= global output position
3616 * * push pending data
3617 * * Update track next_position
3618 * * recalculate global output position
3619 * * Pop next pending data from track and update pending position
3622 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3623 OutputSlot *slot = (OutputSlot *) tmp->data;
3624 GstAdaptiveDemuxTrack *track = slot->track;
3626 GST_LOG_OBJECT (track->element,
3627 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3628 " global_output_position:%" GST_STIME_FORMAT, track->active,
3629 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3630 GST_STIME_ARGS (global_output_position));
3635 while (global_output_position == GST_CLOCK_STIME_NONE
3636 || !slot->pushed_timed_data
3637 || ((track->next_position != GST_CLOCK_STIME_NONE)
3638 && track->next_position <= global_output_position)) {
3639 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3642 GST_DEBUG_OBJECT (demux,
3643 "Track '%s' doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3644 track->stream_id, track->eos, slot->pushed_timed_data);
3645 /* This should only happen if the track is EOS, or exactly in between
3646 * the parser outputting segment/caps before buffers. */
3647 g_assert (track->eos || !slot->pushed_timed_data);
3651 demux_update_buffering_locked (demux);
3652 demux_post_buffering_locked (demux);
3653 TRACKS_UNLOCK (demux);
3655 GST_DEBUG_OBJECT (demux, "Track '%s' dequeued %" GST_PTR_FORMAT,
3656 track->stream_id, mo);
3658 if (GST_IS_EVENT (mo)) {
3659 GstEvent *event = (GstEvent *) mo;
3660 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP)
3661 slot->pushed_timed_data = TRUE;
3662 gst_pad_push_event (slot->pad, event);
3664 if (GST_EVENT_IS_STICKY (event))
3665 gst_event_store_mark_delivered (&track->sticky_events, event);
3666 } else if (GST_IS_BUFFER (mo)) {
3667 GstBuffer *buffer = (GstBuffer *) mo;
3669 if (track->output_discont) {
3670 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3671 buffer = gst_buffer_make_writable (buffer);
3672 GST_DEBUG_OBJECT (slot->pad,
3673 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3675 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3677 track->output_discont = FALSE;
3679 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3681 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3682 slot->pad, slot->flow_ret);
3683 GST_DEBUG_OBJECT (slot->pad,
3684 "track %s push returned %s (combined %s)", track->stream_id,
3685 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3686 slot->pushed_timed_data = TRUE;
3688 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
3691 TRACKS_LOCK (demux);
3692 gst_adaptive_demux_track_update_next_position (track);
3694 if (ret != GST_FLOW_OK)
3699 /* Store global output position */
3700 if (global_output_position != GST_CLOCK_STIME_NONE)
3701 demux->priv->global_output_position = global_output_position;
3703 if (global_output_position == GST_CLOCK_STIME_NONE) {
3704 if (!demux->priv->flushing) {
3705 GST_DEBUG_OBJECT (demux,
3706 "Pausing output task after reaching NONE global_output_position");
3707 gst_task_pause (demux->priv->output_task);
3711 TRACKS_UNLOCK (demux);
3712 GST_DEBUG_OBJECT (demux, "leave");
3717 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3718 /* If the flushing flag is set, then the task is being
3719 * externally stopped, so don't go to pause(), otherwise we
3720 * should so we don't keep spinning */
3721 if (!demux->priv->flushing) {
3722 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3723 gst_flow_get_name (ret));
3724 gst_task_pause (demux->priv->output_task);
3727 TRACKS_UNLOCK (demux);
3729 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3730 GstEvent *eos = gst_event_new_eos ();
3732 if (ret != GST_FLOW_EOS) {
3733 GST_ELEMENT_FLOW_ERROR (demux, ret);
3736 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3737 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3738 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3739 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3741 gst_adaptive_demux_push_src_event (demux, eos);
3748 /* must be called from the scheduler */
3750 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3752 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3755 return klass->is_live (demux);
3759 /* must be called from the scheduler */
3761 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3762 GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3763 GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3765 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3767 if (klass->stream_seek)
3768 return klass->stream_seek (stream, forward, flags, ts, final_ts);
3769 return GST_FLOW_ERROR;
3772 /* must be called from the scheduler */
3774 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3775 GstAdaptiveDemux2Stream * stream)
3777 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3778 gboolean ret = TRUE;
3780 if (klass->stream_has_next_fragment)
3781 ret = klass->stream_has_next_fragment (stream);
3786 /* must be called from the scheduler */
3788 * the ::finish_fragment() handlers when an *actual* fragment is done
3791 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3792 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3794 if (stream->last_ret != GST_FLOW_OK)
3795 return stream->last_ret;
3798 gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3801 return stream->last_ret;
3804 /* must be called with manifest_lock taken */
3805 static GstFlowReturn
3806 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3807 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3809 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3812 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3814 GST_LOG_OBJECT (stream,
3815 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3816 GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
3818 stream->download_error_count = 0;
3819 g_clear_error (&stream->last_error);
3822 /* FIXME - url has no indication of byte ranges for subsegments */
3823 /* FIXME: Reenable statistics sending? */
3824 gst_element_post_message (GST_ELEMENT_CAST (demux),
3825 gst_message_new_element (GST_OBJECT_CAST (demux),
3826 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3827 "manifest-uri", G_TYPE_STRING,
3828 demux->manifest_uri, "uri", G_TYPE_STRING,
3829 stream->fragment.uri, "fragment-start-time",
3830 GST_TYPE_CLOCK_TIME, stream->download_start_time,
3831 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3832 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3833 stream->download_total_bytes, "fragment-download-time",
3834 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3837 /* Don't update to the end of the segment if in reverse playback */
3838 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3839 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3840 stream->parse_segment.position += duration;
3841 stream->current_position += duration;
3843 GST_DEBUG_OBJECT (stream,
3844 "stream position now %" GST_TIME_FORMAT,
3845 GST_TIME_ARGS (stream->current_position));
3847 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3849 /* When advancing with a non 1.0 rate on live streams, we need to check
3850 * the live seeking range again to make sure we can still advance to
3852 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3853 if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3856 ret = klass->stream_advance_fragment (stream);
3857 } else if (gst_adaptive_demux_is_live (demux)
3858 || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3859 ret = klass->stream_advance_fragment (stream);
3864 stream->download_start_time =
3865 GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3867 if (ret == GST_FLOW_OK) {
3868 GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3869 if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3870 gst_adaptive_demux2_stream_update_current_bitrate (demux,
3872 GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3873 stream->need_header = TRUE;
3874 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
3881 /* must be called with manifest_lock taken */
3883 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3884 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3886 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3888 if (klass->stream_select_bitrate)
3889 return klass->stream_select_bitrate (stream, bitrate);
3893 /* must be called with manifest_lock taken */
3895 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3896 GstAdaptiveDemux2Stream * stream)
3898 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3901 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3904 /* Make sure the sub-class will update bitrate, or else
3906 stream->fragment.finished = FALSE;
3908 GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3909 GST_TIME_ARGS (stream->current_position));
3911 ret = klass->stream_update_fragment_info (stream);
3913 GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3914 gst_flow_get_name (ret), stream->fragment.uri);
3915 if (ret == GST_FLOW_OK) {
3916 GST_LOG_OBJECT (stream,
3917 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3918 GST_STIME_ARGS (stream->fragment.stream_time),
3919 GST_TIME_ARGS (stream->fragment.duration));
3920 GST_LOG_OBJECT (stream,
3921 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3922 stream->fragment.range_start, stream->fragment.range_end);
3928 /* must be called with manifest_lock taken */
3930 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3931 demux, GstAdaptiveDemux2Stream * stream)
3933 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3935 if (klass->stream_get_fragment_waiting_time)
3936 return klass->stream_get_fragment_waiting_time (stream);
3941 handle_manifest_download_complete (DownloadRequest * request,
3942 DownloadRequestState state, GstAdaptiveDemux * demux)
3944 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3946 GstFlowReturn result;
3948 g_free (demux->manifest_base_uri);
3949 g_free (demux->manifest_uri);
3951 if (request->redirect_permanent && request->redirect_uri) {
3952 demux->manifest_uri = g_strdup (request->redirect_uri);
3953 demux->manifest_base_uri = NULL;
3955 demux->manifest_uri = g_strdup (request->uri);
3956 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3959 buffer = download_request_take_buffer (request);
3961 /* We should always have a buffer since this function is the non-error
3962 * callback for the download */
3965 result = klass->update_manifest_data (demux, buffer);
3966 gst_buffer_unref (buffer);
3968 /* FIXME: Should the manifest uri vars be reverted to original
3969 * values if updating fails? */
3971 if (result == GST_FLOW_OK) {
3972 GstClockTime duration;
3973 /* Send an updated duration message */
3974 duration = klass->get_duration (demux);
3975 if (duration != GST_CLOCK_TIME_NONE) {
3976 GST_DEBUG_OBJECT (demux,
3977 "Sending duration message : %" GST_TIME_FORMAT,
3978 GST_TIME_ARGS (duration));
3979 gst_element_post_message (GST_ELEMENT (demux),
3980 gst_message_new_duration_changed (GST_OBJECT (demux)));
3982 GST_DEBUG_OBJECT (demux,
3983 "Duration unknown, can not send the duration message");
3986 /* If a manifest changes it's liveness or periodic updateness, we need
3987 * to start/stop the manifest update task appropriately */
3988 /* Keep this condition in sync with the one in
3989 * gst_adaptive_demux_start_manifest_update_task()
3991 if (gst_adaptive_demux_is_live (demux) &&
3992 klass->requires_periodical_playlist_update (demux)) {
3993 gst_adaptive_demux_start_manifest_update_task (demux);
3995 gst_adaptive_demux_stop_manifest_update_task (demux);
4001 handle_manifest_download_failure (DownloadRequest * request,
4002 DownloadRequestState state, GstAdaptiveDemux * demux)
4004 GST_FIXME_OBJECT (demux, "Manifest download failed.");
4005 /* Retry or error out here */
4008 static GstFlowReturn
4009 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4011 DownloadRequest *request;
4012 GstFlowReturn ret = GST_FLOW_OK;
4013 GError *error = NULL;
4015 request = download_request_new_uri (demux->manifest_uri);
4017 download_request_set_callbacks (request,
4018 (DownloadRequestEventCallback) handle_manifest_download_complete,
4019 (DownloadRequestEventCallback) handle_manifest_download_failure,
4022 if (!downloadhelper_submit_request (demux->download_helper, NULL,
4023 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4026 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4027 ("Failed to download manifest: %s", error->message), (NULL));
4028 g_clear_error (&error);
4030 ret = GST_FLOW_NOT_LINKED;
4036 /* must be called with manifest_lock taken */
4038 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4040 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4043 ret = klass->update_manifest (demux);
4049 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4056 g_free (f->header_uri);
4057 f->header_uri = NULL;
4058 f->header_range_start = 0;
4059 f->header_range_end = -1;
4061 g_free (f->index_uri);
4062 f->index_uri = NULL;
4063 f->index_range_start = 0;
4064 f->index_range_end = -1;
4066 f->stream_time = GST_CLOCK_STIME_NONE;
4067 f->duration = GST_CLOCK_TIME_NONE;
4068 f->finished = FALSE;
4071 /* must be called with manifest_lock taken */
4073 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4075 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4076 gboolean ret = FALSE;
4078 if (klass->has_next_period)
4079 ret = klass->has_next_period (demux);
4080 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
4084 /* must be called with manifest_lock taken */
4086 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4088 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4089 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4091 g_return_if_fail (klass->advance_period != NULL);
4093 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4094 /* FIXME : no return value ? What if it fails ? */
4095 klass->advance_period (demux);
4097 if (previous_period == demux->input_period) {
4098 GST_ERROR_OBJECT (demux, "Advancing period failed");
4102 /* Stop the previous period stream tasks */
4103 gst_adaptive_demux_period_stop_tasks (previous_period);
4105 gst_adaptive_demux_update_collection (demux, demux->input_period);
4106 /* Figure out a pre-emptive selection based on the output period selection */
4107 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4108 demux->output_period);
4110 gst_adaptive_demux_prepare_streams (demux, FALSE);
4111 gst_adaptive_demux_start_tasks (demux);
4115 * gst_adaptive_demux_get_monotonic_time:
4116 * Returns: a monotonically increasing time, using the system realtime clock
4119 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4121 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4122 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
4126 * gst_adaptive_demux_get_client_now_utc:
4127 * @demux: #GstAdaptiveDemux
4128 * Returns: the client's estimate of UTC
4130 * Used to find the client's estimate of UTC, using the system realtime clock.
4133 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4135 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
4139 * gst_adaptive_demux_is_running
4140 * @demux: #GstAdaptiveDemux
4141 * Returns: whether the demuxer is processing data
4143 * Returns FALSE if shutdown has started (transitioning down from
4144 * PAUSED), otherwise TRUE.
4147 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4149 return g_atomic_int_get (&demux->running);
4153 * gst_adaptive_demux_get_qos_earliest_time:
4155 * Returns: The QOS earliest time
4160 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4162 GstClockTime earliest;
4164 GST_OBJECT_LOCK (demux);
4165 earliest = demux->priv->qos_earliest_time;
4166 GST_OBJECT_UNLOCK (demux);
4172 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4173 GstAdaptiveDemux2Stream * stream)
4175 g_return_val_if_fail (demux && stream, FALSE);
4177 /* FIXME : Migrate to parent */
4178 g_return_val_if_fail (stream->demux == NULL, FALSE);
4180 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4182 TRACKS_LOCK (demux);
4183 if (demux->input_period->prepared) {
4184 GST_ERROR_OBJECT (demux,
4185 "Attempted to add streams but no new period was created");
4186 TRACKS_UNLOCK (demux);
4189 stream->demux = demux;
4190 stream->period = demux->input_period;
4191 demux->input_period->streams =
4192 g_list_append (demux->input_period->streams, stream);
4194 if (stream->tracks) {
4196 for (iter = stream->tracks; iter; iter = iter->next)
4197 if (!gst_adaptive_demux_period_add_track (demux->input_period,
4198 (GstAdaptiveDemuxTrack *) iter->data)) {
4199 GST_ERROR_OBJECT (demux, "Failed to add track elements");
4200 TRACKS_UNLOCK (demux);
4204 TRACKS_UNLOCK (demux);
4208 /* Return the current playback rate including any instant rate multiplier */
4210 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4213 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4214 rate = demux->segment.rate * demux->instant_rate_multiplier;
4215 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);