3 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
4 * Author: Thiago Santos <thiagoss@osg.samsung.com>
6 * Copyright (C) 2021-2022 Centricular Ltd
7 * Author: Edward Hervey <edward@centricular.com>
8 * Author: Jan Schmidt <jan@centricular.com>
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
27 * SECTION:plugin-adaptivedemux2
28 * @short_description: Next Generation adaptive demuxers
30 * What is an adaptive demuxer? Adaptive demuxers are special demuxers in the
31 * sense that they don't actually demux data received from upstream but download
32 * the data themselves.
34 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and a set
35 * of fragments. The manifest describes the available media and the sequence of
36 * fragments to use. Each fragment contains a small part of the media (typically
37 * only a few seconds). It is possible for the manifest to have the same media
38 * available in different configurations (bitrates for example) so that the
39 * client can select the one that best suits its scenario (network fluctuation,
40 * hardware requirements...).
42 * Furthermore, that manifest can also specify alternative media (such as audio
43 * or subtitle tracks in different languages). Only the fragments for the
44 * requested selection will be downloaded.
46 * These elements can therefore "adapt" themselves to the network conditions (as
47 * opposed to the server doing that adaptation) and user choices, which is why
48 * they are called "adaptive" demuxers.
50 * Note: These elements require a "streams-aware" container to work
51 * (i.e. urisourcebin, decodebin3, playbin3, or any bin/pipeline with the
52 * GST_BIN_FLAG_STREAMS_AWARE flag set).
55 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
56 * about the intrinsics of the subclass formats, so the subclasses are
57 * responsible for maintaining the manifest data structures and stream
64 See the adaptive-demuxer.md design documentation for more information
67 The following rules were observed while implementing MT safety in adaptive demux:
68 1. If a variable is accessed from multiple threads and at least one thread
69 writes to it, then all accesses need to be done from inside a critical section.
70 2. If thread A wants to join thread B then at the moment it calls gst_task_join
71 it must not hold any mutexes that thread B might take.
73 Adaptive demux API can be called from several threads. Moreover, adaptive demux
74 starts some threads to monitor the download of fragments. In order to protect
75 accesses to shared variables (demux and streams) all the API functions that
76 can be run in different threads will need to get a mutex (manifest_lock)
77 when they start and release it when they end. Because some of those functions
78 can indirectly call other API functions (eg they can generate events or messages
79 that are processed in the same thread) the manifest_lock must be recursive.
81 The manifest_lock will serialize the public API making access to shared
82 variables safe. But some of these functions will try at some moment to join
83 threads created by adaptive demux, or to change the state of src elements
84 (which will block trying to join the src element streaming thread). Because
85 of rule 2, those functions will need to release the manifest_lock during the
86 call of gst_task_join. During this time they can be interrupted by other API calls.
87 For example, during the processing of a seek event, gst_adaptive_demux_stop_tasks
88 is called and this will join all threads. In order to prevent interruptions
89 during such period, all the API functions will also use a second lock: api_lock.
90 This will be taken at the beginning of the function and released at the end,
91 but this time this lock will not be temporarily released during join.
92 This lock will be used only by API calls (not by the SCHEDULER task)
93 so it is safe to hold it while joining the threads or changing the src element state. The
94 api_lock will serialise all external requests to adaptive demux. In order to
95 avoid deadlocks, if a function needs to acquire both manifest and api locks,
96 the api_lock will be taken first and the manifest_lock second.
98 By using the api_lock a thread is protected against other API calls.
105 #include "gstadaptivedemux.h"
106 #include "gstadaptivedemux-private.h"
108 #include <glib/gi18n-lib.h>
109 #include <gst/base/gstadapter.h>
110 #include <gst/app/gstappsrc.h>
GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
#define GST_CAT_DEFAULT adaptivedemux2_debug

#define DEFAULT_FAILED_COUNT 3

/* Defaults for the bitrate selection properties ("connection-speed",
 * "connection-bitrate", "bandwidth-target-ratio", "min-bitrate",
 * "max-bitrate"). */
#define DEFAULT_CONNECTION_BITRATE 0
#define DEFAULT_BANDWIDTH_TARGET_RATIO 0.8f

#define DEFAULT_MIN_BITRATE 0
#define DEFAULT_MAX_BITRATE 0

/* Default for "max-buffering-time" (nanoseconds). */
#define DEFAULT_MAX_BUFFERING_TIME (30 * GST_SECOND)

/* Defaults for the high/low buffering watermarks, expressed in time
 * (nanoseconds) and in fragments. */
#define DEFAULT_BUFFERING_HIGH_WATERMARK_TIME (30 * GST_SECOND)
#define DEFAULT_BUFFERING_LOW_WATERMARK_TIME (10 * GST_SECOND)
#define DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS 0.0
#define DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS 0.0

/* Initial values reported by the read-only "current-level-time-*"
 * properties. */
#define DEFAULT_CURRENT_LEVEL_TIME_VIDEO 0
#define DEFAULT_CURRENT_LEVEL_TIME_AUDIO 0

/* Helpers for the (non-recursive) api_lock stored in the private struct.
 *
 * The LOCK/UNLOCK macros intentionally expand to a single expression
 * WITHOUT a trailing semicolon: the caller supplies it. This keeps them
 * usable as the sole statement of an unbraced if/else and avoids stray
 * empty statements at every call site. */
#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d))
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d))
/* Property identifiers. These are the ids passed to
 * g_object_class_install_property() in class_init and dispatched on in the
 * set_property/get_property switch statements. */
139 PROP_CONNECTION_SPEED,
140 PROP_BANDWIDTH_TARGET_RATIO,
141 PROP_CONNECTION_BITRATE,
144 PROP_CURRENT_BANDWIDTH,
145 PROP_MAX_BUFFERING_TIME,
146 PROP_BUFFERING_HIGH_WATERMARK_TIME,
147 PROP_BUFFERING_LOW_WATERMARK_TIME,
148 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
149 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
150 PROP_CURRENT_LEVEL_TIME_VIDEO,
151 PROP_CURRENT_LEVEL_TIME_AUDIO,
/* Static pad templates for the output pads, one per stream type. They accept
 * any caps and are named "video_%02u", "audio_%02u" and "subtitle_%02u";
 * gst_adaptive_demux_output_slot_new() instantiates pads from them using a
 * per-type counter. */
155 static GstStaticPadTemplate gst_adaptive_demux_videosrc_template =
156 GST_STATIC_PAD_TEMPLATE ("video_%02u",
159 GST_STATIC_CAPS_ANY);
161 static GstStaticPadTemplate gst_adaptive_demux_audiosrc_template =
162 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
165 GST_STATIC_CAPS_ANY);
167 static GstStaticPadTemplate gst_adaptive_demux_subtitlesrc_template =
168 GST_STATIC_PAD_TEMPLATE ("subtitle_%02u",
171 GST_STATIC_CAPS_ANY);
173 /* Private structure for a track being outputted */
/* One OutputSlot is allocated per source pad by
 * gst_adaptive_demux_output_slot_new(), attached to the pad as its element
 * private data, and released by gst_adaptive_demux_output_slot_free(). */
174 typedef struct _OutputSlot
179 /* Last flow return */
180 GstFlowReturn flow_ret;
185 /* Target track (reference) */
/* The slot owns this reference: it is dropped in
 * gst_adaptive_demux_output_slot_free(). */
186 GstAdaptiveDemuxTrack *track;
188 /* Pending track (which will replace track) */
/* May be NULL; unreffed on slot free only when set. */
189 GstAdaptiveDemuxTrack *pending_track;
191 /* TRUE if a buffer or a gap event was pushed through this slot. */
/* Starts out FALSE when the slot is created. */
192 gboolean pushed_timed_data;
/* Parent class pointer, filled in by class_init via
 * g_type_class_peek_parent() and used to chain up in finalize and
 * change_state. */
195 static GstBinClass *parent_class = NULL;
/* Offset of the instance-private data; adjusted in class_init and consumed
 * by gst_adaptive_demux_get_instance_private(). */
196 static gint private_offset = 0;
/* Forward declarations: GObject/GstElement/GstBin virtual method
 * implementations. */
198 static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
199 static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
200 GstAdaptiveDemuxClass * klass);
201 static void gst_adaptive_demux_finalize (GObject * object);
202 static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
203 element, GstStateChange transition);
204 static gboolean gst_adaptive_demux_query (GstElement * element,
207 static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);
/* Forward declarations: sink pad and source pad handlers installed on the
 * pads in _init() and gst_adaptive_demux_output_slot_new(). */
209 static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
211 static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
212 GstObject * parent, GstBuffer * buffer);
213 static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
215 static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
219 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);
/* Forward declarations: output task function and internal helpers. */
221 static void gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux);
222 static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
223 static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
224 gboolean first_and_live);
226 static gboolean gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
227 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate);
229 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
231 static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
233 static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
236 static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux);
237 static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
238 gboolean stop_updates);
/* Forward declarations: default implementations of the subclass hooks that
 * class_init assigns to the class vtable (data_received, finish_fragment,
 * stream_update_tracks, requires_periodical_playlist_update). */
240 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
241 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer);
243 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
244 GstAdaptiveDemux2Stream * stream);
246 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
247 GstAdaptiveDemux2Stream * stream, GstClockTime duration);
250 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
251 GstAdaptiveDemux2Stream * stream);
254 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
257 /* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
258 * method to get to the padtemplates */
260 gst_adaptive_demux_ng_get_type (void)
262 static gsize type = 0;
264 if (g_once_init_enter (&type)) {
266 static const GTypeInfo info = {
267 sizeof (GstAdaptiveDemuxClass),
270 (GClassInitFunc) gst_adaptive_demux_class_init,
273 sizeof (GstAdaptiveDemux),
275 (GInstanceInitFunc) gst_adaptive_demux_init,
278 _type = g_type_register_static (GST_TYPE_BIN,
279 "GstAdaptiveDemux2", &info, G_TYPE_FLAG_ABSTRACT);
282 g_type_add_instance_private (_type, sizeof (GstAdaptiveDemuxPrivate));
284 g_once_init_leave (&type, _type);
289 static inline GstAdaptiveDemuxPrivate *
290 gst_adaptive_demux_get_instance_private (GstAdaptiveDemux * self)
292 return (G_STRUCT_MEMBER_P (self, private_offset));
296 gst_adaptive_demux_set_property (GObject * object, guint prop_id,
297 const GValue * value, GParamSpec * pspec)
299 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
301 GST_OBJECT_LOCK (demux);
304 case PROP_CONNECTION_SPEED:
305 demux->connection_speed = g_value_get_uint (value) * 1000;
306 GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
307 demux->connection_speed);
309 case PROP_BANDWIDTH_TARGET_RATIO:
310 demux->bandwidth_target_ratio = g_value_get_float (value);
312 case PROP_MIN_BITRATE:
313 demux->min_bitrate = g_value_get_uint (value);
315 case PROP_MAX_BITRATE:
316 demux->max_bitrate = g_value_get_uint (value);
318 case PROP_CONNECTION_BITRATE:
319 demux->connection_speed = g_value_get_uint (value);
321 /* FIXME: Recalculate track and buffering levels
322 * when watermarks change? */
323 case PROP_MAX_BUFFERING_TIME:
324 demux->max_buffering_time = g_value_get_uint64 (value);
326 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
327 demux->buffering_high_watermark_time = g_value_get_uint64 (value);
329 case PROP_BUFFERING_LOW_WATERMARK_TIME:
330 demux->buffering_low_watermark_time = g_value_get_uint64 (value);
332 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
333 demux->buffering_high_watermark_fragments = g_value_get_double (value);
335 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
336 demux->buffering_low_watermark_fragments = g_value_get_double (value);
339 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
343 GST_OBJECT_UNLOCK (demux);
347 gst_adaptive_demux_get_property (GObject * object, guint prop_id,
348 GValue * value, GParamSpec * pspec)
350 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);
352 GST_OBJECT_LOCK (demux);
355 case PROP_CONNECTION_SPEED:
356 g_value_set_uint (value, demux->connection_speed / 1000);
358 case PROP_BANDWIDTH_TARGET_RATIO:
359 g_value_set_float (value, demux->bandwidth_target_ratio);
361 case PROP_MIN_BITRATE:
362 g_value_set_uint (value, demux->min_bitrate);
364 case PROP_MAX_BITRATE:
365 g_value_set_uint (value, demux->max_bitrate);
367 case PROP_CONNECTION_BITRATE:
368 g_value_set_uint (value, demux->connection_speed);
370 case PROP_CURRENT_BANDWIDTH:
371 g_value_set_uint (value, demux->current_download_rate);
373 case PROP_MAX_BUFFERING_TIME:
374 g_value_set_uint64 (value, demux->max_buffering_time);
376 case PROP_BUFFERING_HIGH_WATERMARK_TIME:
377 g_value_set_uint64 (value, demux->buffering_high_watermark_time);
379 case PROP_BUFFERING_LOW_WATERMARK_TIME:
380 g_value_set_uint64 (value, demux->buffering_low_watermark_time);
382 case PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS:
383 g_value_set_double (value, demux->buffering_high_watermark_fragments);
385 case PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS:
386 g_value_set_double (value, demux->buffering_low_watermark_fragments);
388 case PROP_CURRENT_LEVEL_TIME_VIDEO:
389 g_value_set_uint64 (value, demux->current_level_time_video);
391 case PROP_CURRENT_LEVEL_TIME_AUDIO:
392 g_value_set_uint64 (value, demux->current_level_time_audio);
395 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
399 GST_OBJECT_UNLOCK (demux);
403 gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
405 GObjectClass *gobject_class;
406 GstElementClass *gstelement_class;
407 GstBinClass *gstbin_class;
409 gobject_class = G_OBJECT_CLASS (klass);
410 gstelement_class = GST_ELEMENT_CLASS (klass);
411 gstbin_class = GST_BIN_CLASS (klass);
413 GST_DEBUG_CATEGORY_INIT (adaptivedemux2_debug, "adaptivedemux2", 0,
414 "Base Adaptive Demux (ng)");
416 parent_class = g_type_class_peek_parent (klass);
418 if (private_offset != 0)
419 g_type_class_adjust_private_offset (klass, &private_offset);
421 gobject_class->set_property = gst_adaptive_demux_set_property;
422 gobject_class->get_property = gst_adaptive_demux_get_property;
423 gobject_class->finalize = gst_adaptive_demux_finalize;
425 g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
426 g_param_spec_uint ("connection-speed", "Connection Speed",
427 "Network connection speed to use in kbps (0 = calculate from downloaded"
428 " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_BITRATE / 1000,
429 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431 g_object_class_install_property (gobject_class, PROP_BANDWIDTH_TARGET_RATIO,
432 g_param_spec_float ("bandwidth-target-ratio",
433 "Ratio of target bandwidth / available bandwidth",
434 "Limit of the available bitrate to use when switching to alternates",
435 0, 1, DEFAULT_BANDWIDTH_TARGET_RATIO,
436 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438 g_object_class_install_property (gobject_class, PROP_CONNECTION_BITRATE,
439 g_param_spec_uint ("connection-bitrate", "Connection Speed (bits/s)",
440 "Network connection speed to use (0 = automatic) (bits/s)",
441 0, G_MAXUINT, DEFAULT_CONNECTION_BITRATE,
442 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444 g_object_class_install_property (gobject_class, PROP_MIN_BITRATE,
445 g_param_spec_uint ("min-bitrate", "Minimum Bitrate",
446 "Minimum bitrate to use when switching to alternates (bits/s)",
447 0, G_MAXUINT, DEFAULT_MIN_BITRATE,
448 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450 g_object_class_install_property (gobject_class, PROP_MAX_BITRATE,
451 g_param_spec_uint ("max-bitrate", "Maximum Bitrate",
452 "Maximum bitrate to use when switching to alternates (bits/s)",
453 0, G_MAXUINT, DEFAULT_MAX_BITRATE,
454 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
456 g_object_class_install_property (gobject_class, PROP_CURRENT_BANDWIDTH,
457 g_param_spec_uint ("current-bandwidth",
458 "Current download bandwidth (bits/s)",
459 "Report of current download bandwidth (based on arriving data) (bits/s)",
460 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
462 g_object_class_install_property (gobject_class, PROP_MAX_BUFFERING_TIME,
463 g_param_spec_uint64 ("max-buffering-time",
464 "Buffering maximum size (ns)",
465 "Upper limit on the high watermark for parsed data, above which downloads are paused (in ns, 0=disable)",
466 0, G_MAXUINT64, DEFAULT_MAX_BUFFERING_TIME,
467 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
468 G_PARAM_STATIC_STRINGS));
470 g_object_class_install_property (gobject_class,
471 PROP_BUFFERING_HIGH_WATERMARK_TIME,
472 g_param_spec_uint64 ("high-watermark-time",
473 "High buffering watermark size (ns)",
474 "High watermark for parsed data above which downloads are paused (in ns, 0=disable)",
475 0, G_MAXUINT64, DEFAULT_BUFFERING_HIGH_WATERMARK_TIME,
476 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
477 G_PARAM_STATIC_STRINGS));
479 g_object_class_install_property (gobject_class,
480 PROP_BUFFERING_LOW_WATERMARK_TIME,
481 g_param_spec_uint64 ("low-watermark-time",
482 "Low buffering watermark size (ns)",
483 "Low watermark for parsed data below which downloads are resumed (in ns, 0=disable)",
484 0, G_MAXUINT64, DEFAULT_BUFFERING_LOW_WATERMARK_TIME,
485 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
486 G_PARAM_STATIC_STRINGS));
488 g_object_class_install_property (gobject_class,
489 PROP_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
490 g_param_spec_double ("high-watermark-fragments",
491 "High buffering watermark size (fragments)",
492 "High watermark for parsed data above which downloads are paused (in fragments, 0=disable)",
493 0, G_MAXFLOAT, DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS,
494 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
495 G_PARAM_STATIC_STRINGS));
497 g_object_class_install_property (gobject_class,
498 PROP_BUFFERING_LOW_WATERMARK_FRAGMENTS,
499 g_param_spec_double ("low-watermark-fragments",
500 "Low buffering watermark size (fragments)",
501 "Low watermark for parsed data below which downloads are resumed (in fragments, 0=disable)",
502 0, G_MAXFLOAT, DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS,
503 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
504 G_PARAM_STATIC_STRINGS));
506 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_VIDEO,
507 g_param_spec_uint64 ("current-level-time-video",
508 "Currently buffered level of video (ns)",
509 "Currently buffered level of video track(s) (ns)",
510 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_VIDEO,
511 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
512 G_PARAM_STATIC_STRINGS));
514 g_object_class_install_property (gobject_class, PROP_CURRENT_LEVEL_TIME_AUDIO,
515 g_param_spec_uint64 ("current-level-time-audio",
516 "Currently buffered level of audio (ns)",
517 "Currently buffered level of audio track(s) (ns)",
518 0, G_MAXUINT64, DEFAULT_CURRENT_LEVEL_TIME_AUDIO,
519 G_PARAM_READABLE | GST_PARAM_MUTABLE_PLAYING |
520 G_PARAM_STATIC_STRINGS));
522 gst_element_class_add_static_pad_template (gstelement_class,
523 &gst_adaptive_demux_audiosrc_template);
524 gst_element_class_add_static_pad_template (gstelement_class,
525 &gst_adaptive_demux_videosrc_template);
526 gst_element_class_add_static_pad_template (gstelement_class,
527 &gst_adaptive_demux_subtitlesrc_template);
530 gstelement_class->change_state = gst_adaptive_demux_change_state;
531 gstelement_class->query = gst_adaptive_demux_query;
533 gstbin_class->handle_message = gst_adaptive_demux_handle_message;
535 klass->data_received = gst_adaptive_demux2_stream_data_received_default;
536 klass->finish_fragment = gst_adaptive_demux2_stream_finish_fragment_default;
537 klass->update_manifest = gst_adaptive_demux_update_manifest_default;
538 klass->requires_periodical_playlist_update =
539 gst_adaptive_demux_requires_periodical_playlist_update_default;
540 klass->stream_update_tracks = gst_adaptive_demux2_stream_update_tracks;
541 gst_type_mark_as_plugin_api (GST_TYPE_ADAPTIVE_DEMUX, 0);
545 gst_adaptive_demux_init (GstAdaptiveDemux * demux,
546 GstAdaptiveDemuxClass * klass)
548 GstPadTemplate *pad_template;
550 GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");
552 demux->priv = gst_adaptive_demux_get_instance_private (demux);
553 demux->priv->input_adapter = gst_adapter_new ();
554 demux->realtime_clock = gst_adaptive_demux_clock_new ();
556 demux->download_helper = downloadhelper_new (demux->realtime_clock);
557 demux->priv->segment_seqnum = gst_util_seqnum_next ();
558 demux->have_group_id = FALSE;
559 demux->group_id = G_MAXUINT;
561 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
562 demux->instant_rate_multiplier = 1.0;
564 gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
565 GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
567 g_rec_mutex_init (&demux->priv->manifest_lock);
569 demux->priv->scheduler_task = gst_adaptive_demux_loop_new ();
570 g_mutex_init (&demux->priv->scheduler_lock);
572 g_mutex_init (&demux->priv->api_lock);
573 g_mutex_init (&demux->priv->segment_lock);
575 g_mutex_init (&demux->priv->tracks_lock);
576 g_cond_init (&demux->priv->tracks_add);
578 g_mutex_init (&demux->priv->buffering_lock);
580 demux->priv->periods = g_queue_new ();
583 gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
584 g_return_if_fail (pad_template != NULL);
586 demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
587 gst_pad_set_event_function (demux->sinkpad,
588 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
589 gst_pad_set_chain_function (demux->sinkpad,
590 GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));
593 demux->bandwidth_target_ratio = DEFAULT_BANDWIDTH_TARGET_RATIO;
594 demux->connection_speed = DEFAULT_CONNECTION_BITRATE;
595 demux->min_bitrate = DEFAULT_MIN_BITRATE;
596 demux->max_bitrate = DEFAULT_MAX_BITRATE;
598 demux->max_buffering_time = DEFAULT_MAX_BUFFERING_TIME;
599 demux->buffering_high_watermark_time = DEFAULT_BUFFERING_HIGH_WATERMARK_TIME;
600 demux->buffering_low_watermark_time = DEFAULT_BUFFERING_LOW_WATERMARK_TIME;
601 demux->buffering_high_watermark_fragments =
602 DEFAULT_BUFFERING_HIGH_WATERMARK_FRAGMENTS;
603 demux->buffering_low_watermark_fragments =
604 DEFAULT_BUFFERING_LOW_WATERMARK_FRAGMENTS;
606 demux->current_level_time_video = DEFAULT_CURRENT_LEVEL_TIME_VIDEO;
607 demux->current_level_time_audio = DEFAULT_CURRENT_LEVEL_TIME_AUDIO;
609 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
611 demux->priv->duration = GST_CLOCK_TIME_NONE;
613 /* Output combiner */
614 demux->priv->flowcombiner = gst_flow_combiner_new ();
617 g_rec_mutex_init (&demux->priv->output_lock);
618 demux->priv->output_task =
619 gst_task_new ((GstTaskFunction) gst_adaptive_demux_output_loop, demux,
621 gst_task_set_lock (demux->priv->output_task, &demux->priv->output_lock);
625 gst_adaptive_demux_finalize (GObject * object)
627 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
628 GstAdaptiveDemuxPrivate *priv = demux->priv;
630 GST_DEBUG_OBJECT (object, "finalize");
632 g_object_unref (priv->input_adapter);
634 downloadhelper_free (demux->download_helper);
636 g_rec_mutex_clear (&demux->priv->manifest_lock);
637 g_mutex_clear (&demux->priv->api_lock);
638 g_mutex_clear (&demux->priv->segment_lock);
640 g_mutex_clear (&demux->priv->buffering_lock);
642 g_mutex_clear (&demux->priv->scheduler_lock);
643 gst_adaptive_demux_loop_unref (demux->priv->scheduler_task);
645 /* The input period is present after a reset, clear it now */
646 if (demux->input_period)
647 gst_adaptive_demux_period_unref (demux->input_period);
649 if (demux->realtime_clock) {
650 gst_adaptive_demux_clock_unref (demux->realtime_clock);
651 demux->realtime_clock = NULL;
653 g_object_unref (priv->output_task);
654 g_rec_mutex_clear (&priv->output_lock);
656 gst_flow_combiner_free (priv->flowcombiner);
658 g_queue_free (priv->periods);
660 G_OBJECT_CLASS (parent_class)->finalize (object);
664 gst_adaptive_demux_check_streams_aware (GstAdaptiveDemux * demux)
666 gboolean ret = FALSE;
667 GstObject *parent = gst_object_get_parent (GST_OBJECT (demux));
669 ret = (parent && GST_OBJECT_FLAG_IS_SET (parent, GST_BIN_FLAG_STREAMS_AWARE));
/* GstElement::change_state implementation.
 *
 * Performs the demuxer-specific work for a transition before chaining up
 * to the parent class, then starts the download helper once the parent has
 * handled READY->PAUSED. 'result' starts out as GST_STATE_CHANGE_FAILURE
 * and is overwritten with the parent's return value. */
674 static GstStateChangeReturn
675 gst_adaptive_demux_change_state (GstElement * element,
676 GstStateChange transition)
678 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
679 GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
681 switch (transition) {
/* NULL->READY: the element only works inside a streams-aware parent; post
 * an element error otherwise. */
682 case GST_STATE_CHANGE_NULL_TO_READY:
683 if (!gst_adaptive_demux_check_streams_aware (demux)) {
684 GST_ELEMENT_ERROR (demux, CORE, STATE_CHANGE,
685 (_("Element requires a streams-aware context.")), (NULL));
/* PAUSED->READY: atomically clear the running flag, stop the scheduler loop
 * and the download helper, then shut down the output task. */
689 case GST_STATE_CHANGE_PAUSED_TO_READY:
690 if (g_atomic_int_compare_and_exchange (&demux->running, TRUE, FALSE))
691 GST_DEBUG_OBJECT (demux, "demuxer has stopped running");
693 gst_adaptive_demux_loop_stop (demux->priv->scheduler_task, TRUE);
694 downloadhelper_stop (demux->download_helper);
/* Mark the demuxer as flushing and signal tracks_add so that anything
 * waiting on that condition wakes up, then request the output task to
 * stop. */
697 demux->priv->flushing = TRUE;
698 g_cond_signal (&demux->priv->tracks_add);
699 gst_task_stop (demux->priv->output_task);
700 TRACKS_UNLOCK (demux);
/* The join happens after TRACKS_UNLOCK and before the API lock is taken. */
702 gst_task_join (demux->priv->output_task);
704 GST_API_LOCK (demux);
705 gst_adaptive_demux_reset (demux);
706 GST_API_UNLOCK (demux);
/* READY->PAUSED: reset the demuxer state and start the scheduler loop under
 * the API lock; if a manifest is already available, also restart the
 * manifest update task. Then atomically set the running flag. */
708 case GST_STATE_CHANGE_READY_TO_PAUSED:
709 GST_API_LOCK (demux);
710 gst_adaptive_demux_reset (demux);
712 gst_adaptive_demux_loop_start (demux->priv->scheduler_task);
713 if (g_atomic_int_get (&demux->priv->have_manifest))
714 gst_adaptive_demux_start_manifest_update_task (demux);
715 GST_API_UNLOCK (demux);
716 if (g_atomic_int_compare_and_exchange (&demux->running, FALSE, TRUE))
717 GST_DEBUG_OBJECT (demux, "demuxer has started running");
/* The output task is not started here; gst_adaptive_demux_post_collection()
 * starts it. */
718 /* gst_task_start (demux->priv->output_task); */
724 /* this must be run with the scheduler and output tasks stopped. */
725 result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* Post-transition work, done after the parent class handled the change. */
727 switch (transition) {
728 case GST_STATE_CHANGE_READY_TO_PAUSED:
729 /* Start download task */
730 downloadhelper_start (demux->download_helper);
/* Tears down an output slot: pushes an EOS event (stamped with the current
 * segment seqnum) on the slot's pad, deactivates the pad, removes it from
 * the flow combiner and from the element, drops the track references held
 * by the slot and frees the slot itself. */
741 gst_adaptive_demux_output_slot_free (GstAdaptiveDemux * demux,
744 GstEvent *eos = gst_event_new_eos ();
745 GST_DEBUG_OBJECT (slot->pad, "Releasing slot");
747 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
/* EOS is pushed while the pad is still active and still part of the
 * element. */
748 gst_pad_push_event (slot->pad, eos);
749 gst_pad_set_active (slot->pad, FALSE);
750 gst_flow_combiner_remove_pad (demux->priv->flowcombiner, slot->pad);
751 gst_element_remove_pad (GST_ELEMENT_CAST (demux), slot->pad);
753 gst_adaptive_demux_track_unref (slot->track);
/* pending_track is optional, so it is only unreffed when set. */
754 if (slot->pending_track)
755 gst_adaptive_demux_track_unref (slot->pending_track);
/* The slot was allocated with g_slice_new0() in
 * gst_adaptive_demux_output_slot_new(). */
757 g_slice_free (OutputSlot, slot);
761 gst_adaptive_demux_output_slot_new (GstAdaptiveDemux * demux,
762 GstStreamType streamtype)
765 GstPadTemplate *tmpl;
768 switch (streamtype) {
769 case GST_STREAM_TYPE_AUDIO:
770 name = g_strdup_printf ("audio_%02u", demux->priv->n_audio_streams++);
772 gst_static_pad_template_get (&gst_adaptive_demux_audiosrc_template);
774 case GST_STREAM_TYPE_VIDEO:
775 name = g_strdup_printf ("video_%02u", demux->priv->n_video_streams++);
777 gst_static_pad_template_get (&gst_adaptive_demux_videosrc_template);
779 case GST_STREAM_TYPE_TEXT:
781 g_strdup_printf ("subtitle_%02u", demux->priv->n_subtitle_streams++);
783 gst_static_pad_template_get
784 (&gst_adaptive_demux_subtitlesrc_template);
787 g_assert_not_reached ();
791 slot = g_slice_new0 (OutputSlot);
792 slot->type = streamtype;
793 slot->pushed_timed_data = FALSE;
795 /* Create and activate new pads */
796 slot->pad = gst_pad_new_from_template (tmpl, name);
798 gst_object_unref (tmpl);
800 gst_element_add_pad (GST_ELEMENT_CAST (demux), slot->pad);
801 gst_flow_combiner_add_pad (demux->priv->flowcombiner, slot->pad);
802 gst_pad_set_active (slot->pad, TRUE);
804 gst_pad_set_query_function (slot->pad,
805 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_query));
806 gst_pad_set_event_function (slot->pad,
807 GST_DEBUG_FUNCPTR (gst_adaptive_demux_src_event));
809 gst_pad_set_element_private (slot->pad, slot);
811 GST_INFO_OBJECT (demux, "Created output slot %s:%s",
812 GST_DEBUG_PAD_NAME (slot->pad));
817 * * After `process_manifest` or when a period starts
818 * * Or when all tracks have been created
820 * Goes over tracks and creates the collection
822 * Returns TRUE if the collection was fully created.
824 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Builds a fresh GstStreamCollection from the tracks of @period and stores
 * it in period->collection, replacing (and unreffing) any previous one.
 * Nothing is built when the tracks did not change, when the period has no
 * tracks, or while some tracks are still pending. */
827 gst_adaptive_demux_update_collection (GstAdaptiveDemux * demux,
828 GstAdaptiveDemuxPeriod * period)
830 GstStreamCollection *collection;
833 GST_DEBUG_OBJECT (demux, "tracks_changed : %d", period->tracks_changed);
/* Early exits: nothing changed, no tracks at all, or tracks still pending. */
835 if (!period->tracks_changed) {
836 GST_DEBUG_OBJECT (demux, "Tracks didn't change");
840 if (!period->tracks) {
841 GST_WARNING_OBJECT (demux, "No tracks registered/present");
845 if (gst_adaptive_demux_period_has_pending_tracks (period)) {
846 GST_DEBUG_OBJECT (demux,
847 "Streams still have pending tracks, not creating/updating collection");
851 /* Update collection */
852 collection = gst_stream_collection_new ("adaptivedemux");
/* Each track's stream object is added with an extra reference taken here. */
854 for (iter = period->tracks; iter; iter = iter->next) {
855 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
857 GST_DEBUG_OBJECT (demux, "Adding '%s' to collection", track->stream_id);
858 gst_stream_collection_add_stream (collection,
859 gst_object_ref (track->stream_object));
/* Swap in the new collection, releasing the period's previous one if any. */
862 if (period->collection)
863 gst_object_unref (period->collection);
864 period->collection = collection;
870 * Called for the output period:
871 * * after `update_collection()` if the input period is the same as the output period
872 * * When the output period changes
874 * Must be called with MANIFEST_LOCK and TRACKS_LOCK taken.
/* Posts the output period's stream collection on the bus, falls back to a
 * default track selection if no selection request arrived while the message
 * was being posted, and makes sure the output task is started when the
 * demuxer is running. Both locks are released around the post and
 * re-acquired afterwards. */
877 gst_adaptive_demux_post_collection (GstAdaptiveDemux * demux)
879 GstStreamCollection *collection;
880 GstAdaptiveDemuxPeriod *period = demux->output_period;
/* Snapshot of the selection seqnum, compared again after the message has
 * been posted. */
881 guint32 seqnum = g_atomic_int_get (&demux->priv->requested_selection_seqnum);
883 g_return_val_if_fail (period, FALSE);
884 if (!period->collection) {
885 GST_DEBUG_OBJECT (demux, "No collection available yet");
/* NOTE(review): 'collection' is a plain copy of period->collection; no extra
 * reference is visibly taken before the locks are dropped below. If another
 * thread can replace period->collection in that window (as
 * gst_adaptive_demux_update_collection() does), the pointer could go stale
 * before the message is created - confirm whether a gst_object_ref() is
 * needed here. */
889 collection = period->collection;
891 GST_DEBUG_OBJECT (demux, "Posting collection for period %d",
894 /* Post collection */
/* Locks are released in reverse acquisition order so that the message can be
 * handled without holding them. */
895 TRACKS_UNLOCK (demux);
896 GST_MANIFEST_UNLOCK (demux);
898 gst_element_post_message (GST_ELEMENT_CAST (demux),
899 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
901 GST_MANIFEST_LOCK (demux);
904 /* If no stream selection was handled, make a default selection */
905 if (seqnum == g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
906 gst_adaptive_demux_period_select_default_tracks (demux,
907 demux->output_period);
910 /* Make sure the output task is running */
911 if (gst_adaptive_demux2_is_running (demux)) {
/* Clears the flushing flag that the PAUSED->READY transition sets. */
912 demux->priv->flushing = FALSE;
913 GST_DEBUG_OBJECT (demux, "Starting the output task");
914 gst_task_start (demux->priv->output_task);
/* Processes the manifest data accumulated in the input adapter.
 *
 * Called from the sink pad event handler, which holds the scheduler lock
 * around the call; the API and MANIFEST locks are taken here. The buffered
 * data is handed to the subclass' process_manifest() vfunc and a statistics
 * element message is posted on the bus. Further down, have_manifest is set,
 * a duration-changed message is posted for non-live streams whose duration
 * is known, the input period is referenced as the output period, the
 * collection is updated and posted, and prepare_streams(), start_tasks()
 * and start_manifest_update_task() are called.
 *
 * NOTE(review): several lines of this function (declarations, labels,
 * returns) are not visible in this extract, so the exact return value of
 * each exit path cannot be confirmed from here. */
921 handle_incoming_manifest (GstAdaptiveDemux * demux)
923 GstAdaptiveDemuxClass *demux_class;
928 GstBuffer *manifest_buffer;
930 GST_API_LOCK (demux);
931 GST_MANIFEST_LOCK (demux);
933 demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Amount of manifest data the chain function pushed into the adapter */
935 available = gst_adapter_available (demux->priv->input_adapter);
938 goto eos_without_data;
940 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");
942 /* Need to get the URI to use it as a base to generate the fragment's
944 query = gst_query_new_uri ();
945 query_res = gst_pad_peer_query (demux->sinkpad, query);
947 gchar *uri, *redirect_uri;
950 gst_query_parse_uri (query, &uri);
951 gst_query_parse_uri_redirection (query, &redirect_uri);
952 gst_query_parse_uri_redirection_permanent (query, &permanent);
954 if (permanent && redirect_uri) {
955 demux->manifest_uri = redirect_uri;
956 demux->manifest_base_uri = NULL;
959 demux->manifest_uri = uri;
960 demux->manifest_base_uri = redirect_uri;
963 GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
964 demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
966 GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
968 gst_query_unref (query);
970 /* If somehow we didn't receive a stream-start with a group_id, pick one now */
971 if (!demux->have_group_id) {
972 demux->have_group_id = TRUE;
973 demux->group_id = gst_util_group_id_next ();
976 /* Let the subclass parse the manifest */
978 gst_adapter_take_buffer (demux->priv->input_adapter, available);
979 ret = demux_class->process_manifest (demux, manifest_buffer);
980 gst_buffer_unref (manifest_buffer);
/* Report the manifest fetch on the bus as a statistics element message */
982 gst_element_post_message (GST_ELEMENT_CAST (demux),
983 gst_message_new_element (GST_OBJECT_CAST (demux),
984 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
985 "manifest-uri", G_TYPE_STRING,
986 demux->manifest_uri, "uri", G_TYPE_STRING,
988 "manifest-download-start", GST_TYPE_CLOCK_TIME,
990 "manifest-download-stop", GST_TYPE_CLOCK_TIME,
991 gst_util_get_timestamp (), NULL)));
994 goto invalid_manifest;
996 /* Streams should have been added to the input period if the manifest parsing
998 if (!demux->input_period->streams)
1001 g_atomic_int_set (&demux->priv->have_manifest, TRUE);
1003 GST_DEBUG_OBJECT (demux, "Manifest was processed, setting ourselves up");
1004 /* Send duration message */
1005 if (!gst_adaptive_demux_is_live (demux)) {
1006 GstClockTime duration = demux_class->get_duration (demux);
/* The duration is cached even when it is GST_CLOCK_TIME_NONE */
1008 demux->priv->duration = duration;
1009 if (duration != GST_CLOCK_TIME_NONE) {
1010 GST_DEBUG_OBJECT (demux,
1011 "Sending duration message : %" GST_TIME_FORMAT,
1012 GST_TIME_ARGS (duration));
1013 gst_element_post_message (GST_ELEMENT (demux),
1014 gst_message_new_duration_changed (GST_OBJECT (demux)));
1016 GST_DEBUG_OBJECT (demux,
1017 "media duration unknown, can not send the duration message");
1021 TRACKS_LOCK (demux);
1022 /* New streams/tracks will have been added to the input period */
1023 /* The input period has streams, make it the active output period */
1024 /* FIXME : Factorize this into a function to make a period active */
1025 demux->output_period = gst_adaptive_demux_period_ref (demux->input_period);
/* Posting the collection is only attempted if updating it succeeded */
1026 ret = gst_adaptive_demux_update_collection (demux, demux->output_period) &&
1027 gst_adaptive_demux_post_collection (demux);
1028 TRACKS_UNLOCK (demux);
/* The liveness of the stream is passed as the first_and_live argument */
1030 gst_adaptive_demux_prepare_streams (demux,
1031 gst_adaptive_demux_is_live (demux));
1032 gst_adaptive_demux_start_tasks (demux);
1033 gst_adaptive_demux_start_manifest_update_task (demux);
1036 GST_MANIFEST_UNLOCK (demux);
1037 GST_API_UNLOCK (demux);
1044 GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
1052 GST_WARNING_OBJECT (demux, "No streams created from manifest");
1053 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1054 (_("This file contains no playable streams.")),
1055 ("No known stream formats found at the Manifest"));
1062 GST_MANIFEST_UNLOCK (demux);
1063 GST_API_UNLOCK (demux);
1065 /* In most cases, this will happen if we set a wrong url in the
1066 * source element and we have received the 404 HTML response instead of
1068 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."), (NULL));
// Context handed to gst_adaptive_demux_handle_upstream_http_header() while
// iterating over the upstream HTTP headers.
// NOTE(review): only the demux member is visible in this extract; the header
// callback also reads and writes a `cookies` member - confirm the full layout.
1073 struct http_headers_collector
1075 GstAdaptiveDemux *demux;
// Callback passed to gst_structure_foreach() for each field of the upstream
// "request-headers" / "response-headers" structures.
//
// @field_id: quark of the header name
// @value: header value (a string, or an array of strings for cookies)
// @userdata: a struct http_headers_collector
//
// Recognised headers (matched case-insensitively):
//  - User-Agent: forwarded to the download helper
//  - Cookie / Set-Cookie: gathered in hdr_data->cookies
//  - Referer: forwarded to the download helper
//  - Date: parsed and used to set the UTC time of the realtime clock
1080 gst_adaptive_demux_handle_upstream_http_header (GQuark field_id,
1081 const GValue * value, gpointer userdata)
1083 struct http_headers_collector *hdr_data = userdata;
1084 GstAdaptiveDemux *demux = hdr_data->demux;
1085 const gchar *field_name = g_quark_to_string (field_id);
1087 if (G_UNLIKELY (value == NULL))
1088 return TRUE; /* This should not happen */
1090 if (g_ascii_strcasecmp (field_name, "User-Agent") == 0) {
1091 const gchar *user_agent = g_value_get_string (value);
1093 GST_INFO_OBJECT (demux, "User-Agent : %s", GST_STR_NULL (user_agent));
1094 downloadhelper_set_user_agent (demux->download_helper, user_agent);
1097 if ((g_ascii_strcasecmp (field_name, "Cookie") == 0) ||
1098 g_ascii_strcasecmp (field_name, "Set-Cookie") == 0) {
/* A new NULL-terminated array is built: the cookies of this header come
 * first, followed by copies of the ones gathered from earlier headers */
1099 guint i = 0, prev_len = 0, total_len = 0;
1100 gchar **cookies = NULL;
1102 if (hdr_data->cookies != NULL)
1103 prev_len = g_strv_length (hdr_data->cookies);
1105 if (GST_VALUE_HOLDS_ARRAY (value)) {
1106 total_len = gst_value_array_get_size (value) + prev_len;
/* One extra slot for the terminating NULL */
1107 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1109 for (i = 0; i < gst_value_array_get_size (value); i++) {
1110 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1111 g_value_get_string (gst_value_array_get_value (value, i)));
1112 cookies[i] = g_value_dup_string (gst_value_array_get_value (value, i));
1114 } else if (G_VALUE_HOLDS_STRING (value)) {
1115 total_len = 1 + prev_len;
1116 cookies = (gchar **) g_malloc0 ((total_len + 1) * sizeof (gchar *));
1118 GST_INFO_OBJECT (demux, "%s : %s", g_quark_to_string (field_id),
1119 g_value_get_string (value));
1120 cookies[0] = g_value_dup_string (value);
1122 GST_WARNING_OBJECT (demux, "%s field is not string or array",
1123 g_quark_to_string (field_id));
/* NOTE(review): the lines between the warning above and this loop are not
 * visible in this extract; the loop indexes from i, so i is presumably the
 * number of new cookies in both branches - confirm for the string case */
1129 for (j = 0; j < prev_len; j++) {
1130 GST_DEBUG_OBJECT (demux,
1131 "Append existing cookie %s", hdr_data->cookies[j]);
1132 cookies[i + j] = g_strdup (hdr_data->cookies[j]);
1135 cookies[total_len] = NULL;
/* The previous array was duplicated string by string above, so it can be
 * released as a whole before being replaced */
1137 g_strfreev (hdr_data->cookies);
1138 hdr_data->cookies = cookies;
1142 if (g_ascii_strcasecmp (field_name, "Referer") == 0) {
1143 const gchar *referer = g_value_get_string (value);
1144 GST_INFO_OBJECT (demux, "Referer : %s", GST_STR_NULL (referer));
1146 downloadhelper_set_referer (demux->download_helper, referer);
1149 /* Date header can be used to estimate server offset */
1150 if (g_ascii_strcasecmp (field_name, "Date") == 0) {
1151 const gchar *http_date = g_value_get_string (value);
1154 GstDateTime *datetime =
1155 gst_adaptive_demux_util_parse_http_head_date (http_date);
1158 GDateTime *utc_now = gst_date_time_to_g_date_time (datetime);
1159 gchar *date_string = gst_date_time_to_iso8601_string (datetime);
1161 GST_INFO_OBJECT (demux,
1162 "HTTP response Date %s", GST_STR_NULL (date_string));
1163 g_free (date_string);
1165 gst_adaptive_demux_clock_set_utc_time (demux->realtime_clock, utc_now);
/* Event function of the sink pad.
 *
 * Handling visible in this function:
 *  - FLUSH_STOP: resets the demuxer under the API and MANIFEST locks, then
 *    forwards the event with the default handler.
 *  - STREAM_START: records the upstream group id (if any) and drops the
 *    event.
 *  - SEGMENT: dropped.
 *  - CUSTOM_DOWNSTREAM_STICKY: for "http-headers" structures, runs
 *    gst_adaptive_demux_handle_upstream_http_header() over the request and
 *    response headers and hands the gathered cookies to the download helper.
 * The final statement forwards the event to gst_pad_event_default(). */
1174 gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
1177 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1180 switch (event->type) {
1181 case GST_EVENT_FLUSH_STOP:{
1182 GST_API_LOCK (demux);
1183 GST_MANIFEST_LOCK (demux);
1185 gst_adaptive_demux_reset (demux);
1187 ret = gst_pad_event_default (pad, parent, event);
1189 GST_MANIFEST_UNLOCK (demux);
1190 GST_API_UNLOCK (demux);
/* NOTE(review): the case label of the following branch is not visible in
 * this extract; the debug output of handle_incoming_manifest() suggests it
 * is the EOS handler - confirm. The manifest is processed with the
 * scheduler lock held; if that lock cannot be taken or processing fails,
 * the event is forwarded with the default handler instead of being dropped */
1196 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1197 if (!handle_incoming_manifest (demux)) {
1198 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1199 return gst_pad_event_default (pad, parent, event);
1201 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1203 GST_ERROR_OBJECT (demux,
1204 "Failed to acquire scheduler to handle manifest");
1205 return gst_pad_event_default (pad, parent, event);
1207 gst_event_unref (event);
1210 case GST_EVENT_STREAM_START:
/* gst_event_parse_group_id() writes straight into demux->group_id */
1211 if (gst_event_parse_group_id (event, &demux->group_id))
1212 demux->have_group_id = TRUE;
1214 demux->have_group_id = FALSE;
1215 /* Swallow stream-start, we'll push our own */
1216 gst_event_unref (event);
1218 case GST_EVENT_SEGMENT:
1219 /* Swallow newsegments, we'll push our own */
1220 gst_event_unref (event);
1222 case GST_EVENT_CUSTOM_DOWNSTREAM_STICKY:{
1223 const GstStructure *structure = gst_event_get_structure (event);
/* Collector starts with the demuxer and no cookies */
1224 struct http_headers_collector c = { demux, NULL };
1226 if (gst_structure_has_name (structure, "http-headers")) {
1227 if (gst_structure_has_field (structure, "request-headers")) {
1228 GstStructure *req_headers = NULL;
1229 gst_structure_get (structure, "request-headers", GST_TYPE_STRUCTURE,
1230 &req_headers, NULL);
1232 gst_structure_foreach (req_headers,
1233 gst_adaptive_demux_handle_upstream_http_header, &c);
/* The structure obtained from gst_structure_get() is freed here */
1234 gst_structure_free (req_headers);
1237 if (gst_structure_has_field (structure, "response-headers")) {
1238 GstStructure *res_headers = NULL;
1239 gst_structure_get (structure, "response-headers", GST_TYPE_STRUCTURE,
1240 &res_headers, NULL);
1242 gst_structure_foreach (res_headers,
1243 gst_adaptive_demux_handle_upstream_http_header, &c);
1244 gst_structure_free (res_headers);
/* Cookies gathered from both header sets go to the download helper */
1249 downloadhelper_set_cookies (demux->download_helper, c.cookies);
1257 return gst_pad_event_default (pad, parent, event);
/* Chain function of the sink pad: every buffer received from upstream is
 * pushed into the input adapter while holding the MANIFEST lock, and the
 * total amount buffered so far is logged. */
1260 static GstFlowReturn
1261 gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
1264 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
1266 GST_MANIFEST_LOCK (demux);
1268 gst_adapter_push (demux->priv->input_adapter, buffer);
1270 GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
1271 (gint) gst_adapter_available (demux->priv->input_adapter));
1273 GST_MANIFEST_UNLOCK (demux);
1278 /* Called with TRACKS_LOCK taken */
/* Flushes every track of @period. A track whose sinkpad is currently active
 * additionally has that pad deactivated and activated again. */
1280 gst_adaptive_demux_period_reset_tracks (GstAdaptiveDemuxPeriod * period)
1284 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1285 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1287 gst_adaptive_demux_track_flush (track);
/* Inactive pads are left alone; active ones are cycled off and on */
1288 if (gst_pad_is_active (track->sinkpad)) {
1289 gst_pad_set_active (track->sinkpad, FALSE);
1290 gst_pad_set_active (track->sinkpad, TRUE);
1295 /* Resets all tracks to their initial state, ready to receive new data. */
/* Takes the TRACKS lock and runs gst_adaptive_demux_period_reset_tracks()
 * on every period in the demuxer's period queue. */
1297 gst_adaptive_demux_reset_tracks (GstAdaptiveDemux * demux)
1299 TRACKS_LOCK (demux);
1300 g_queue_foreach (demux->priv->periods,
1301 (GFunc) gst_adaptive_demux_period_reset_tracks, NULL);
1302 TRACKS_UNLOCK (demux);
1305 /* Subclasses will call this function to ensure that a new input period is
1306 * available to receive new streams and tracks */
/* An input period that has not been prepared yet is kept (see the debug
 * message below). Otherwise any existing input period is marked closed and
 * a new period is created and stored as demux->input_period.
 * NOTE(review): the return statements are not visible in this extract. */
1308 gst_adaptive_demux_start_new_period (GstAdaptiveDemux * demux)
1310 if (demux->input_period && !demux->input_period->prepared) {
1311 GST_DEBUG_OBJECT (demux, "Using existing input period");
1315 if (demux->input_period) {
1316 GST_DEBUG_OBJECT (demux, "Closing previous period");
1317 demux->input_period->closed = TRUE;
1319 GST_DEBUG_OBJECT (demux, "Setting up new period");
1321 demux->input_period = gst_adaptive_demux_period_new (demux);
1326 /* must be called with manifest_lock taken */
/* Brings the demuxer back to its pre-manifest state: stops the tasks, lets
 * the subclass reset itself, frees all output slots and periods, starts a
 * fresh input period, drops the manifest URIs and buffered manifest data,
 * and re-initialises the segment, buffering, group-id and stream-count
 * bookkeeping. */
1328 gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
1330 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1333 gst_adaptive_demux_stop_tasks (demux, TRUE);
/* NOTE(review): any guard around this vfunc call is not visible in this
 * extract - confirm whether reset may be NULL */
1336 klass->reset (demux);
1338 /* Disable and remove all outputs */
1339 GST_DEBUG_OBJECT (demux, "Disabling and removing all outputs");
1340 for (iter = demux->priv->outputs; iter; iter = iter->next) {
1341 gst_adaptive_demux_output_slot_free (demux, (OutputSlot *) iter->data);
/* The slots were freed above; only the list nodes remain to be released */
1343 g_list_free (demux->priv->outputs);
1344 demux->priv->outputs = NULL;
/* Drops the queue's reference on every period */
1346 g_queue_clear_full (demux->priv->periods,
1347 (GDestroyNotify) gst_adaptive_demux_period_unref);
1349 /* The output period always has an extra ref taken on it */
1350 if (demux->output_period)
1351 gst_adaptive_demux_period_unref (demux->output_period);
1352 demux->output_period = NULL;
1353 /* The input period doesn't have an extra ref taken on it */
1354 demux->input_period = NULL;
/* input_period is NULL at this point, so this always creates a new period */
1356 gst_adaptive_demux_start_new_period (demux);
1358 g_free (demux->manifest_uri);
1359 g_free (demux->manifest_base_uri);
1360 demux->manifest_uri = NULL;
1361 demux->manifest_base_uri = NULL;
1363 gst_adapter_clear (demux->priv->input_adapter);
1364 g_atomic_int_set (&demux->priv->have_manifest, FALSE);
1366 gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1367 demux->instant_rate_multiplier = 1.0;
1369 demux->priv->duration = GST_CLOCK_TIME_NONE;
/* Buffering state restarts as "buffering" with no percentage known (-1) */
1371 demux->priv->percent = -1;
1372 demux->priv->is_buffering = TRUE;
1374 demux->have_group_id = FALSE;
1375 demux->group_id = G_MAXUINT;
1376 demux->priv->segment_seqnum = gst_util_seqnum_next ();
1378 demux->priv->global_output_position = 0;
1380 demux->priv->n_audio_streams = 0;
1381 demux->priv->n_video_streams = 0;
1382 demux->priv->n_subtitle_streams = 0;
1384 gst_flow_combiner_reset (demux->priv->flowcombiner);
/* GstElement query handler. BUFFERING queries get special treatment while
 * no output period exists yet; the final statement chains up to the parent
 * class' query implementation.
 * NOTE(review): the statements that actually fill in or reject the buffering
 * query are not visible in this extract. */
1388 gst_adaptive_demux_query (GstElement * element, GstQuery * query)
1390 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
1392 GST_LOG_OBJECT (demux, "%" GST_PTR_FORMAT, query);
1394 switch (GST_QUERY_TYPE (query)) {
1395 case GST_QUERY_BUFFERING:
/* Only the requested format is extracted from the query */
1398 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1400 if (!demux->output_period) {
1401 if (format != GST_FORMAT_TIME) {
1402 GST_DEBUG_OBJECT (demux,
1403 "No period setup yet, can't answer non-TIME buffering queries");
1407 GST_DEBUG_OBJECT (demux,
1408 "No period setup yet, but still answering buffering query");
1416 return GST_ELEMENT_CLASS (parent_class)->query (element, query);
1419 /* MANIFEST_LOCK held. Find the stream that owns the given element */
/* A stream "owns" @o when the stream's parsebin is an ancestor of @o.
 * NOTE(review): the return statements are not visible in this extract;
 * callers check the result against NULL, so NULL presumably means that no
 * stream matched. */
1420 static GstAdaptiveDemux2Stream *
1421 find_stream_for_element_locked (GstAdaptiveDemux * demux, GstObject * o)
1425 /* We only look in the streams of the input period (i.e. with active streams) */
1426 for (iter = demux->input_period->streams; iter; iter = iter->next) {
1427 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1428 if (gst_object_has_as_ancestor (o, GST_OBJECT_CAST (stream->parsebin))) {
1436 /* TRACKS_LOCK held */
/* Walks the tracks of @stream looking for one whose type equals
 * @stream_type.
 * NOTE(review): the return statements are not visible in this extract;
 * presumably the first match is returned, or NULL if there is none. */
1437 static GstAdaptiveDemuxTrack *
1438 gst_adaptive_demux2_stream_find_track_of_type (GstAdaptiveDemux2Stream * stream,
1439 GstStreamType stream_type)
1443 for (iter = stream->tracks; iter; iter = iter->next) {
1444 GstAdaptiveDemuxTrack *track = iter->data;
1446 if (track->type == stream_type)
1453 /* MANIFEST and TRACKS lock held */
/* For every GstStream in the collection stored on @stream, looks up the
 * stream's track of the same type and replaces that track's
 * upstream_stream_id with a copy of the GstStream's stream id. */
1455 gst_adaptive_demux2_stream_update_tracks (GstAdaptiveDemux * demux,
1456 GstAdaptiveDemux2Stream * stream)
1460 GST_DEBUG_OBJECT (stream, "Updating track information from collection");
1462 for (i = 0; i < gst_stream_collection_get_size (stream->stream_collection);
1464 GstStream *gst_stream =
1465 gst_stream_collection_get_stream (stream->stream_collection, i);
1466 GstStreamType stream_type = gst_stream_get_stream_type (gst_stream);
1467 GstAdaptiveDemuxTrack *track;
/* NOTE(review): the statement guarded by this check is not visible in this
 * extract; presumably streams of unknown type are skipped */
1469 if (stream_type == GST_STREAM_TYPE_UNKNOWN)
1471 track = gst_adaptive_demux2_stream_find_track_of_type (stream, stream_type);
1473 GST_DEBUG_OBJECT (stream,
1474 "We don't have an existing track to handle stream %" GST_PTR_FORMAT,
/* Release the previous id before storing the new copy */
1479 if (track->upstream_stream_id)
1480 g_free (track->upstream_stream_id);
1481 track->upstream_stream_id =
1482 g_strdup (gst_stream_get_stream_id (gst_stream));
/* Inspects @tags for language information by peeking at the first
 * GST_TAG_LANGUAGE_CODE and GST_TAG_LANGUAGE_NAME entries.
 * NOTE(review): the return statements and any NULL check on @tags are not
 * visible in this extract; callers AND the result into a gboolean, so it
 * presumably returns TRUE when either tag is present - confirm. */
1488 tags_have_language_info (GstTagList * tags)
1490 const gchar *language = NULL;
1495 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_CODE, 0,
1498 if (gst_tag_list_peek_string_index (tags, GST_TAG_LANGUAGE_NAME, 0,
/* Decides whether the streams of @collection can be matched to tracks.
 * The collection is walked once, tracking per stream type whether every
 * audio / text stream carries language tags; the collection is reported as
 * not handleable (see the debug output) when there is more than one audio
 * or text stream without language information.
 * NOTE(review): the counter increments, the first operand of the final
 * condition and the return statements are not visible in this extract. */
1506 can_handle_collection (GstAdaptiveDemux2Stream * stream,
1507 GstStreamCollection * collection)
1510 guint nb_audio, nb_video, nb_text;
/* Start optimistic; any stream lacking language tags clears the flag */
1511 gboolean have_audio_languages = TRUE;
1512 gboolean have_text_languages = TRUE;
1514 nb_audio = nb_video = nb_text = 0;
1516 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1517 GstStream *gst_stream = gst_stream_collection_get_stream (collection, i);
1518 GstTagList *tags = gst_stream_get_tags (gst_stream);
1520 GST_DEBUG_OBJECT (stream,
1521 "Internal collection stream #%d %" GST_PTR_FORMAT, i, gst_stream);
1522 switch (gst_stream_get_stream_type (gst_stream)) {
1523 case GST_STREAM_TYPE_AUDIO:
1524 have_audio_languages &= tags_have_language_info (tags);
1527 case GST_STREAM_TYPE_VIDEO:
1530 case GST_STREAM_TYPE_TEXT:
1531 have_text_languages &= tags_have_language_info (tags);
1539 /* Check that we either have at most 1 of each track type, or that
1540 * we have language tags for each to tell which is which */
1542 (nb_audio > 1 && !have_audio_languages) ||
1543 (nb_text > 1 && !have_text_languages)) {
1545 ("Collection can't be handled (nb_audio:%d, nb_video:%d, nb_text:%d)",
1546 nb_audio, nb_video, nb_text);
/* Handles a stream-collection message posted from inside one of the
 * streams' parsebins.
 *
 * Under the MANIFEST lock, the owning stream is looked up, the collection is
 * validated with can_handle_collection() and stored on the stream. If the
 * stream has pending tracks the subclass' stream_update_tracks() vfunc is
 * invoked and the period collection is updated (and posted when the input
 * period is also the output period); otherwise the existing tracks get
 * their upstream stream ids refreshed. Once no pending tracks remain in the
 * input period, every other selected stream is started. The message is
 * unreffed before returning. */
1554 gst_adaptive_demux_handle_stream_collection_msg (GstAdaptiveDemux * demux,
1557 GstAdaptiveDemux2Stream *stream;
1558 GstStreamCollection *collection = NULL;
1559 gboolean pending_tracks_activated = FALSE;
1561 GST_MANIFEST_LOCK (demux);
1563 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1564 if (stream == NULL) {
1565 GST_WARNING_OBJECT (demux,
1566 "Failed to locate stream for collection message");
1570 gst_message_parse_stream_collection (msg, &collection);
1574 /* Check whether the collection is "sane" or not.
1576 * In the context of adaptive streaming, we can only handle multiplexed
1577 * content that provides at most one stream of valid types (audio, video,
1578 * text). Without this we cannot reliably match the output of this multiplex
1579 * to the various tracks.
1581 * FIXME : In the future and *IF* we encounter such streams, we could envision
1582 * supporting multiple streams of the same type if, and only if, they have
1583 * tags that allow differentiating them (ex: languages).
1585 if (!can_handle_collection (stream, collection)) {
1586 GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
1587 (_("Stream format can't be handled")),
1588 ("The streams provided by the multiplex are ambiguous"));
1592 /* Store the collection on the stream */
1593 gst_object_replace ((GstObject **) & stream->stream_collection,
1594 (GstObject *) collection);
1596 /* IF there are pending tracks, ask the subclass to handle that */
1597 if (stream->pending_tracks) {
1598 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1599 g_assert (demux_class->stream_update_tracks);
/* The vfunc is called before the TRACKS lock is taken */
1600 demux_class->stream_update_tracks (demux, stream);
1601 TRACKS_LOCK (demux);
1602 stream->pending_tracks = FALSE;
1603 pending_tracks_activated = TRUE;
/* Only post when the updated period is the one currently being output */
1604 if (gst_adaptive_demux_update_collection (demux, demux->input_period) &&
1605 demux->input_period == demux->output_period)
1606 gst_adaptive_demux_post_collection (demux);
1608 g_assert (stream->tracks);
1609 TRACKS_LOCK (demux);
1610 /* If we already have assigned tracks, update the pending upstream stream_id
1611 * for each of them based on the collection information. */
1612 gst_adaptive_demux2_stream_update_tracks (demux, stream);
1615 /* If we discovered pending tracks and we no longer have any, we can ensure
1616 * selected tracks are started */
1617 if (pending_tracks_activated
1618 && !gst_adaptive_demux_period_has_pending_tracks (demux->input_period)) {
1619 GList *iter = demux->input_period->streams;
1620 for (; iter; iter = iter->next) {
1621 GstAdaptiveDemux2Stream *new_stream = iter->data;
1623 /* The stream that posted this collection was already started. If a
1624 * different stream is now selected, start it */
1625 if (stream != new_stream
1626 && gst_adaptive_demux2_stream_is_selected_locked (new_stream))
1627 gst_adaptive_demux2_stream_start (new_stream);
1630 TRACKS_UNLOCK (demux);
1633 GST_MANIFEST_UNLOCK (demux);
1635 gst_message_unref (msg);
/* GstBin message handler.
 *
 * STREAM_COLLECTION messages are routed to
 * gst_adaptive_demux_handle_stream_collection_msg(). ERROR messages from an
 * element belonging to one of the input period's streams are turned into a
 * stream error: the debug string is appended to the error message, the
 * "http-status-code" detail is stored in stream->last_status_code, and the
 * error is passed to gst_adaptive_demux2_stream_parse_error() with the
 * scheduler lock held. The final statement chains up to the parent class. */
1640 gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
1642 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);
1644 switch (GST_MESSAGE_TYPE (msg)) {
1645 case GST_MESSAGE_STREAM_COLLECTION:
1647 gst_adaptive_demux_handle_stream_collection_msg (demux, msg);
1650 case GST_MESSAGE_ERROR:{
1651 GstAdaptiveDemux2Stream *stream = NULL;
1653 gchar *debug = NULL;
1654 gchar *new_error = NULL;
1655 const GstStructure *details = NULL;
1657 GST_MANIFEST_LOCK (demux);
1659 stream = find_stream_for_element_locked (demux, GST_MESSAGE_SRC (msg));
1660 if (stream == NULL) {
1661 GST_WARNING_OBJECT (demux,
1662 "Failed to locate stream for errored element");
1663 GST_MANIFEST_UNLOCK (demux);
1667 gst_message_parse_error (msg, &err, &debug);
1669 GST_WARNING_OBJECT (demux,
1670 "Source posted error: %d:%d %s (%s)", err->domain, err->code,
1671 err->message, debug);
/* Fold the debug string into the error message itself.
 * NOTE(review): any guard around this rewrite is not visible here */
1674 new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
1676 g_free (err->message);
1677 err->message = new_error;
1680 gst_message_parse_error_details (msg, &details);
1682 gst_structure_get_uint (details, "http-status-code",
1683 &stream->last_status_code);
1686 /* error, but ask to retry */
1687 if (GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
1688 gst_adaptive_demux2_stream_parse_error (stream, err);
1689 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
1695 GST_MANIFEST_UNLOCK (demux);
1697 gst_message_unref (msg);
1706 GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
1709 /* must be called with manifest_lock taken */
/* Thin wrapper around the optional get_presentation_offset() vfunc of the
 * subclass.
 * NOTE(review): the value returned when the vfunc is NULL is not visible in
 * this extract. */
1711 gst_adaptive_demux2_stream_get_presentation_offset (GstAdaptiveDemux * demux,
1712 GstAdaptiveDemux2Stream * stream)
1714 GstAdaptiveDemuxClass *klass;
1716 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1718 if (klass->get_presentation_offset == NULL)
1721 return klass->get_presentation_offset (demux, stream);
1724 /* must be called with manifest_lock taken */
/* Thin wrapper around the optional get_period_start_time() vfunc of the
 * subclass.
 * NOTE(review): the value returned when the vfunc is NULL is not visible in
 * this extract. */
1726 gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
1728 GstAdaptiveDemuxClass *klass;
1730 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
1732 if (klass->get_period_start_time == NULL)
1735 return klass->get_period_start_time (demux);
1738 /* must be called with manifest_lock taken */
/* Prepares all streams of the (not yet prepared) input period.
 *
 * @first_and_live: passed as gst_adaptive_demux_is_live() by the manifest
 *   handler and as FALSE by the seek path; when TRUE the demuxer segment is
 *   seeked to the smallest stream time found plus the period start.
 *
 * Every stream is flagged as needing a header and as discontinuous, gets a
 * common start/current position and has compute_segment set; finally the
 * QoS earliest time is cleared and the period is marked as prepared.
 * NOTE(review): the return statements are not visible in this extract. */
1740 gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1741 gboolean first_and_live)
1744 GstClockTime period_start;
1745 GstClockTimeDiff min_stream_time = GST_CLOCK_STIME_NONE;
1748 g_return_val_if_fail (demux->input_period->streams, FALSE);
/* Preparing the same period twice is a programming error */
1749 g_assert (demux->input_period->prepared == FALSE);
1751 new_streams = demux->input_period->streams;
1753 if (!gst_adaptive_demux2_is_running (demux)) {
1754 GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
1758 GST_DEBUG_OBJECT (demux,
1759 "Preparing %d streams for period %d , first_and_live:%d",
1760 g_list_length (new_streams), demux->input_period->period_num,
1763 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1764 GstAdaptiveDemux2Stream *stream = iter->data;
1766 GST_DEBUG_OBJECT (stream, "Preparing stream");
1768 stream->need_header = TRUE;
1769 stream->discont = TRUE;
1771 /* Grab the first stream time for live streams
1772 * * If the stream is selected
1773 * * Or it provides dynamic tracks (in which case we need to force an update)
1776 && (gst_adaptive_demux2_stream_is_selected_locked (stream)
1777 || stream->pending_tracks)) {
1778 /* TODO we only need the first timestamp, maybe create a simple function to
1779 * get the current PTS of a fragment ? */
1780 GST_DEBUG_OBJECT (stream, "Calling update_fragment_info");
1781 gst_adaptive_demux2_stream_update_fragment_info (demux, stream);
1783 GST_DEBUG_OBJECT (stream,
1784 "Got stream time %" GST_STIME_FORMAT,
1785 GST_STIME_ARGS (stream->fragment.stream_time));
/* Track the smallest fragment stream time seen across the streams */
1787 if (GST_CLOCK_STIME_IS_VALID (min_stream_time)) {
1788 min_stream_time = MIN (min_stream_time, stream->fragment.stream_time);
1790 min_stream_time = stream->fragment.stream_time;
1795 period_start = gst_adaptive_demux_get_period_start_time (demux);
1797 /* For live streams, the subclass is supposed to seek to the current fragment
1798 * and then tell us its stream time in stream->fragment.stream_time. We now
1799 * also have to seek our demuxer segment to reflect this.
1801 * FIXME: This needs some refactoring at some point.
1803 if (first_and_live) {
1804 gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1805 GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_stream_time + period_start,
1806 GST_SEEK_TYPE_NONE, -1, NULL);
1809 GST_DEBUG_OBJECT (demux,
1810 "period_start:%" GST_TIME_FORMAT ", min_stream_time:%" GST_STIME_FORMAT
1811 " demux segment %" GST_SEGMENT_FORMAT,
1812 GST_TIME_ARGS (period_start), GST_STIME_ARGS (min_stream_time),
1815 /* Synchronize stream start/current positions */
/* Without a valid stream time the period start alone is used; otherwise
 * the stream time is offset by the period start */
1816 if (min_stream_time == GST_CLOCK_STIME_NONE)
1817 min_stream_time = period_start;
1819 min_stream_time += period_start;
1820 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1821 GstAdaptiveDemux2Stream *stream = iter->data;
1822 stream->start_position = stream->current_position = min_stream_time;
1825 for (iter = new_streams; iter; iter = g_list_next (iter)) {
1826 GstAdaptiveDemux2Stream *stream = iter->data;
1827 stream->compute_segment = TRUE;
1828 stream->first_and_live = first_and_live;
1830 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
1831 demux->input_period->prepared = TRUE;
/* Searches the tracks of @period for one whose stream_id equals @stream_id
 * (compared with g_strcmp0(), so NULL ids compare equal to each other).
 * NOTE(review): the return statements are not visible in this extract;
 * presumably the first match is returned, or NULL if there is none. */
1836 static GstAdaptiveDemuxTrack *
1837 find_track_for_stream_id (GstAdaptiveDemuxPeriod * period, gchar * stream_id)
1841 for (tmp = period->tracks; tmp; tmp = tmp->next) {
1842 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1843 if (!g_strcmp0 (track->stream_id, stream_id))
1850 /* TRACKS_LOCK held */
/* Recomputes the buffering state from the tracks of the output period.
 *
 * For each active and selected track the buffered level is compared with
 * the track's buffering threshold; the lowest resulting percentage wins.
 * The lowest video and audio levels are published in
 * demux->current_level_time_video / _audio under the object lock, and
 * priv->percent, priv->is_buffering and priv->percent_changed are updated.
 * No message is posted here; see demux_post_buffering_locked(). */
1852 demux_update_buffering_locked (GstAdaptiveDemux * demux)
1854 GstClockTime min_level_time = GST_CLOCK_TIME_NONE;
1855 GstClockTime video_level_time = GST_CLOCK_TIME_NONE;
1856 GstClockTime audio_level_time = GST_CLOCK_TIME_NONE;
1858 gint min_percent = -1, percent;
1859 gboolean all_eos = TRUE;
1861 /* Go over all active tracks of the output period and update level */
1863 /* Check that all tracks are above their respective low thresholds (different
1864 * tracks may have different fragment durations yielding different buffering
1865 * percentages) Overall buffering percent is the lowest. */
1866 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
1867 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
1869 GST_LOG_OBJECT (demux,
1870 "Checking track '%s' (period %u) active:%d selected:%d eos:%d level:%"
1871 GST_TIME_FORMAT " buffering_threshold:%" GST_TIME_FORMAT,
1872 track->stream_id, track->period_num, track->active, track->selected,
1873 track->eos, GST_TIME_ARGS (track->level_time),
1874 GST_TIME_ARGS (track->buffering_threshold));
1876 if (track->active && track->selected) {
/* NOTE(review): the statements between the check above and the one below
 * (presumably the all_eos bookkeeping) are not visible in this extract */
1881 if (min_level_time == GST_CLOCK_TIME_NONE) {
1882 min_level_time = track->level_time;
1883 } else if (track->level_time < min_level_time) {
1884 min_level_time = track->level_time;
/* The per-type levels start at GST_CLOCK_TIME_NONE and are lowered by any
 * smaller track level of that type */
1887 if (track->type & GST_STREAM_TYPE_VIDEO
1888 && video_level_time > track->level_time)
1889 video_level_time = track->level_time;
1891 if (track->type & GST_STREAM_TYPE_AUDIO
1892 && audio_level_time > track->level_time)
1893 audio_level_time = track->level_time;
/* A zero threshold is excluded, which also avoids dividing by zero */
1895 if (track->level_time != GST_CLOCK_TIME_NONE
1896 && track->buffering_threshold != 0) {
1898 gst_util_uint64_scale (track->level_time, 100,
1899 track->buffering_threshold);
1900 if (min_percent < 0 || cur_percent < min_percent)
1901 min_percent = cur_percent;
1907 GST_DEBUG_OBJECT (demux,
1908 "Minimum time level %" GST_TIME_FORMAT " percent %d all_eos:%d",
1909 GST_TIME_ARGS (min_level_time), min_percent, all_eos);
1911 /* Update demuxer video/audio level properties */
1912 GST_OBJECT_LOCK (demux);
1913 demux->current_level_time_video = video_level_time;
1914 demux->current_level_time_audio = audio_level_time;
1915 GST_OBJECT_UNLOCK (demux);
/* NOTE(review): the statements guarded by the next two checks are not
 * visible in this extract */
1917 if (min_percent < 0 && !all_eos)
1920 if (min_percent > 100 || all_eos)
1923 percent = MAX (0, min_percent);
1925 GST_LOG_OBJECT (demux, "percent : %d %%", percent);
/* Buffering is left through the first branch and entered again once the
 * level drops below 1%.
 * NOTE(review): the condition for leaving buffering is not visible here */
1927 if (demux->priv->is_buffering) {
1929 demux->priv->is_buffering = FALSE;
1930 if (demux->priv->percent != percent) {
1931 demux->priv->percent = percent;
1932 demux->priv->percent_changed = TRUE;
1934 } else if (percent < 1) {
1935 demux->priv->is_buffering = TRUE;
1936 if (demux->priv->percent != percent) {
1937 demux->priv->percent = percent;
1938 demux->priv->percent_changed = TRUE;
1942 if (demux->priv->percent_changed)
1943 GST_DEBUG_OBJECT (demux, "Percent changed, %d %% is_buffering:%d", percent,
1944 demux->priv->is_buffering);
1947 /* With TRACKS_LOCK held */
/* Posts a buffering message carrying priv->percent if the percentage was
 * flagged as changed. The TRACKS lock is released while the message is
 * posted and re-taken afterwards; the BUFFERING lock is held across the
 * post. */
1949 demux_post_buffering_locked (GstAdaptiveDemux * demux)
1954 if (!demux->priv->percent_changed)
1957 BUFFERING_LOCK (demux);
/* Snapshot the value so a change made while the TRACKS lock is dropped can
 * be detected below */
1958 percent = demux->priv->percent;
1959 msg = gst_message_new_buffering ((GstObject *) demux, percent);
1960 TRACKS_UNLOCK (demux);
1961 gst_element_post_message ((GstElement *) demux, msg);
1963 BUFFERING_UNLOCK (demux);
1964 TRACKS_LOCK (demux);
/* Only clear the flag if the posted value is still the current one */
1965 if (percent == demux->priv->percent)
1966 demux->priv->percent_changed = FALSE;
1969 /* MANIFEST_LOCK and TRACKS_LOCK held */
/* Searches the streams of the output period for the one whose track list
 * contains @track.
 * NOTE(review): the return statements are not visible in this extract;
 * presumably the matching stream is returned, or NULL if there is none. */
1970 GstAdaptiveDemux2Stream *
1971 find_stream_for_track_locked (GstAdaptiveDemux * demux,
1972 GstAdaptiveDemuxTrack * track)
1976 for (iter = demux->output_period->streams; iter; iter = iter->next) {
1977 GstAdaptiveDemux2Stream *stream = (GstAdaptiveDemux2Stream *) iter->data;
1978 if (g_list_find (stream->tracks, track))
1985 /* Called from seek handler
1987 * This function is used when a (flushing) seek caused a new period to be activated.
1989 * This will ensure that:
1990 * * the current output period is marked as finished (EOS)
1991 * * Any potential intermediate (non-input/non-output) periods are removed
1992 * * That the new input period is prepared and ready
1995 gst_adaptive_demux_seek_to_input_period (GstAdaptiveDemux * demux)
1999 GST_DEBUG_OBJECT (demux,
2000 "Preparing new input period %u", demux->input_period->period_num);
2002 /* Prepare the new input period */
2003 gst_adaptive_demux_update_collection (demux, demux->input_period);
2005 /* Transfer the previous selection to the new input period */
2006 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
2007 demux->output_period);
/* first_and_live is FALSE on this path */
2008 gst_adaptive_demux_prepare_streams (demux, FALSE);
2010 /* Remove all periods except for the input (last) and output (first) period */
2011 while (demux->priv->periods->length > 2) {
/* Index 1 is the entry right after the head of the queue; popping it
 * repeatedly removes everything between the first and the last period */
2012 GstAdaptiveDemuxPeriod *period = g_queue_pop_nth (demux->priv->periods, 1);
2013 /* Mark all tracks of the removed period as not selected and EOS so they
2014 * will be skipped / ignored */
2015 for (iter = period->tracks; iter; iter = iter->next) {
2016 GstAdaptiveDemuxTrack *track = iter->data;
2017 track->selected = FALSE;
/* Drop the reference the queue was holding on the removed period */
2020 gst_adaptive_demux_period_unref (period);
2023 /* Mark all tracks of the output period as EOS so that the output loop
2024 * will immediately move to the new period */
2025 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2026 GstAdaptiveDemuxTrack *track = iter->data;
2030 /* Go over all slots, and clear any pending track */
2031 for (iter = demux->priv->outputs; iter; iter = iter->next) {
2032 OutputSlot *slot = (OutputSlot *) iter->data;
2034 if (slot->pending_track != NULL) {
2035 GST_DEBUG_OBJECT (demux,
2036 "Removing track '%s' as pending from output of current track '%s'",
2037 slot->pending_track->stream_id, slot->track->stream_id);
2038 gst_adaptive_demux_track_unref (slot->pending_track);
2039 slot->pending_track = NULL;
2044 /* must be called with manifest_lock taken */
/* Forwards to the subclass' get_live_seek_range() vfunc, which fills in
 * @range_start and @range_stop. If the subclass does not implement the
 * vfunc, the g_return_val_if_fail() check returns FALSE. */
2046 gst_adaptive_demux_get_live_seek_range (GstAdaptiveDemux * demux,
2047 gint64 * range_start, gint64 * range_stop)
2049 GstAdaptiveDemuxClass *klass;
2051 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2053 g_return_val_if_fail (klass->get_live_seek_range, FALSE);
2055 return klass->get_live_seek_range (demux, range_start, range_stop);
2058 /* must be called with manifest_lock taken */
/* Tells whether the current position of @stream lies within the live seek
 * range reported by the subclass (both bounds inclusive).
 * NOTE(review): the value returned when the range cannot be obtained is not
 * visible in this extract. */
2060 gst_adaptive_demux2_stream_in_live_seek_range (GstAdaptiveDemux * demux,
2061 GstAdaptiveDemux2Stream * stream)
2063 gint64 range_start, range_stop;
2064 if (gst_adaptive_demux_get_live_seek_range (demux, &range_start, &range_stop)) {
2065 GST_LOG_OBJECT (stream,
2066 "stream position %" GST_TIME_FORMAT " live seek range %"
2067 GST_STIME_FORMAT " - %" GST_STIME_FORMAT,
2068 GST_TIME_ARGS (stream->current_position), GST_STIME_ARGS (range_start),
2069 GST_STIME_ARGS (range_stop));
2070 return (stream->current_position >= range_start
2071 && stream->current_position <= range_stop);
2077 /* must be called with manifest_lock taken */
/* Reports whether seeking is supported: for live streams the subclass must
 * implement get_live_seek_range(), otherwise it must implement seek(). */
2079 gst_adaptive_demux_can_seek (GstAdaptiveDemux * demux)
2081 GstAdaptiveDemuxClass *klass;
2083 klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2084 if (gst_adaptive_demux_is_live (demux)) {
2085 return klass->get_live_seek_range != NULL;
2088 return klass->seek != NULL;
/* Puts every stream of the input period into the RESTART state and resets
 * its start position: 0 by default, the demuxer segment start for forward
 * playback when @start_type is not GST_SEEK_TYPE_NONE, or the segment stop
 * for reverse playback when @stop_type is not GST_SEEK_TYPE_NONE. */
2092 gst_adaptive_demux_setup_streams_for_restart (GstAdaptiveDemux * demux,
2093 GstSeekType start_type, GstSeekType stop_type)
2097 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
2098 GstAdaptiveDemux2Stream *stream = iter->data;
2100 /* Make sure the download loop clears and restarts on the next start,
2101 * which will recompute the stream segment */
2102 g_assert (stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_STOPPED ||
2103 stream->state == GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART);
2104 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2105 stream->start_position = 0;
/* The sign of the segment rate selects which segment edge is the start */
2107 if (demux->segment.rate > 0 && start_type != GST_SEEK_TYPE_NONE)
2108 stream->start_position = demux->segment.start;
2109 else if (demux->segment.rate < 0 && stop_type != GST_SEEK_TYPE_NONE)
2110 stream->start_position = demux->segment.stop;
/* Non-zero when the seek flags f request any form of snapping: one of the
 * SNAP_BEFORE, SNAP_AFTER, SNAP_NEAREST, TRICKMODE_KEY_UNITS or KEY_UNIT bits.
 * The argument is parenthesized so that an expression such as (a | b) is
 * masked as a whole instead of being split by operator precedence. */
2114 #define IS_SNAP_SEEK(f) ((f) & (GST_SEEK_FLAG_SNAP_BEFORE | \
2115 GST_SEEK_FLAG_SNAP_AFTER | \
2116 GST_SEEK_FLAG_SNAP_NEAREST | \
2117 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | \
2118 GST_SEEK_FLAG_KEY_UNIT))
/* The seek flags f with the three SNAP_* bits cleared; all other bits,
 * including KEY_UNIT and TRICKMODE_KEY_UNITS, are left untouched. */
2119 #define REMOVE_SNAP_FLAGS(f) ((f) & ~(GST_SEEK_FLAG_SNAP_BEFORE | \
2120 GST_SEEK_FLAG_SNAP_AFTER | \
2121 GST_SEEK_FLAG_SNAP_NEAREST))
/* Handles a SEEK event for the whole demuxer.
 *
 * The function owns the event passed in: each exit path is expected to
 * release it together with the API lock and, once taken, the scheduler lock.
 * Two early exits did not do so and are fixed below (marked with Fix).
 *
 * Stages visible in this function:
 *  - seeks that are not in TIME format, and SEGMENT seeks, are rejected;
 *  - instant-rate-change seeks are validated (same direction, no position
 *    change, no flush), turned into an instant-rate-change event that is
 *    pushed on the source pads, and recorded in instant_rate_multiplier;
 *  - any other seek requires a seekable stream and the FLUSH flag;
 *  - for live streams, END-relative positions are resolved against the end
 *    of the live seek range, positions are clamped to that range except for
 *    the starting point of an accurate seek, and a seek whose positions are
 *    still outside the range is refused;
 *  - flush-start is pushed, tasks are stopped and tracks are reset;
 *  - a snap seek is first run on one selected stream through the stream_seek
 *    class hook and the seek event is rebuilt without the SNAP flags;
 *  - the demuxer segment is updated, the subclass seek hook is called, and
 *    the saved segment is available as a backup in case the seek fails;
 *  - flush-stop is pushed, a not yet prepared input period is prepared,
 *    streams are set up for restart and the tasks are started again. */
2124 gst_adaptive_demux_handle_seek_event (GstAdaptiveDemux * demux,
2127 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2131 GstSeekType start_type, stop_type;
2136 GstSegment oldsegment;
2137 GstEvent *flush_event;
2139 GST_INFO_OBJECT (demux, "Received seek event");
2141 GST_API_LOCK (demux);
2143 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
2146 if (format != GST_FORMAT_TIME) {
2147 GST_API_UNLOCK (demux);
2148 GST_WARNING_OBJECT (demux,
2149 "Adaptive demuxers only support TIME-based seeking");
2150 gst_event_unref (event);
2154 if (flags & GST_SEEK_FLAG_SEGMENT) {
2155 GST_FIXME_OBJECT (demux, "Handle segment seeks");
2156 GST_API_UNLOCK (demux);
2157 gst_event_unref (event);
2161 seqnum = gst_event_get_seqnum (event);
2163 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux)) {
2164 GST_LOG_OBJECT (demux, "Failed to acquire scheduler context");
/* Fix: this early exit used to leave the API lock held and the event
 * unreleased, unlike every other exit path of this function. */
GST_API_UNLOCK (demux);
gst_event_unref (event);
2168 if (flags & GST_SEEK_FLAG_INSTANT_RATE_CHANGE) {
2169 /* For instant rate seeks, reply directly and update
2170 * our segment so the new rate is reflected in any future
2173 gdouble rate_multiplier;
2175 /* instant rate change only supported if direction does not change. All
2176 * other requirements are already checked before creating the seek event
2177 * but let's double-check here to be sure */
2178 if ((demux->segment.rate > 0 && rate < 0) ||
2179 (demux->segment.rate < 0 && rate > 0) ||
2180 start_type != GST_SEEK_TYPE_NONE ||
2181 stop_type != GST_SEEK_TYPE_NONE || (flags & GST_SEEK_FLAG_FLUSH)) {
2182 GST_ERROR_OBJECT (demux,
2183 "Instant rate change seeks only supported in the "
2184 "same direction, without flushing and position change");
2185 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2186 GST_API_UNLOCK (demux);
/* Fix: the rejected event is owned by this function and was leaked here. */
gst_event_unref (event);
2190 rate_multiplier = rate / demux->segment.rate;
2192 ev = gst_event_new_instant_rate_change (rate_multiplier,
2193 (GstSegmentFlags) flags);
2194 gst_event_set_seqnum (ev, seqnum);
2196 ret = gst_adaptive_demux_push_src_event (demux, ev);
2199 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2200 demux->instant_rate_multiplier = rate_multiplier;
2201 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2204 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2205 GST_API_UNLOCK (demux);
2206 gst_event_unref (event);
2211 if (!gst_adaptive_demux_can_seek (demux)) {
2212 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2214 GST_API_UNLOCK (demux);
2215 gst_event_unref (event);
2219 /* We can only accept flushing seeks from this point onward */
2220 if (!(flags & GST_SEEK_FLAG_FLUSH)) {
2221 GST_ERROR_OBJECT (demux,
2222 "Non-flushing non-instant-rate seeks are not possible");
2224 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2226 GST_API_UNLOCK (demux);
2227 gst_event_unref (event);
2231 if (gst_adaptive_demux_is_live (demux)) {
2232 gint64 range_start, range_stop;
2233 gboolean changed = FALSE;
2234 gboolean start_valid = TRUE, stop_valid = TRUE;
2236 if (!gst_adaptive_demux_get_live_seek_range (demux, &range_start,
2238 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2239 GST_API_UNLOCK (demux);
2240 gst_event_unref (event);
2241 GST_WARNING_OBJECT (demux, "Failure getting the live seek ranges");
2245 GST_DEBUG_OBJECT (demux,
2246 "Live range is %" GST_STIME_FORMAT " %" GST_STIME_FORMAT,
2247 GST_STIME_ARGS (range_start), GST_STIME_ARGS (range_stop));
2249 /* Handle relative positioning for live streams (relative to the range_stop) */
2250 if (start_type == GST_SEEK_TYPE_END) {
2251 start = range_stop + start;
2252 start_type = GST_SEEK_TYPE_SET;
2255 if (stop_type == GST_SEEK_TYPE_END) {
2256 stop = range_stop + stop;
2257 stop_type = GST_SEEK_TYPE_SET;
2261 /* Adjust the requested start/stop position if it falls beyond the live
2263 * The only case where we don't adjust is for the starting point of
2264 * an accurate seek (start if forward and stop if backwards)
2266 if (start_type == GST_SEEK_TYPE_SET && start < range_start &&
2267 (rate < 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2268 GST_DEBUG_OBJECT (demux,
2269 "seek before live stream start, setting to range start: %"
2270 GST_TIME_FORMAT, GST_TIME_ARGS (range_start));
2271 start = range_start;
2274 /* truncate stop position also if set */
2275 if (stop_type == GST_SEEK_TYPE_SET && stop > range_stop &&
2276 (rate > 0 || !(flags & GST_SEEK_FLAG_ACCURATE))) {
2277 GST_DEBUG_OBJECT (demux,
2278 "seek ending after live start, adjusting to: %"
2279 GST_TIME_FORMAT, GST_TIME_ARGS (range_stop));
2284 if (start_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (start) &&
2285 (start < range_start || start > range_stop)) {
2286 GST_WARNING_OBJECT (demux,
2287 "Seek to invalid position start:%" GST_STIME_FORMAT
2288 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2289 ")", GST_STIME_ARGS (start), GST_STIME_ARGS (range_start),
2290 GST_STIME_ARGS (range_stop));
2291 start_valid = FALSE;
2293 if (stop_type == GST_SEEK_TYPE_SET && GST_CLOCK_TIME_IS_VALID (stop) &&
2294 (stop < range_start || stop > range_stop)) {
2295 GST_WARNING_OBJECT (demux,
2296 "Seek to invalid position stop:%" GST_STIME_FORMAT
2297 " out of seekable range (%" GST_STIME_FORMAT " - %" GST_STIME_FORMAT
2298 ")", GST_STIME_ARGS (stop), GST_STIME_ARGS (range_start),
2299 GST_STIME_ARGS (range_stop));
2303 /* If the seek position is still outside of the seekable range, refuse the seek */
2304 if (!start_valid || !stop_valid) {
2305 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2306 GST_API_UNLOCK (demux);
2307 gst_event_unref (event);
2311 /* Re-create seek event with changed/updated values */
2313 gst_event_unref (event);
2315 gst_event_new_seek (rate, format, flags,
2316 start_type, start, stop_type, stop);
2317 gst_event_set_seqnum (event, seqnum);
2321 GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
2323 /* have a backup in case seek fails */
2324 gst_segment_copy_into (&demux->segment, &oldsegment);
2326 GST_DEBUG_OBJECT (demux, "sending flush start");
2327 flush_event = gst_event_new_flush_start ();
2328 gst_event_set_seqnum (flush_event, seqnum);
2330 gst_adaptive_demux_push_src_event (demux, flush_event);
2332 gst_adaptive_demux_stop_tasks (demux, FALSE);
2333 gst_adaptive_demux_reset_tracks (demux);
2335 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2338 * Handle snap seeks as follows:
2339 * 1) do the snap seeking a (random) active stream
2340 * 2) use the final position on this stream to seek
2341 * on the other streams to the same position
2343 * We can't snap at all streams at the same time as they might end in
2344 * different positions, so just pick one and align all others to that
2347 if (IS_SNAP_SEEK (flags) && demux_class->stream_seek) {
2348 GstAdaptiveDemux2Stream *stream = NULL;
2349 GstClockTimeDiff ts;
2350 GstSeekFlags stream_seek_flags = flags;
2353 /* snap-seek on the stream that received the event and then
2354 * use the resulting position to seek on all streams */
2357 if (start_type != GST_SEEK_TYPE_NONE)
2360 ts = gst_segment_position_from_running_time (&demux->segment,
2361 GST_FORMAT_TIME, demux->priv->global_output_position);
2362 start_type = GST_SEEK_TYPE_SET;
2365 if (stop_type != GST_SEEK_TYPE_NONE)
2368 stop_type = GST_SEEK_TYPE_SET;
2369 ts = gst_segment_position_from_running_time (&demux->segment,
2370 GST_FORMAT_TIME, demux->priv->global_output_position);
2374 /* Pick a random active stream on which to do the stream seek */
2375 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2376 GstAdaptiveDemux2Stream *cand = iter->data;
2377 if (gst_adaptive_demux2_stream_is_selected_locked (cand)) {
2383 demux_class->stream_seek (stream, rate >= 0, stream_seek_flags, ts, &ts);
2386 /* replace event with a new one without snapping to seek on all streams */
2387 gst_event_unref (event);
2394 gst_event_new_seek (rate, format, REMOVE_SNAP_FLAGS (flags),
2395 start_type, start, stop_type, stop);
2396 GST_DEBUG_OBJECT (demux, "Adapted snap seek to %" GST_PTR_FORMAT, event);
2399 ret = gst_segment_do_seek (&demux->segment, rate, format, flags, start_type,
2400 start, stop_type, stop, &update);
2403 GST_DEBUG_OBJECT (demux, "Calling subclass seek: %" GST_PTR_FORMAT, event);
2405 ret = demux_class->seek (demux, event);
2409 /* Is there anything else we can do if it fails? */
2410 gst_segment_copy_into (&oldsegment, &demux->segment);
2412 demux->priv->segment_seqnum = seqnum;
2414 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2416 /* Resetting flow combiner */
2417 gst_flow_combiner_reset (demux->priv->flowcombiner);
2419 GST_DEBUG_OBJECT (demux, "Sending flush stop on all pad");
2420 flush_event = gst_event_new_flush_stop (TRUE);
2421 gst_event_set_seqnum (flush_event, seqnum);
2422 gst_adaptive_demux_push_src_event (demux, flush_event);
2424 /* If the seek generated a new period, prepare it */
2425 if (!demux->input_period->prepared) {
2426 /* This can only happen on flushing seeks */
2427 g_assert (flags & GST_SEEK_FLAG_FLUSH);
2428 gst_adaptive_demux_seek_to_input_period (demux);
2431 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2432 GST_DEBUG_OBJECT (demux, "Demuxer segment after seek: %" GST_SEGMENT_FORMAT,
2434 gst_adaptive_demux_setup_streams_for_restart (demux, start_type, stop_type);
2435 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
2437 /* Reset the global output position (running time) for when the output loop restarts */
2438 demux->priv->global_output_position = 0;
2440 /* After a flushing seek, any instant-rate override is undone */
2441 demux->instant_rate_multiplier = 1.0;
2443 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2445 /* Restart the demux */
2446 gst_adaptive_demux_start_tasks (demux);
2448 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2449 GST_API_UNLOCK (demux);
2450 gst_event_unref (event);
/* Walks stream->tracks and tests the selected flag of each track. */
2455 /* Returns TRUE if the stream has at least one selected track */
2457 gst_adaptive_demux2_stream_has_selected_tracks (GstAdaptiveDemux2Stream *
2462 for (tmp = stream->tracks; tmp; tmp = tmp->next) {
2463 GstAdaptiveDemuxTrack *track = tmp->data;
2465 if (track->selected)
/* Applies a stream selection request to the output period.
 *
 * streams is a list of stream-id strings. The work is done with the
 * scheduler lock and the tracks lock held:
 *  1. every requested id is resolved to a track of the output period; an
 *     unknown id sets the result to FALSE and jumps to the common exit;
 *  2. tracks that are selected but no longer requested are deselected and
 *     marked as draining, requested tracks that are not selected yet are
 *     selected;
 *  3. streams that now have a selected track but are not running get a start
 *     position computed from the global output position, have their track
 *     sink pads activated and are started; running streams without any
 *     selected track are stopped and their track sink pads deactivated;
 *  4. seqnum is stored atomically as the requested selection seqnum.
 * On the common exit the buffering state is updated and posted, the locks
 * are released and the temporary track list is freed.
 *
 * Returns selection_handled, which starts as TRUE.
 * NOTE(review): the value returned when the scheduler lock cannot be taken
 * is not visible in this excerpt. */
2473 handle_stream_selection (GstAdaptiveDemux * demux, GList * streams,
2476 gboolean selection_handled = TRUE;
2478 GList *tracks = NULL;
2480 if (!GST_ADAPTIVE_SCHEDULER_LOCK (demux))
2483 TRACKS_LOCK (demux);
2484 /* Validate the streams and fill:
2485 * tracks : list of tracks corresponding to requested streams
2487 for (iter = streams; iter; iter = iter->next) {
2488 gchar *stream_id = (gchar *) iter->data;
2489 GstAdaptiveDemuxTrack *track;
2491 GST_DEBUG_OBJECT (demux, "Stream requested : %s", stream_id);
2492 track = find_track_for_stream_id (demux->output_period, stream_id);
2494 GST_WARNING_OBJECT (demux, "Unrecognized stream_id '%s'", stream_id);
2495 selection_handled = FALSE;
2496 goto select_streams_done;
2498 tracks = g_list_append (tracks, track);
2499 GST_DEBUG_OBJECT (demux, "Track found, selected:%d", track->selected);
2502 /* FIXME : ACTIVATING AND DEACTIVATING STREAMS SHOULD BE DONE FROM THE
2503 * SCHEDULING THREAD */
2505 /* FIXME: We want to iterate all streams, mark them as deselected,
2506 * then iterate tracks and mark any streams that have at least 1
2507 * active output track, then loop over all streams again and start/stop
2510 /* Go over all tracks present and (de)select based on current selection */
2511 for (iter = demux->output_period->tracks; iter; iter = iter->next) {
2512 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) iter->data;
2514 if (track->selected && !g_list_find (tracks, track)) {
2515 GST_DEBUG_OBJECT (demux, "De-select track '%s' (active:%d)",
2516 track->stream_id, track->active);
2517 track->selected = FALSE;
2518 track->draining = TRUE;
2519 } else if (!track->selected && g_list_find (tracks, track)) {
2520 GST_DEBUG_OBJECT (demux, "Selecting track '%s'", track->stream_id);
2522 track->selected = TRUE;
2526 /* Start or stop streams based on the updated track selection */
2527 for (iter = demux->output_period->streams; iter; iter = iter->next) {
2528 GstAdaptiveDemux2Stream *stream = iter->data;
2531 gboolean is_running = gst_adaptive_demux2_stream_is_running (stream);
2532 gboolean should_be_running =
2533 gst_adaptive_demux2_stream_has_selected_tracks (stream);
2535 if (!is_running && should_be_running) {
2536 GstClockTime output_running_ts = demux->priv->global_output_position;
2537 GstClockTime start_position;
2539 /* Calculate where we should start the stream, and then
2541 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
2543 GST_DEBUG_OBJECT (stream, "(Re)starting stream. Output running ts %"
2544 GST_TIME_FORMAT " in demux segment %" GST_SEGMENT_FORMAT,
2545 GST_TIME_ARGS (output_running_ts), &demux->segment);
2548 gst_segment_position_from_running_time (&demux->segment,
2549 GST_FORMAT_TIME, output_running_ts);
2551 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
2553 GST_DEBUG_OBJECT (demux, "Setting stream start position to %"
2554 GST_TIME_FORMAT, GST_TIME_ARGS (start_position));
2556 stream->current_position = stream->start_position = start_position;
2557 stream->compute_segment = TRUE;
2559 /* If output has already begun, ensure we seek this segment
2560 * to the correct restart position when the download loop begins */
2561 if (output_running_ts != 0)
2562 stream->state = GST_ADAPTIVE_DEMUX2_STREAM_STATE_RESTART;
2564 /* Activate track pads for this stream */
2565 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2566 GstAdaptiveDemuxTrack *track =
2567 (GstAdaptiveDemuxTrack *) trackiter->data;
2568 gst_pad_set_active (track->sinkpad, TRUE);
2571 gst_adaptive_demux2_stream_start (stream);
2572 } else if (is_running && !should_be_running) {
2573 /* Stream should not be running and needs stopping */
2574 gst_adaptive_demux2_stream_stop (stream);
2576 /* Set all track sinkpads to inactive for this stream */
2577 for (trackiter = stream->tracks; trackiter; trackiter = trackiter->next) {
2578 GstAdaptiveDemuxTrack *track =
2579 (GstAdaptiveDemuxTrack *) trackiter->data;
2580 gst_pad_set_active (track->sinkpad, FALSE);
2585 g_atomic_int_set (&demux->priv->requested_selection_seqnum, seqnum);
2587 select_streams_done:
2588 demux_update_buffering_locked (demux);
2589 demux_post_buffering_locked (demux);
2591 TRACKS_UNLOCK (demux);
2592 GST_ADAPTIVE_SCHEDULER_UNLOCK (demux);
2595 g_list_free (tracks);
2596 return selection_handled;
/* Event handler for the demuxer source pads.
 *
 * The handler owns the event it receives, so every case that does not hand
 * the event on must release it.
 *  - SEEK: an event whose seqnum matches the seqnum of the current segment
 *    is dropped as a duplicate; any other seek goes to
 *    gst_adaptive_demux_handle_seek_event(), which consumes the event.
 *  - LATENCY: released without further processing.
 *  - QOS: the earliest acceptable time is computed from the timestamp and,
 *    per the original comment, the lateness when late; it replaces
 *    qos_earliest_time under the object lock when that one is unset or older.
 *  - SELECT_STREAMS: a request with the seqnum already stored as requested
 *    selection seqnum is ignored; otherwise the parsed stream-id list is
 *    passed to handle_stream_selection() and its result is returned.
 *  - everything else reaches gst_pad_event_default().
 *
 * Fixes in this version: the address-of argument of gst_event_parse_qos()
 * had been garbled, and the SELECT_STREAMS case never released the event. */
2600 gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
2603 GstAdaptiveDemux *demux;
2605 demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2607 switch (event->type) {
2608 case GST_EVENT_SEEK:
2610 guint32 seqnum = gst_event_get_seqnum (event);
2611 if (seqnum == demux->priv->segment_seqnum) {
2612 GST_LOG_OBJECT (pad,
2613 "Drop duplicated SEEK event seqnum %" G_GUINT32_FORMAT, seqnum);
2614 gst_event_unref (event);
2617 return gst_adaptive_demux_handle_seek_event (demux, event);
2619 case GST_EVENT_LATENCY:{
2620 /* Upstream and our internal source are irrelevant
2621 * for latency, and we should not fail here to
2622 * configure the latency */
2623 gst_event_unref (event);
2626 case GST_EVENT_QOS:{
2627 GstClockTimeDiff diff;
2628 GstClockTime timestamp;
2629 GstClockTime earliest_time;
2631 gst_event_parse_qos (event, NULL, NULL, &diff, &timestamp);
2632 /* Only take into account lateness if late */
2634 earliest_time = timestamp + 2 * diff;
2636 earliest_time = timestamp;
2638 GST_OBJECT_LOCK (demux);
2639 if (!GST_CLOCK_TIME_IS_VALID (demux->priv->qos_earliest_time) ||
2640 earliest_time > demux->priv->qos_earliest_time) {
2641 demux->priv->qos_earliest_time = earliest_time;
2642 GST_DEBUG_OBJECT (demux, "qos_earliest_time %" GST_TIME_FORMAT,
2643 GST_TIME_ARGS (demux->priv->qos_earliest_time));
2645 GST_OBJECT_UNLOCK (demux);
2648 case GST_EVENT_SELECT_STREAMS:
2651 gboolean selection_handled;
2653 if (GST_EVENT_SEQNUM (event) ==
2654 g_atomic_int_get (&demux->priv->requested_selection_seqnum)) {
2655 GST_DEBUG_OBJECT (demux, "Already handled/handling select-streams %d",
2656 GST_EVENT_SEQNUM (event));
/* Fix: the already-handled request is consumed here, so release it. */
gst_event_unref (event);
2660 gst_event_parse_select_streams (event, &streams);
2662 handle_stream_selection (demux, streams, GST_EVENT_SEQNUM (event));
2663 g_list_free_full (streams, g_free);
/* Fix: the event is not forwarded anywhere, so release it before returning. */
gst_event_unref (event);
2664 return selection_handled;
2670 return gst_pad_event_default (pad, parent, event);
/* Query handler for the demuxer source pads. The result starts as FALSE.
 *
 * Handling visible in this function:
 *  - DURATION: for a live stream the duration is answered as unknown (-1).
 *    Otherwise, for TIME format and once a manifest is available, the
 *    duration stored in the private data is read under the manifest lock and
 *    set on the query when it is valid and greater than zero.
 *  - LATENCY: answered as not live, minimum 0, no maximum (-1).
 *  - SEEKING: returns FALSE until a manifest is available. For TIME format
 *    the seekable flag comes from gst_adaptive_demux_can_seek(); for a live
 *    stream start and stop come from the live seek range, and a failure to
 *    get that range makes the query fail. The manifest lock is held while
 *    the answer is built.
 *  - the manifest URI, when set, is placed on the query under the manifest
 *    lock.
 *  - SELECTABLE: always answered with TRUE.
 * Other queries are deliberately not forwarded upstream. */
2674 gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
2677 GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
2678 gboolean ret = FALSE;
2684 switch (query->type) {
2685 case GST_QUERY_DURATION:{
2687 GstClockTime duration = GST_CLOCK_TIME_NONE;
2689 gst_query_parse_duration (query, &fmt, NULL);
2691 if (gst_adaptive_demux_is_live (demux)) {
2692 /* We are able to answer this query: the duration is unknown */
2693 gst_query_set_duration (query, fmt, -1);
2698 if (fmt == GST_FORMAT_TIME
2699 && g_atomic_int_get (&demux->priv->have_manifest)) {
2701 GST_MANIFEST_LOCK (demux);
2702 duration = demux->priv->duration;
2703 GST_MANIFEST_UNLOCK (demux);
2705 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
2706 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
2711 GST_LOG_OBJECT (demux, "GST_QUERY_DURATION returns %s with duration %"
2712 GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
2715 case GST_QUERY_LATENCY:{
2716 gst_query_set_latency (query, FALSE, 0, -1);
2720 case GST_QUERY_SEEKING:{
2725 if (!g_atomic_int_get (&demux->priv->have_manifest)) {
2726 GST_INFO_OBJECT (demux,
2727 "Don't have manifest yet, can't answer seeking query");
2728 return FALSE; /* can't answer without manifest */
2731 GST_MANIFEST_LOCK (demux);
2733 gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
2734 GST_INFO_OBJECT (demux, "Received GST_QUERY_SEEKING with format %d", fmt);
2735 if (fmt == GST_FORMAT_TIME) {
2736 GstClockTime duration;
2737 gboolean can_seek = gst_adaptive_demux_can_seek (demux);
2741 if (gst_adaptive_demux_is_live (demux)) {
2742 ret = gst_adaptive_demux_get_live_seek_range (demux, &start, &stop);
2744 GST_MANIFEST_UNLOCK (demux);
2745 GST_INFO_OBJECT (demux, "can't answer seeking query");
2749 duration = demux->priv->duration;
2750 if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
2754 gst_query_set_seeking (query, fmt, can_seek, start, stop);
2755 GST_INFO_OBJECT (demux, "GST_QUERY_SEEKING returning with start : %"
2756 GST_TIME_FORMAT ", stop : %" GST_TIME_FORMAT,
2757 GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
2759 GST_MANIFEST_UNLOCK (demux);
2764 GST_MANIFEST_LOCK (demux);
2766 /* TODO HLS can answer this differently it seems */
2767 if (demux->manifest_uri) {
2768 /* FIXME: (hls) Do we answer with the variant playlist, with the current
2769 * playlist or the URI of the last downloaded fragment? */
2770 gst_query_set_uri (query, demux->manifest_uri);
2774 GST_MANIFEST_UNLOCK (demux);
2776 case GST_QUERY_SELECTABLE:
2777 gst_query_set_selectable (query, TRUE);
2781 /* Don't forward queries upstream because of the special nature of this
2782 * "demuxer", which relies on the upstream element only to be fed
/* Recovers from a loss of synchronization by seeking back to the live head.
 * A flushing key-unit seek at rate 1.0 is built in TIME format with the
 * start set to END + 0 and no stop position, and is handed to
 * gst_adaptive_demux_handle_seek_event(). */
2792 gst_adaptive_demux_handle_lost_sync (GstAdaptiveDemux * demux)
2796 GST_WARNING_OBJECT (demux, "Lost synchronization, seeking back to live head");
2799 gst_event_new_seek (1.0, GST_FORMAT_TIME,
2800 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT, GST_SEEK_TYPE_END, 0,
2801 GST_SEEK_TYPE_NONE, 0);
2802 gst_adaptive_demux_handle_seek_event (demux, seek);
/* Scheduler callback that starts the streams of the input period.
 * A stream is started when it still has pending tracks to discover or when
 * gst_adaptive_demux2_stream_is_selected_locked() reports it as selected. */
2807 /* Called when the scheduler starts, to kick off manifest updates
2808 * and stream downloads */
2810 gst_adaptive_demux_scheduler_start_cb (GstAdaptiveDemux * demux)
2814 GST_INFO_OBJECT (demux, "Starting streams' tasks");
2816 iter = demux->input_period->streams;
2818 for (; iter; iter = g_list_next (iter)) {
2819 GstAdaptiveDemux2Stream *stream = iter->data;
2821 /* If we need to process this stream to discover tracks *OR* it has any
2822 * tracks which are selected, start it now */
2823 if ((stream->pending_tracks == TRUE)
2824 || gst_adaptive_demux2_stream_is_selected_locked (stream))
2825 gst_adaptive_demux2_stream_start (stream);
/* Starts the download and output machinery of the demuxer.
 * When the demuxer is not running a debug message about the shutdown is
 * logged. Otherwise gst_adaptive_demux_scheduler_start_cb() is queued on the
 * scheduler task, and, with the tracks lock held, the flushing flag is
 * cleared and the output task is started. */
2831 /* must be called with manifest_lock taken */
2833 gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux)
2835 if (!gst_adaptive_demux2_is_running (demux)) {
2836 GST_DEBUG_OBJECT (demux, "Not starting tasks due to shutdown");
2840 GST_DEBUG_OBJECT (demux, "Starting the SCHEDULER task");
2841 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2842 (GSourceFunc) gst_adaptive_demux_scheduler_start_cb, demux, NULL);
2844 TRACKS_LOCK (demux);
2845 demux->priv->flushing = FALSE;
2846 GST_DEBUG_OBJECT (demux, "Starting the output task");
2847 gst_task_start (demux->priv->output_task);
2848 TRACKS_UNLOCK (demux);
/* Cancels a pending manifest update.
 * manifest_updates_cb holds the id of the call queued on the scheduler
 * task; a non-zero id is cancelled and the field is reset to 0. */
2851 /* must be called with manifest_lock taken */
2853 gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux * demux)
2855 GST_DEBUG_OBJECT (demux, "requesting stop of the manifest update task");
2856 if (demux->priv->manifest_updates_cb != 0) {
2857 gst_adaptive_demux_loop_cancel_call (demux->priv->scheduler_task,
2858 demux->priv->manifest_updates_cb);
2859 demux->priv->manifest_updates_cb = 0;
2863 static gboolean gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux);
/* Starts periodic manifest updates when they are needed.
 * Updates are only scheduled for a live stream whose subclass reports that
 * it requires periodical playlist updates, and only when no update call is
 * pending yet (manifest_updates_cb is 0). The id of the queued call to
 * gst_adaptive_demux_updates_start_cb() is stored in manifest_updates_cb. */
2865 /* must be called with manifest_lock taken */
2867 gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux * demux)
2869 GstAdaptiveDemuxClass *demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
2871 if (gst_adaptive_demux_is_live (demux)) {
2872 /* Task to periodically update the manifest */
2873 if (demux_class->requires_periodical_playlist_update (demux)) {
2874 GST_DEBUG_OBJECT (demux, "requesting start of the manifest update task");
2875 if (demux->priv->manifest_updates_cb == 0) {
2876 demux->priv->manifest_updates_cb =
2877 gst_adaptive_demux_loop_call (demux->priv->scheduler_task,
2878 (GSourceFunc) gst_adaptive_demux_updates_start_cb, demux, NULL);
/* Stops the output task and the tasks of the input period.
 * Steps visible here: the manifest update task is stopped; with the tracks
 * lock held the flushing flag is set, the tracks_add condition is signalled
 * and the output task is asked to stop; the output task is then joined
 * outside the tracks lock; the tasks of the input period, if there is one,
 * are stopped; finally qos_earliest_time is reset to GST_CLOCK_TIME_NONE.
 * NOTE(review): the manifest update stop is presumably guarded by
 * stop_updates - the guarding line is not visible in this excerpt. */
2884 /* must be called with manifest_lock taken
2885 * This function will temporarily release manifest_lock in order to join the
2887 * The api_lock will still protect it against other threads trying to modify
2888 * the demux element.
2891 gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux, gboolean stop_updates)
2893 GST_LOG_OBJECT (demux, "Stopping tasks");
2896 gst_adaptive_demux_stop_manifest_update_task (demux);
2898 TRACKS_LOCK (demux);
2899 demux->priv->flushing = TRUE;
2900 g_cond_signal (&demux->priv->tracks_add);
2901 gst_task_stop (demux->priv->output_task);
2902 TRACKS_UNLOCK (demux);
2904 gst_task_join (demux->priv->output_task);
2906 if (demux->input_period)
2907 gst_adaptive_demux_period_stop_tasks (demux->input_period);
2909 demux->priv->qos_earliest_time = GST_CLOCK_TIME_NONE;
/* Pushes one event on the pad of every output slot.
 * Consumes the reference passed in: an extra reference is taken for each
 * push and the original one is dropped at the end. With the tracks lock
 * held, the results of all pushes are combined with a bitwise AND into ret,
 * which starts as TRUE. For a FLUSH_STOP event the pushed_timed_data flag
 * of each slot is cleared as well. */
2912 /* must be called with manifest_lock taken */
2914 gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event)
2917 gboolean ret = TRUE;
2919 GST_DEBUG_OBJECT (demux, "event %" GST_PTR_FORMAT, event);
2921 TRACKS_LOCK (demux);
2922 for (iter = demux->priv->outputs; iter; iter = g_list_next (iter)) {
2923 OutputSlot *slot = (OutputSlot *) iter->data;
2924 gst_event_ref (event);
2925 GST_DEBUG_OBJECT (slot->pad, "Pushing event");
2926 ret = ret & gst_pad_push_event (slot->pad, event);
2927 if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
2928 slot->pushed_timed_data = FALSE;
2930 TRACKS_UNLOCK (demux);
2931 gst_event_unref (event);
/* Stores new caps as the pending caps of the stream.
 * Takes ownership of caps: gst_caps_replace() keeps its own reference in
 * stream->pending_caps and the reference passed in is then dropped. */
2935 /* must be called with manifest_lock taken */
2937 gst_adaptive_demux2_stream_set_caps (GstAdaptiveDemux2Stream * stream,
2940 GST_DEBUG_OBJECT (stream,
2941 "setting new caps for stream %" GST_PTR_FORMAT, caps);
2942 gst_caps_replace (&stream->pending_caps, caps);
2943 gst_caps_unref (caps);
/* Stores a new tag list as the pending tags of the stream.
 * Takes ownership of tags: a previously pending tag list is released and
 * the pointer passed in is stored directly, without an extra reference. */
2946 /* must be called with manifest_lock taken */
2948 gst_adaptive_demux2_stream_set_tags (GstAdaptiveDemux2Stream * stream,
2951 GST_DEBUG_OBJECT (stream,
2952 "setting new tags for stream %" GST_PTR_FORMAT, tags);
2953 if (stream->pending_tags) {
2954 gst_tag_list_unref (stream->pending_tags);
2956 stream->pending_tags = tags;
/* Appends event to the end of the pending_events list of the stream.
 * The pointer is stored as is; no extra reference is taken here. */
2959 /* must be called with manifest_lock taken */
2961 gst_adaptive_demux2_stream_queue_event (GstAdaptiveDemux2Stream * stream,
2964 stream->pending_events = g_list_append (stream->pending_events, event);
/* Records a new fragment bitrate and returns the updated moving average.
 * fragment_bitrates holds the last NUM_LOOKBACK_FRAGMENTS values; the slot
 * to overwrite is moving_index modulo NUM_LOOKBACK_FRAGMENTS. The running
 * sum moving_bitrate is adjusted by removing the value being overwritten
 * and adding new_bitrate, and moving_index is incremented.
 * The sum is divided by NUM_LOOKBACK_FRAGMENTS once more than that many
 * values have been recorded, and by the number of recorded values before. */
2968 _update_average_bitrate (GstAdaptiveDemux * demux,
2969 GstAdaptiveDemux2Stream * stream, guint64 new_bitrate)
2971 gint index = stream->moving_index % NUM_LOOKBACK_FRAGMENTS;
2973 stream->moving_bitrate -= stream->fragment_bitrates[index];
2974 stream->fragment_bitrates[index] = new_bitrate;
2975 stream->moving_bitrate += new_bitrate;
2977 stream->moving_index += 1;
2979 if (stream->moving_index > NUM_LOOKBACK_FRAGMENTS)
2980 return stream->moving_bitrate / NUM_LOOKBACK_FRAGMENTS;
2981 return stream->moving_bitrate / stream->moving_index;
/* Updates the download rate estimate of a stream and returns the bitrate
 * that should be targeted for the next fragment.
 *
 * Steps visible here:
 *  - the bitrate of the last fragment (stream->last_bitrate) is fed into
 *    the moving average;
 *  - stream->current_download_rate becomes the smaller of the average and
 *    the last fragment bitrate;
 *  - for a video stream the demuxer-wide current_download_rate is updated
 *    too and the current-bandwidth property is notified, with the object
 *    lock released around the notification;
 *  - connection_speed, min_bitrate and max_bitrate are read under the
 *    object lock;
 *  - a non-zero connection_speed is returned directly;
 *  - otherwise the download rate, clamped to the guint range, is scaled by
 *    bandwidth_target_ratio and then raised to min_bitrate or lowered to
 *    max_bitrate when those limits are set (greater than zero). */
2985 gst_adaptive_demux2_stream_update_current_bitrate (GstAdaptiveDemux * demux,
2986 GstAdaptiveDemux2Stream * stream)
2988 guint64 average_bitrate;
2989 guint64 fragment_bitrate;
2990 guint connection_speed, min_bitrate, max_bitrate, target_download_rate;
2992 fragment_bitrate = stream->last_bitrate;
2993 GST_DEBUG_OBJECT (stream, "Download bitrate is : %" G_GUINT64_FORMAT " bps",
2996 average_bitrate = _update_average_bitrate (demux, stream, fragment_bitrate);
2998 GST_INFO_OBJECT (stream,
2999 "last fragment bitrate was %" G_GUINT64_FORMAT, fragment_bitrate);
3000 GST_INFO_OBJECT (stream,
3001 "Last %u fragments average bitrate is %" G_GUINT64_FORMAT,
3002 NUM_LOOKBACK_FRAGMENTS, average_bitrate);
3004 /* Conservative approach, make sure we don't upgrade too fast */
3005 GST_OBJECT_LOCK (demux);
3006 stream->current_download_rate = MIN (average_bitrate, fragment_bitrate);
3008 /* If this is the/a video stream update the overall demuxer
3009 * reported bitrate and notify, to give the application a
3010 * chance to choose a new connection-bitrate */
3011 if ((stream->stream_type & GST_STREAM_TYPE_VIDEO) != 0) {
3012 demux->current_download_rate = stream->current_download_rate;
3013 GST_OBJECT_UNLOCK (demux);
3014 g_object_notify (G_OBJECT (demux), "current-bandwidth");
3015 GST_OBJECT_LOCK (demux);
3018 connection_speed = demux->connection_speed;
3019 min_bitrate = demux->min_bitrate;
3020 max_bitrate = demux->max_bitrate;
3021 GST_OBJECT_UNLOCK (demux);
3023 if (connection_speed) {
3024 GST_LOG_OBJECT (stream, "connection-speed is set to %u kbps, using it",
3025 connection_speed / 1000);
3026 return connection_speed;
3029 /* No explicit connection_speed, so choose the new variant to use as a
3030 * fraction of the measured download rate */
3031 target_download_rate =
3032 CLAMP (stream->current_download_rate, 0,
3033 G_MAXUINT) * demux->bandwidth_target_ratio;
3035 GST_DEBUG_OBJECT (stream, "Bitrate after target ratio limit (%0.2f): %u",
3036 demux->bandwidth_target_ratio, target_download_rate);
3039 /* Debugging code, modulate the bitrate every few fragments */
3041 static guint ctr = 0;
3043 GST_INFO_OBJECT (stream, "Halving reported bitrate for debugging");
3044 target_download_rate /= 2;
3050 if (min_bitrate > 0 && target_download_rate < min_bitrate) {
3051 target_download_rate = min_bitrate;
3052 GST_LOG_OBJECT (stream, "Bitrate adjusted due to min-bitrate : %u bits/s",
3056 if (max_bitrate > 0 && target_download_rate > max_bitrate) {
3057 target_download_rate = max_bitrate;
3058 GST_LOG_OBJECT (stream, "Bitrate adjusted due to max-bitrate : %u bits/s",
3062 GST_DEBUG_OBJECT (stream, "Returning target download rate of %u bps",
3063 target_download_rate);
3065 return target_download_rate;
/* Default handling once a fragment download has finished.
 * Header and index downloads are not real fragments and do not advance the
 * stream; any other download advances it by the duration of the fragment
 * through gst_adaptive_demux2_stream_advance_fragment().
 * NOTE(review): the value returned for header and index downloads is not
 * visible in this excerpt. */
3068 /* must be called with manifest_lock taken */
3069 static GstFlowReturn
3070 gst_adaptive_demux2_stream_finish_fragment_default (GstAdaptiveDemux * demux,
3071 GstAdaptiveDemux2Stream * stream)
3073 /* No need to advance, this isn't a real fragment */
3074 if (G_UNLIKELY (stream->downloading_header || stream->downloading_index))
3077 return gst_adaptive_demux2_stream_advance_fragment (demux, stream,
3078 stream->fragment.duration);
/* Default handling of downloaded data: the buffer is passed unchanged to
 * gst_adaptive_demux2_stream_push_buffer() and its flow return is returned. */
3081 /* must be called with manifest_lock taken.
3082 * Can temporarily release manifest_lock
3084 static GstFlowReturn
3085 gst_adaptive_demux2_stream_data_received_default (GstAdaptiveDemux * demux,
3086 GstAdaptiveDemux2Stream * stream, GstBuffer * buffer)
3088 return gst_adaptive_demux2_stream_push_buffer (stream, buffer);
/* Presumably the default for the requires_periodical_playlist_update class
 * hook that gst_adaptive_demux_start_manifest_update_task() consults.
 * NOTE(review): only the first signature line is visible in this excerpt;
 * confirm the returned value in the full file. */
3092 gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
/* Sets the stream_waiting_for_manifest flag; the manifest update callback
 * checks it after a successful update to notify the streams. */
3098 /* Called when a stream needs waking after the manifest is updated */
3100 gst_adaptive_demux2_stream_wants_manifest_update (GstAdaptiveDemux * demux)
3102 demux->priv->stream_waiting_for_manifest = TRUE;
/* Scheduler callback that refreshes the manifest of a live stream.
 *
 * Runs with the manifest lock held and always returns G_SOURCE_REMOVE; a
 * follow-up run is requested explicitly with a delayed call.
 *  - the pending callback id is cleared first;
 *  - nothing is done for a stream that is not live, and an EOS result from
 *    the manifest update ends the updates;
 *  - on success the failure counter is reset and, when a stream asked for
 *    it, every stream of the input period is notified of the update;
 *  - otherwise the failure counter is incremented: up to
 *    DEFAULT_FAILED_COUNT failures only a warning is logged, beyond that an
 *    element error is posted and no further update is scheduled;
 *  - when updates continue, the next run is scheduled after the interval
 *    reported by the get_manifest_update_interval class hook, multiplied by
 *    GST_USECOND. */
3106 gst_adaptive_demux_manifest_update_cb (GstAdaptiveDemux * demux)
3108 GstFlowReturn ret = GST_FLOW_OK;
3109 gboolean schedule_again = TRUE;
3111 GST_MANIFEST_LOCK (demux);
3112 demux->priv->manifest_updates_cb = 0;
3114 /* Updating playlist only needed for live playlists */
3115 if (!gst_adaptive_demux_is_live (demux)) {
3116 GST_MANIFEST_UNLOCK (demux);
3117 return G_SOURCE_REMOVE;
3120 GST_DEBUG_OBJECT (demux, "Updating playlist");
3121 ret = gst_adaptive_demux_update_manifest (demux);
3123 if (ret == GST_FLOW_EOS) {
3124 GST_MANIFEST_UNLOCK (demux);
3125 return G_SOURCE_REMOVE;
3128 if (ret == GST_FLOW_OK) {
3129 GST_DEBUG_OBJECT (demux, "Updated playlist successfully");
3130 demux->priv->update_failed_count = 0;
3132 /* Wake up download tasks */
3133 if (demux->priv->stream_waiting_for_manifest) {
3136 for (iter = demux->input_period->streams; iter; iter = g_list_next (iter)) {
3137 GstAdaptiveDemux2Stream *stream = iter->data;
3138 gst_adaptive_demux2_stream_on_manifest_update (stream);
3140 demux->priv->stream_waiting_for_manifest = FALSE;
3143 demux->priv->update_failed_count++;
3145 if (demux->priv->update_failed_count <= DEFAULT_FAILED_COUNT) {
3146 GST_WARNING_OBJECT (demux, "Could not update the playlist, flow: %s",
3147 gst_flow_get_name (ret));
3149 GST_ELEMENT_ERROR (demux, STREAM, FAILED,
3150 (_("Internal data stream error.")), ("Could not update playlist"));
3151 GST_DEBUG_OBJECT (demux, "Stopped manifest updates because of error");
3152 schedule_again = FALSE;
3156 if (schedule_again) {
3157 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3159 demux->priv->manifest_updates_cb =
3160 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3161 klass->get_manifest_update_interval (demux) * GST_USECOND,
3162 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3165 GST_MANIFEST_UNLOCK (demux);
3167 return G_SOURCE_REMOVE;
/* Scheduler callback that arms the first periodic manifest update.
 * A delayed call to gst_adaptive_demux_manifest_update_cb() is queued using
 * the interval from the get_manifest_update_interval class hook multiplied
 * by GST_USECOND, its id is stored in manifest_updates_cb, and
 * G_SOURCE_REMOVE is returned so that this callback runs only once. */
3171 gst_adaptive_demux_updates_start_cb (GstAdaptiveDemux * demux)
3173 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3175 /* Loop for updating of the playlist. This periodically checks if
3176 * the playlist is updated and does so, then signals the streaming
3177 * thread in case it can continue downloading now. */
3179 /* block until the next scheduled update or the signal to quit this thread */
3180 GST_DEBUG_OBJECT (demux, "Started updates task");
3181 demux->priv->manifest_updates_cb =
3182 gst_adaptive_demux_loop_call_delayed (demux->priv->scheduler_task,
3183 klass->get_manifest_update_interval (demux) * GST_USECOND,
3184 (GSourceFunc) gst_adaptive_demux_manifest_update_cb, demux, NULL);
3186 return G_SOURCE_REMOVE;
/* Looks through demux->priv->outputs for an output slot that @track could
 * take over.
 *
 * Each slot is tested, in this order, for:
 *  - having a different type than @track,
 *  - already having @track as its pending track,
 *  - having some other pending track,
 *  - currently outputting a track that is draining.
 *
 * NOTE(review): the statements guarded by these tests and the return
 * statement are not visible in this extract; the caller stores the result
 * in an OutputSlot pointer -- confirm the outcomes against the full file. */
3190 find_replacement_slot_for_track (GstAdaptiveDemux * demux,
3191 GstAdaptiveDemuxTrack * track)
3195 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3196 OutputSlot *slot = (OutputSlot *) tmp->data;
3197 /* Incompatible output type */
3198 if (slot->type != track->type)
3201 /* Slot which is already assigned to this pending track */
3202 if (slot->pending_track == track)
3205 /* slot already used for another pending track */
3206 if (slot->pending_track != NULL)
3209 /* Current output track is of the same type and is draining */
3210 if (slot->track && slot->track->draining)
/* Searches demux->priv->outputs for the slot whose current track
 * (slot->track) is @track.
 *
 * NOTE(review): the statements for the match and no-match cases are not
 * visible in this extract; the caller stores the return value in an
 * OutputSlot pointer. */
3217 /* TRACKS_LOCK taken */
3219 find_slot_for_track (GstAdaptiveDemux * demux, GstAdaptiveDemuxTrack * track)
3223 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3224 OutputSlot *slot = (OutputSlot *) tmp->data;
3226 if (slot->track == track)
/* Builds the streams-selected message for the current output period.
 *
 * First scans demux->output_period->tracks for a track that is selected but
 * not active. Then creates a streams-selected message for the period's
 * collection, stamps it with @seqnum and adds the stream object of every
 * active track to it.
 *
 * NOTE(review): what happens when a selected-but-inactive track is found,
 * and the return statement, are not visible in this extract. Callers store
 * the result in `msg` and post it on the bus -- confirm the early-exit value
 * against the full file. */
3233 /* TRACKS_LOCK held */
3235 all_selected_tracks_are_active (GstAdaptiveDemux * demux, guint32 seqnum)
3240 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3241 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3243 if (track->selected && !track->active)
3247 /* All selected tracks are active, created message */
3249 gst_message_new_streams_selected (GST_OBJECT (demux),
3250 demux->output_period->collection);
3251 GST_MESSAGE_SEQNUM (msg) = seqnum;
/* Only tracks that are currently active are listed in the message. */
3252 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3253 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3254 if (track->active) {
3255 gst_message_streams_selected_add (msg, track->stream_object);
/* Pushes the initial events for the track currently assigned to an output
 * slot (slot->track) on slot->pad:
 *  1. a stream-start event carrying the track's stream id, flags and stream
 *     object, plus demux->group_id when demux->have_group_id is set;
 *  2. a stream-collection event for demux->output_period->collection.
 * Afterwards every sticky event stored on the track is marked undelivered.
 *
 * The return values of gst_pad_push_event() are not checked here. */
3263 gst_adaptive_demux_send_initial_events (GstAdaptiveDemux * demux,
3266 GstAdaptiveDemuxTrack *track = slot->track;
3269 /* Send EVENT_STREAM_START */
3270 event = gst_event_new_stream_start (track->stream_id);
3271 if (demux->have_group_id)
3272 gst_event_set_group_id (event, demux->group_id);
3273 gst_event_set_stream_flags (event, track->flags);
3274 gst_event_set_stream (event, track->stream_object);
3275 GST_DEBUG_OBJECT (demux, "Sending stream-start for track '%s'",
3277 gst_pad_push_event (slot->pad, event);
3279 /* Send EVENT_STREAM_COLLECTION */
3280 event = gst_event_new_stream_collection (demux->output_period->collection);
3281 GST_DEBUG_OBJECT (demux, "Sending stream-collection for track '%s'",
3283 gst_pad_push_event (slot->pad, event);
3285 /* Mark all sticky events for re-sending */
3286 gst_event_store_mark_all_undelivered (&track->sticky_events);
/* Reconciles the output slots with the current track selection.
 *
 * Atomically reads demux->priv->requested_selection_seqnum and compares it
 * with demux->priv->current_selection_seqnum. When a re-check is needed it:
 *  - drops pending tracks that are no longer selected from their slots,
 *  - walks the output period's tracks and, for each selected track, either
 *    finds its existing slot, queues it as the pending track of a
 *    replacement slot, or creates a new slot and sends the initial events,
 *  - removes slots whose track is draining and has no pending replacement,
 *  - records the handled seqnum and posts a streams-selected message with
 *    TRACKS_LOCK released around the bus post.
 *
 * NOTE(review): several branch/brace lines are not visible in this extract,
 * so the exact nesting of the steps above should be confirmed against the
 * full file. */
3290 * Called with TRACKS_LOCK taken
3293 check_and_handle_selection_update_locked (GstAdaptiveDemux * demux)
3296 guint requested_selection_seqnum;
3299 /* If requested_selection_seqnum != current_selection_seqnum, re-check all
3300 output slots vs active/draining tracks */
3301 requested_selection_seqnum =
3302 g_atomic_int_get (&demux->priv->requested_selection_seqnum);
3304 if (requested_selection_seqnum == demux->priv->current_selection_seqnum)
3307 GST_DEBUG_OBJECT (demux, "Selection changed, re-checking all output slots");
3309 /* Go over all slots, and if they have a pending track that's no longer
3310 * selected, clear it so the slot can be reused */
3311 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3312 OutputSlot *slot = (OutputSlot *) tmp->data;
3314 if (slot->pending_track != NULL && !slot->pending_track->selected) {
3315 GST_DEBUG_OBJECT (demux,
3316 "Removing deselected track '%s' as pending from output of current track '%s'",
3317 slot->pending_track->stream_id, slot->track->stream_id);
/* Release the reference the slot held on its pending track. */
3318 gst_adaptive_demux_track_unref (slot->pending_track);
3319 slot->pending_track = NULL;
3323 /* Go over all tracks and create/re-assign/remove slots */
3324 for (tmp = demux->output_period->tracks; tmp; tmp = tmp->next) {
3325 GstAdaptiveDemuxTrack *track = (GstAdaptiveDemuxTrack *) tmp->data;
3327 if (track->selected) {
3328 OutputSlot *slot = find_slot_for_track (demux, track);
3330 /* 0. Track is selected and has a slot. Nothing to do */
3332 GST_DEBUG_OBJECT (demux, "Track '%s' is already being outputted",
3337 slot = find_replacement_slot_for_track (demux, track);
3339 /* 1. There is an existing slot of the same type which is currently
3340 * draining, assign this track as a replacement for it */
3341 g_assert (slot->pending_track == NULL || slot->pending_track == track);
3342 if (slot->pending_track == NULL) {
/* The slot takes its own reference on the pending track. */
3343 slot->pending_track = gst_adaptive_demux_track_ref (track);
3344 GST_DEBUG_OBJECT (demux,
3345 "Track '%s' (period %u) will be used on output of track '%s' (period %u)",
3346 track->stream_id, track->period_num,
3347 slot->track->stream_id, slot->track->period_num);
3350 /* 2. There is no compatible replacement slot, create a new one */
3351 slot = gst_adaptive_demux_output_slot_new (demux, track->type);
3352 GST_DEBUG_OBJECT (demux, "Created slot for track '%s'",
3354 demux->priv->outputs = g_list_append (demux->priv->outputs, slot);
3356 track->update_next_segment = TRUE;
/* The new slot references the track, the track becomes active, and the
 * stream-start/stream-collection events are sent on the slot's pad. */
3358 slot->track = gst_adaptive_demux_track_ref (track);
3359 track->active = TRUE;
3360 gst_adaptive_demux_send_initial_events (demux, slot);
3363 /* If we were draining this track, we no longer are */
3364 track->draining = FALSE;
3368 /* Finally check all slots have a current/pending track. If not remove it */
3369 for (tmp = demux->priv->outputs; tmp;) {
3370 OutputSlot *slot = (OutputSlot *) tmp->data;
3371 /* We should never have slots without target tracks */
3372 g_assert (slot->track);
3373 if (slot->track->draining && !slot->pending_track) {
3374 GstAdaptiveDemux2Stream *stream;
3376 GST_DEBUG_OBJECT (demux, "Output for track '%s' is no longer used",
3377 slot->track->stream_id);
3378 slot->track->active = FALSE;
3380 /* If the stream feeding this track is stopped, flush and clear
3381 * the track now that it's going inactive. If the stream was not
3382 * found, it means we advanced past that period already (and the
3383 * stream was stopped and discarded) */
3384 stream = find_stream_for_track_locked (demux, slot->track);
3385 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3386 gst_adaptive_demux_track_flush (slot->track);
/* The list is modified while iterating: the iterator is reset to the new
 * list head returned by g_list_remove(). */
3388 tmp = demux->priv->outputs = g_list_remove (demux->priv->outputs, slot);
3389 gst_adaptive_demux_output_slot_free (demux, slot);
/* Remember which selection request has now been applied. */
3394 demux->priv->current_selection_seqnum = requested_selection_seqnum;
3395 msg = all_selected_tracks_are_active (demux, requested_selection_seqnum);
/* TRACKS_LOCK is released around the bus post and re-taken afterwards. */
3397 TRACKS_UNLOCK (demux);
3398 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3399 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3400 TRACKS_LOCK (demux);
/* Moves the output side of the demuxer on to the next queued period.
 *
 * Pops the head of demux->priv->periods (the previous output period), makes
 * the new queue head the output period, posts that period's stream
 * collection on the bus (with TRACKS_LOCK released around the post), then
 * unselects every selected track of the previous period and marks it as
 * draining. Finally forces a selection re-check by bumping
 * demux->priv->requested_selection_seqnum.
 *
 * The previous period is unreffed twice: once for the reference that
 * demux->output_period held and once at the end for the remaining one.
 *
 * NOTE(review): the result of g_queue_peek_head() is used without a visible
 * NULL check; the return statement is not visible either, but the caller
 * treats a false result as failure -- confirm both against the full file. */
3404 /* TRACKS_LOCK held */
3406 gst_adaptive_demux_advance_output_period (GstAdaptiveDemux * demux)
3409 GstAdaptiveDemuxPeriod *previous_period;
3410 GstStreamCollection *collection;
3412 /* Grab the next period, should be demux->periods->next->data */
3413 previous_period = g_queue_pop_head (demux->priv->periods);
3415 /* Remove ref held by demux->output_period */
3416 gst_adaptive_demux_period_unref (previous_period);
3417 demux->output_period =
3418 gst_adaptive_demux_period_ref (g_queue_peek_head (demux->priv->periods));
3420 GST_DEBUG_OBJECT (demux, "Moved output to period %d",
3421 demux->output_period->period_num);
3423 /* We can now post the collection of the new period */
/* The collection pointer is read before TRACKS_LOCK is dropped for the post. */
3424 collection = demux->output_period->collection;
3425 TRACKS_UNLOCK (demux);
3426 gst_element_post_message (GST_ELEMENT_CAST (demux),
3427 gst_message_new_stream_collection (GST_OBJECT (demux), collection));
3428 TRACKS_LOCK (demux);
3430 /* Unselect all tracks of the previous period */
3431 for (iter = previous_period->tracks; iter; iter = iter->next) {
3432 GstAdaptiveDemuxTrack *track = iter->data;
3433 if (track->selected) {
3434 track->selected = FALSE;
3435 track->draining = TRUE;
3439 /* Force a selection re-check */
3440 g_atomic_int_inc (&demux->priv->requested_selection_seqnum);
3441 check_and_handle_selection_update_locked (demux);
3443 /* Remove the final ref on the previous period now that we have done the switch */
3444 gst_adaptive_demux_period_unref (previous_period);
/* Switches an output slot from its current (draining) track to its pending
 * track once the replacement is ready.
 *
 * The pending track counts as ready when its level_time has reached its
 * buffering_threshold or when it is EOS. If it is not ready and the current
 * track still has queued items, only a debug message is visible for that
 * case. Otherwise the current track is deactivated (and flushed when its
 * feeding stream exists but is not running), the slot's reference on it is
 * dropped, and the pending track becomes the slot's active track. The
 * initial events are then sent and a streams-selected message is posted
 * with TRACKS_LOCK released around the post.
 *
 * NOTE(review): the early-exit statement for the "not ready" case and the
 * checks around posting `msg` are not visible in this extract. */
3449 /* Called with TRACKS_LOCK taken */
3451 handle_slot_pending_track_switch_locked (GstAdaptiveDemux * demux,
3454 GstAdaptiveDemuxTrack *track = slot->track;
3456 gboolean pending_is_ready;
3457 GstAdaptiveDemux2Stream *stream;
3459 /* If we have a pending track for this slot, the current track should be
3460 * draining and no longer selected */
3461 g_assert (track->draining && !track->selected);
3463 /* If we're draining, check if the pending track has enough data *or* that
3464 we've already drained out entirely */
3466 (slot->pending_track->level_time >=
3467 slot->pending_track->buffering_threshold);
/* An EOS replacement is also treated as ready. */
3468 pending_is_ready |= slot->pending_track->eos;
3470 if (!pending_is_ready && gst_queue_array_get_length (track->queue) > 0) {
3471 GST_DEBUG_OBJECT (demux,
3472 "Replacement track '%s' doesn't have enough data for switching yet",
3473 slot->pending_track->stream_id);
3477 GST_DEBUG_OBJECT (demux,
3478 "Pending replacement track has enough data, switching");
3479 track->active = FALSE;
3480 track->draining = FALSE;
3482 /* If the stream feeding this track is stopped, flush and clear
3483 * the track now that it's going inactive. If the stream was not
3484 * found, it means we advanced past that period already (and the
3485 * stream was stopped and discarded) */
3486 stream = find_stream_for_track_locked (demux, track);
3487 if (stream != NULL && !gst_adaptive_demux2_stream_is_running (stream))
3488 gst_adaptive_demux_track_flush (track);
/* Drop the slot's reference on the old track before re-pointing the slot. */
3490 gst_adaptive_demux_track_unref (track);
3491 /* We steal the reference of pending_track */
3492 track = slot->track = slot->pending_track;
3493 slot->pending_track = NULL;
3494 slot->track->active = TRUE;
3496 /* Make sure the track segment will start at the current position */
3497 track->update_next_segment = TRUE;
3499 /* Send stream start and collection, and schedule sticky events */
3500 gst_adaptive_demux_send_initial_events (demux, slot);
3502 /* Can we emit the streams-selected message now ? */
3504 all_selected_tracks_are_active (demux,
3505 g_atomic_int_get (&demux->priv->requested_selection_seqnum));
/* TRACKS_LOCK is released around the bus post and re-taken afterwards. */
3507 TRACKS_UNLOCK (demux);
3508 GST_DEBUG_OBJECT (demux, "Posting streams-selected");
3509 gst_element_post_message (GST_ELEMENT_CAST (demux), msg);
3510 TRACKS_LOCK (demux);
/* Body of the output task: moves queued data from the tracks assigned to
 * the output slots to the slots' pads, up to a common output position.
 *
 * Visible steps, starting with TRACKS_LOCK taken:
 *  - set GST_FLOW_FLUSHING when demux->priv->flushing is set;
 *  - apply any pending selection change;
 *  - when wait_for_data is set, wait on demux->priv->tracks_add (the
 *    condition is paired with demux->priv->tracks_lock) and re-check
 *    flushing afterwards;
 *  - compute the global output position as the minimum next_position over
 *    the slots' tracks, switching slots to their pending track first;
 *  - advance to the next output period when no position was found and the
 *    current period is closed;
 *  - dequeue and push events/buffers per slot, with TRACKS_LOCK released
 *    around the push, feeding buffer flow returns into the flow combiner;
 *  - store the global output position, or pause the task when there is
 *    none and the element is not flushing;
 *  - on a non-OK flow: pause the task unless flushing, and for
 *    GST_FLOW_NOT_LINKED or anything at/below GST_FLOW_EOS post a flow
 *    error (unless the flow is EOS) and push an EOS event.
 *
 * NOTE(review): labels, `goto`/`continue` statements and several braces
 * are not visible in this extract; confirm the control flow between the
 * steps above against the full file. */
3516 gst_adaptive_demux_output_loop (GstAdaptiveDemux * demux)
3519 GstClockTimeDiff global_output_position = GST_CLOCK_STIME_NONE;
3520 gboolean wait_for_data = FALSE;
3523 GST_DEBUG_OBJECT (demux, "enter");
3525 TRACKS_LOCK (demux);
3527 /* Check if stopping */
3528 if (demux->priv->flushing) {
3529 ret = GST_FLOW_FLUSHING;
3533 /* If the selection changed, handle it */
3534 check_and_handle_selection_update_locked (demux);
3538 global_output_position = GST_CLOCK_STIME_NONE;
3539 if (wait_for_data) {
3540 GST_DEBUG_OBJECT (demux, "Waiting for data");
3541 g_cond_wait (&demux->priv->tracks_add, &demux->priv->tracks_lock);
3542 GST_DEBUG_OBJECT (demux, "Done waiting for data");
3543 if (demux->priv->flushing) {
3544 ret = GST_FLOW_FLUSHING;
3547 wait_for_data = FALSE;
3550 /* Grab/Recalculate current global output position
3551 * This is the minimum pending output position of all tracks used for output
3553 * If there is a track which is empty and not EOS, wait for it to receive data
3554 * then recalculate global output position.
3556 * This also pushes downstream all non-timed data that might be present.
3558 * IF all tracks are EOS : stop task
3560 GST_LOG_OBJECT (demux, "Calculating global output position of output slots");
3561 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3562 OutputSlot *slot = (OutputSlot *) tmp->data;
3563 GstAdaptiveDemuxTrack *track;
3565 /* If there is a pending track, Check if it's time to switch to it */
3566 if (slot->pending_track)
3567 handle_slot_pending_track_switch_locked (demux, slot);
/* Read slot->track only after the possible switch above, since the switch
 * may have replaced it. */
3569 track = slot->track;
3571 if (!track->active) {
3572 /* Note: Edward: I can't see in what cases we would end up with inactive
3573 tracks assigned to slots. */
3574 GST_ERROR_OBJECT (demux, "FIXME : Handle track switching");
3575 g_assert (track->active);
/* Compute the track's next position when it is not known yet. */
3579 if (track->next_position == GST_CLOCK_STIME_NONE) {
3580 gst_adaptive_demux_track_update_next_position (track);
3583 if (track->next_position != GST_CLOCK_STIME_NONE) {
3584 if (global_output_position == GST_CLOCK_STIME_NONE)
3585 global_output_position = track->next_position;
3587 global_output_position =
3588 MIN (global_output_position, track->next_position);
3589 track->waiting_add = FALSE;
3590 } else if (!track->eos) {
3591 GST_DEBUG_OBJECT (demux, "Need timed data on track %s (period %u)",
3592 track->stream_id, track->period_num);
3593 wait_for_data = track->waiting_add = TRUE;
3595 GST_DEBUG_OBJECT (demux,
3596 "Track %s (period %u) is EOS, not waiting for timed data",
3597 track->stream_id, track->period_num);
/* No track reported a position and the current output period is closed:
 * try to move the output on to the next period. */
3604 if (global_output_position == GST_CLOCK_STIME_NONE
3605 && demux->output_period->closed) {
3606 GST_DEBUG_OBJECT (demux, "Period %d is drained, switching to next period",
3607 demux->output_period->period_num);
3608 if (!gst_adaptive_demux_advance_output_period (demux)) {
3609 /* Failed to move to next period, error out */
3610 ret = GST_FLOW_ERROR;
3613 /* Restart the loop */
3617 GST_DEBUG_OBJECT (demux, "Outputting data for position %" GST_STIME_FORMAT,
3618 GST_STIME_ARGS (global_output_position));
3622 * We know all active tracks have pending timed data
3623 * * while track next_position <= global output position
3624 * * push pending data
3625 * * Update track next_position
3626 * * recalculate global output position
3627 * * Pop next pending data from track and update pending position
3630 for (tmp = demux->priv->outputs; tmp; tmp = tmp->next) {
3631 OutputSlot *slot = (OutputSlot *) tmp->data;
3632 GstAdaptiveDemuxTrack *track = slot->track;
3634 GST_LOG_OBJECT (track->element,
3635 "active:%d draining:%d selected:%d next_position:%" GST_STIME_FORMAT
3636 " global_output_position:%" GST_STIME_FORMAT, track->active,
3637 track->draining, track->selected, GST_STIME_ARGS (track->next_position),
3638 GST_STIME_ARGS (global_output_position));
/* Keep dequeuing while there is no global position, while this slot has not
 * pushed any timed data yet, or while the track's next position is not past
 * the global output position. */
3643 while (global_output_position == GST_CLOCK_STIME_NONE
3644 || !slot->pushed_timed_data
3645 || ((track->next_position != GST_CLOCK_STIME_NONE)
3646 && track->next_position <= global_output_position)) {
3647 GstMiniObject *mo = track_dequeue_data_locked (demux, track, TRUE);
3650 GST_DEBUG_OBJECT (demux,
3651 "Track '%s' (period %u) doesn't have any pending data (eos:%d pushed_timed_data:%d)",
3652 track->stream_id, track->period_num, track->eos,
3653 slot->pushed_timed_data);
3654 /* This should only happen if the track is EOS, or exactly in between
3655 * the parser outputting segment/caps before buffers. */
3656 g_assert (track->eos || !slot->pushed_timed_data);
/* Buffering state is refreshed while TRACKS_LOCK is still held; the lock is
 * then released for the push. */
3660 demux_update_buffering_locked (demux);
3661 demux_post_buffering_locked (demux);
3662 TRACKS_UNLOCK (demux);
3664 GST_DEBUG_OBJECT (demux,
3665 "Track '%s' (period %u) dequeued %" GST_PTR_FORMAT, track->stream_id,
3666 track->period_num, mo);
3668 if (GST_IS_EVENT (mo)) {
3669 GstEvent *event = (GstEvent *) mo;
/* A GAP event counts as timed data for this slot. */
3670 if (GST_EVENT_TYPE (event) == GST_EVENT_GAP)
3671 slot->pushed_timed_data = TRUE;
3672 gst_pad_push_event (slot->pad, event);
3674 if (GST_EVENT_IS_STICKY (event))
3675 gst_event_store_mark_delivered (&track->sticky_events, event);
3676 } else if (GST_IS_BUFFER (mo)) {
3677 GstBuffer *buffer = (GstBuffer *) mo;
/* A pending discontinuity on the track is applied to the first buffer
 * pushed after it was requested. */
3679 if (track->output_discont) {
3680 if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DISCONT)) {
3681 buffer = gst_buffer_make_writable (buffer);
3682 GST_DEBUG_OBJECT (slot->pad,
3683 "track %s marking discont %" GST_PTR_FORMAT, track->stream_id,
3685 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3687 track->output_discont = FALSE;
3689 slot->flow_ret = gst_pad_push (slot->pad, buffer);
3691 gst_flow_combiner_update_pad_flow (demux->priv->flowcombiner,
3692 slot->pad, slot->flow_ret);
3693 GST_DEBUG_OBJECT (slot->pad,
3694 "track %s (period %u) push returned %s (combined %s)",
3695 track->stream_id, track->period_num,
3696 gst_flow_get_name (slot->flow_ret), gst_flow_get_name (ret));
3697 slot->pushed_timed_data = TRUE;
3699 GST_ERROR ("Unhandled miniobject %" GST_PTR_FORMAT, mo);
/* Re-take the lock and refresh the track's next position after the push. */
3702 TRACKS_LOCK (demux);
3703 gst_adaptive_demux_track_update_next_position (track);
3705 if (ret != GST_FLOW_OK)
3710 /* Store global output position */
3711 if (global_output_position != GST_CLOCK_STIME_NONE)
3712 demux->priv->global_output_position = global_output_position;
3714 if (global_output_position == GST_CLOCK_STIME_NONE) {
3715 if (!demux->priv->flushing) {
3716 GST_DEBUG_OBJECT (demux,
3717 "Pausing output task after reaching NONE global_output_position");
3718 gst_task_pause (demux->priv->output_task);
3722 TRACKS_UNLOCK (demux);
3723 GST_DEBUG_OBJECT (demux, "leave");
3728 GST_DEBUG_OBJECT (demux, "Pausing due to %s", gst_flow_get_name (ret));
3729 /* If the flushing flag is set, then the task is being
3730 * externally stopped, so don't go to pause(), otherwise we
3731 * should so we don't keep spinning */
3732 if (!demux->priv->flushing) {
3733 GST_DEBUG_OBJECT (demux, "Pausing task due to %s",
3734 gst_flow_get_name (ret));
3735 gst_task_pause (demux->priv->output_task);
3738 TRACKS_UNLOCK (demux);
/* NOT_LINKED and every flow value at or below EOS end the output: a flow
 * error is posted unless the flow is EOS, then EOS is pushed downstream. */
3740 if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3741 GstEvent *eos = gst_event_new_eos ();
3743 if (ret != GST_FLOW_EOS) {
3744 GST_ELEMENT_FLOW_ERROR (demux, ret);
/* The EOS event reuses the stored segment seqnum when one is set. */
3747 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3748 if (demux->priv->segment_seqnum != GST_SEQNUM_INVALID)
3749 gst_event_set_seqnum (eos, demux->priv->segment_seqnum);
3750 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3752 gst_adaptive_demux_push_src_event (demux, eos);
/* Reports whether the stream is live by delegating to the subclass'
 * is_live() vfunc.
 *
 * NOTE(review): no NULL check on klass->is_live is visible before the call
 * in this extract -- confirm whether one exists in the full file. */
3759 /* must be called from the scheduler */
3761 gst_adaptive_demux_is_live (GstAdaptiveDemux * demux)
3763 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3766 return klass->is_live (demux);
/* Asks the subclass to seek @stream.
 *
 * Forwards @forward, @flags, @ts and @final_ts unchanged to the subclass'
 * stream_seek() vfunc and returns its result. Returns GST_FLOW_ERROR when
 * the subclass does not implement stream_seek(). */
3770 /* must be called from the scheduler */
3772 gst_adaptive_demux2_stream_seek (GstAdaptiveDemux * demux,
3773 GstAdaptiveDemux2Stream * stream, gboolean forward, GstSeekFlags flags,
3774 GstClockTimeDiff ts, GstClockTimeDiff * final_ts)
3776 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3778 if (klass->stream_seek)
3779 return klass->stream_seek (stream, forward, flags, ts, final_ts);
3780 return GST_FLOW_ERROR;
/* Asks the subclass whether @stream has another fragment.
 *
 * The answer defaults to TRUE and is replaced by the result of the
 * stream_has_next_fragment() vfunc when the subclass implements it.
 * (The return statement itself is not visible in this extract.) */
3783 /* must be called from the scheduler */
3785 gst_adaptive_demux2_stream_has_next_fragment (GstAdaptiveDemux * demux,
3786 GstAdaptiveDemux2Stream * stream)
3788 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3789 gboolean ret = TRUE;
3791 if (klass->stream_has_next_fragment)
3792 ret = klass->stream_has_next_fragment (stream);
/* Advances @stream to its next fragment.
 *
 * When stream->last_ret is not GST_FLOW_OK it is returned unchanged and no
 * advance is attempted. Otherwise the work is delegated to
 * gst_adaptive_demux2_stream_advance_fragment_unlocked() and
 * stream->last_ret is returned.
 *
 * NOTE(review): the line that stores the helper's result is not visible in
 * this extract; presumably it is assigned to stream->last_ret -- confirm. */
3797 /* must be called from the scheduler */
3799 * the ::finish_fragment() handlers when an *actual* fragment is done
3802 gst_adaptive_demux2_stream_advance_fragment (GstAdaptiveDemux * demux,
3803 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3805 if (stream->last_ret != GST_FLOW_OK)
3806 return stream->last_ret;
3809 gst_adaptive_demux2_stream_advance_fragment_unlocked (demux, stream,
3812 return stream->last_ret;
/* Does the bookkeeping for a finished fragment and moves @stream on to the
 * next one.
 *
 * Visible steps:
 *  - fail with GST_FLOW_ERROR (via g_return_val_if_fail) when the subclass
 *    has no stream_advance_fragment() vfunc;
 *  - reset the stream's download error count and clear its last error;
 *  - post an element message with download statistics for the fragment;
 *  - under the segment lock, add @duration to the stream's parse segment
 *    position and current position when @duration is valid and
 *    demux->segment.rate is positive;
 *  - for a rate other than 1.0 on a live stream, re-check the live seek
 *    range before advancing; otherwise advance when the stream is live or
 *    has a next fragment;
 *  - record a new download start time;
 *  - when the advance returned GST_FLOW_OK and selecting a bitrate reports
 *    a change, set stream->need_header and turn the result into
 *    GST_ADAPTIVE_DEMUX_FLOW_SWITCH.
 *
 * NOTE(review): the `else` branches, the out-of-range handling and the
 * return statement are not visible in this extract. */
3815 /* must be called with manifest_lock taken */
3816 static GstFlowReturn
3817 gst_adaptive_demux2_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
3818 GstAdaptiveDemux2Stream * stream, GstClockTime duration)
3820 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3823 g_return_val_if_fail (klass->stream_advance_fragment != NULL, GST_FLOW_ERROR);
3825 GST_LOG_OBJECT (stream,
3826 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3827 GST_STIME_ARGS (stream->fragment.stream_time), GST_TIME_ARGS (duration));
/* A fragment completed: reset the error tracking for this stream. */
3829 stream->download_error_count = 0;
3830 g_clear_error (&stream->last_error);
3833 /* FIXME - url has no indication of byte ranges for subsegments */
3834 /* FIXME: Reenable statistics sending? */
3835 gst_element_post_message (GST_ELEMENT_CAST (demux),
3836 gst_message_new_element (GST_OBJECT_CAST (demux),
3837 gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
3838 "manifest-uri", G_TYPE_STRING,
3839 demux->manifest_uri, "uri", G_TYPE_STRING,
3840 stream->fragment.uri, "fragment-start-time",
3841 GST_TYPE_CLOCK_TIME, stream->download_start_time,
3842 "fragment-stop-time", GST_TYPE_CLOCK_TIME,
3843 gst_util_get_timestamp (), "fragment-size", G_TYPE_UINT64,
3844 stream->download_total_bytes, "fragment-download-time",
3845 GST_TYPE_CLOCK_TIME, stream->last_download_time, NULL)));
3848 /* Don't update to the end of the segment if in reverse playback */
3849 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
3850 if (GST_CLOCK_TIME_IS_VALID (duration) && demux->segment.rate > 0) {
3851 stream->parse_segment.position += duration;
3852 stream->current_position += duration;
3854 GST_DEBUG_OBJECT (stream,
3855 "stream position now %" GST_TIME_FORMAT,
3856 GST_TIME_ARGS (stream->current_position));
3858 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);
3860 /* When advancing with a non 1.0 rate on live streams, we need to check
3861 * the live seeking range again to make sure we can still advance to
3863 if (demux->segment.rate != 1.0 && gst_adaptive_demux_is_live (demux)) {
3864 if (!gst_adaptive_demux2_stream_in_live_seek_range (demux, stream))
3867 ret = klass->stream_advance_fragment (stream);
3868 } else if (gst_adaptive_demux_is_live (demux)
3869 || gst_adaptive_demux2_stream_has_next_fragment (demux, stream)) {
3870 ret = klass->stream_advance_fragment (stream);
3875 stream->download_start_time =
3876 GST_TIME_AS_USECONDS (gst_adaptive_demux2_get_monotonic_time (demux));
3878 if (ret == GST_FLOW_OK) {
3879 GST_DEBUG_OBJECT (stream, "checking if stream requires bitrate change");
3880 if (gst_adaptive_demux2_stream_select_bitrate (demux, stream,
3881 gst_adaptive_demux2_stream_update_current_bitrate (demux,
3883 GST_DEBUG_OBJECT (stream, "Bitrate changed. Returning FLOW_SWITCH");
3884 stream->need_header = TRUE;
3885 ret = (GstFlowReturn) GST_ADAPTIVE_DEMUX_FLOW_SWITCH;
/* Offers @bitrate to the subclass for @stream.
 *
 * When the subclass implements stream_select_bitrate(), its result is
 * returned. The value returned when the vfunc is missing is not visible in
 * this extract; the caller uses the result as a "bitrate changed"
 * condition. */
3892 /* must be called with manifest_lock taken */
3894 gst_adaptive_demux2_stream_select_bitrate (GstAdaptiveDemux *
3895 demux, GstAdaptiveDemux2Stream * stream, guint64 bitrate)
3897 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3899 if (klass->stream_select_bitrate)
3900 return klass->stream_select_bitrate (stream, bitrate);
/* Asks the subclass to fill in stream->fragment for the fragment at the
 * stream's current position.
 *
 * Requires the stream_update_fragment_info() vfunc (checked with
 * g_return_val_if_fail), clears stream->fragment.finished before calling
 * it, and logs the resulting uri, stream time, duration and byte range
 * when the vfunc returns GST_FLOW_OK.
 *
 * NOTE(review): the failure value of the precondition check and the return
 * statement are not visible in this extract. */
3904 /* must be called with manifest_lock taken */
3906 gst_adaptive_demux2_stream_update_fragment_info (GstAdaptiveDemux * demux,
3907 GstAdaptiveDemux2Stream * stream)
3909 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3912 g_return_val_if_fail (klass->stream_update_fragment_info != NULL,
3915 /* Make sure the sub-class will update bitrate, or else
3917 stream->fragment.finished = FALSE;
3919 GST_LOG_OBJECT (stream, "position %" GST_TIME_FORMAT,
3920 GST_TIME_ARGS (stream->current_position));
3922 ret = klass->stream_update_fragment_info (stream);
3924 GST_LOG_OBJECT (stream, "ret:%s uri:%s",
3925 gst_flow_get_name (ret), stream->fragment.uri);
3926 if (ret == GST_FLOW_OK) {
3927 GST_LOG_OBJECT (stream,
3928 "stream_time %" GST_STIME_FORMAT " duration:%" GST_TIME_FORMAT,
3929 GST_STIME_ARGS (stream->fragment.stream_time),
3930 GST_TIME_ARGS (stream->fragment.duration));
3931 GST_LOG_OBJECT (stream,
3932 "range start:%" G_GINT64_FORMAT " end:%" G_GINT64_FORMAT,
3933 stream->fragment.range_start, stream->fragment.range_end);
/* Asks the subclass how long to wait before the next fragment of @stream.
 *
 * When the subclass implements stream_get_fragment_waiting_time(), its
 * result is returned. The value returned when the vfunc is missing is not
 * visible in this extract. */
3939 /* must be called with manifest_lock taken */
3941 gst_adaptive_demux2_stream_get_fragment_waiting_time (GstAdaptiveDemux *
3942 demux, GstAdaptiveDemux2Stream * stream)
3944 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3946 if (klass->stream_get_fragment_waiting_time)
3947 return klass->stream_get_fragment_waiting_time (stream);
/* Download-complete callback for a manifest refresh request.
 *
 * Replaces demux->manifest_uri and demux->manifest_base_uri from @request:
 * for a permanent redirect with a redirect URI, the redirect URI becomes
 * the manifest URI and the base URI is cleared; otherwise the request URI
 * is kept and the redirect URI becomes the base URI. The downloaded buffer
 * is then handed to the subclass' update_manifest_data() vfunc and
 * unreffed.
 *
 * On GST_FLOW_OK a duration-changed message is posted when the subclass
 * reports a known duration, and the manifest update task is started or
 * stopped depending on whether the stream is live and requires periodic
 * playlist updates.
 *
 * @state is not used by the visible code. */
3952 handle_manifest_download_complete (DownloadRequest * request,
3953 DownloadRequestState state, GstAdaptiveDemux * demux)
3955 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
3957 GstFlowReturn result;
/* The previous URIs are freed first; both fields are reassigned below. */
3959 g_free (demux->manifest_base_uri);
3960 g_free (demux->manifest_uri);
3962 if (request->redirect_permanent && request->redirect_uri) {
3963 demux->manifest_uri = g_strdup (request->redirect_uri);
3964 demux->manifest_base_uri = NULL;
3966 demux->manifest_uri = g_strdup (request->uri);
3967 demux->manifest_base_uri = g_strdup (request->redirect_uri);
3970 buffer = download_request_take_buffer (request);
3972 /* We should always have a buffer since this function is the non-error
3973 * callback for the download */
3976 result = klass->update_manifest_data (demux, buffer);
3977 gst_buffer_unref (buffer);
3979 /* FIXME: Should the manifest uri vars be reverted to original
3980 * values if updating fails? */
3982 if (result == GST_FLOW_OK) {
3983 GstClockTime duration;
3984 /* Send an updated duration message */
3985 duration = klass->get_duration (demux);
3986 if (duration != GST_CLOCK_TIME_NONE) {
3987 GST_DEBUG_OBJECT (demux,
3988 "Sending duration message : %" GST_TIME_FORMAT,
3989 GST_TIME_ARGS (duration));
3990 gst_element_post_message (GST_ELEMENT (demux),
3991 gst_message_new_duration_changed (GST_OBJECT (demux)));
3993 GST_DEBUG_OBJECT (demux,
3994 "Duration unknown, can not send the duration message");
3997 /* If a manifest changes its liveness or periodic updateness, we need
3998 * to start/stop the manifest update task appropriately */
3999 /* Keep this condition in sync with the one in
4000 * gst_adaptive_demux_start_manifest_update_task()
4002 if (gst_adaptive_demux_is_live (demux) &&
4003 klass->requires_periodical_playlist_update (demux)) {
4004 gst_adaptive_demux_start_manifest_update_task (demux);
4006 gst_adaptive_demux_stop_manifest_update_task (demux);
/* Download-failure callback for a manifest refresh request.
 *
 * Currently only emits a FIXME log entry; no retry and no error is raised
 * by the visible code. @request and @state are unused. */
4012 handle_manifest_download_failure (DownloadRequest * request,
4013 DownloadRequestState state, GstAdaptiveDemux * demux)
4015 GST_FIXME_OBJECT (demux, "Manifest download failed.");
4016 /* Retry or error out here */
/* Default manifest refresh: submits a download of demux->manifest_uri.
 *
 * Creates a download request for the current manifest URI, registers
 * handle_manifest_download_complete() and
 * handle_manifest_download_failure() as its callbacks, and submits it to
 * demux->download_helper with the COMPRESS and FORCE_REFRESH flags. The
 * manifest itself is processed later, in the completion callback.
 *
 * When submission fails, an element warning is posted and the result
 * becomes GST_FLOW_NOT_LINKED; otherwise `ret` keeps its initial
 * GST_FLOW_OK.
 *
 * NOTE(review): error->message is read on the failure path; confirm that
 * downloadhelper_submit_request() always sets `error` when it fails. */
4019 static GstFlowReturn
4020 gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux)
4022 DownloadRequest *request;
4023 GstFlowReturn ret = GST_FLOW_OK;
4024 GError *error = NULL;
4026 request = download_request_new_uri (demux->manifest_uri);
4028 download_request_set_callbacks (request,
4029 (DownloadRequestEventCallback) handle_manifest_download_complete,
4030 (DownloadRequestEventCallback) handle_manifest_download_failure,
4033 if (!downloadhelper_submit_request (demux->download_helper, NULL,
4034 DOWNLOAD_FLAG_COMPRESS | DOWNLOAD_FLAG_FORCE_REFRESH, request,
4037 GST_ELEMENT_WARNING (demux, RESOURCE, FAILED,
4038 ("Failed to download manifest: %s", error->message), (NULL));
4039 g_clear_error (&error);
4041 ret = GST_FLOW_NOT_LINKED;
/* Triggers a manifest refresh through the class' update_manifest() vfunc
 * and keeps its result in `ret`.
 * (The return statement is not visible in this extract.) */
4047 /* must be called with manifest_lock taken */
4049 gst_adaptive_demux_update_manifest (GstAdaptiveDemux * demux)
4051 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4054 ret = klass->update_manifest (demux);
/* Resets a fragment description to its empty state.
 *
 * Visible effects: frees and clears the header and index URIs, resets
 * their byte ranges to start 0 / end -1, sets stream_time and duration to
 * their NONE values and clears the finished flag.
 *
 * NOTE(review): some lines at the top of the body are not visible in this
 * extract, so other fields may be reset as well. */
4060 gst_adaptive_demux2_stream_fragment_clear (GstAdaptiveDemux2StreamFragment * f)
4067 g_free (f->header_uri);
4068 f->header_uri = NULL;
4069 f->header_range_start = 0;
4070 f->header_range_end = -1;
4072 g_free (f->index_uri);
4073 f->index_uri = NULL;
4074 f->index_range_start = 0;
4075 f->index_range_end = -1;
4077 f->stream_time = GST_CLOCK_STIME_NONE;
4078 f->duration = GST_CLOCK_TIME_NONE;
4079 f->finished = FALSE;
/* Asks the subclass whether another period follows the current one.
 *
 * The answer defaults to FALSE and is replaced by the result of the
 * has_next_period() vfunc when the subclass implements it; the value is
 * logged. (The return statement is not visible in this extract.) */
4082 /* must be called with manifest_lock taken */
4084 gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux)
4086 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
4087 gboolean ret = FALSE;
4089 if (klass->has_next_period)
4090 ret = klass->has_next_period (demux);
4091 GST_DEBUG_OBJECT (demux, "Has next period: %d", ret);
/* Moves the input side of the demuxer on to the next period.
 *
 * Requires the advance_period() vfunc (g_return_if_fail). Failure is
 * detected only indirectly: if demux->input_period is unchanged after the
 * vfunc returns, an error is logged. Otherwise the previous period's
 * stream tasks are stopped, the collection of the new input period is
 * updated, the selection is transferred from the output period to the new
 * input period, and the streams are prepared and started.
 *
 * NOTE(review): the statement that follows the "Advancing period failed"
 * log is not visible in this extract. */
4095 /* must be called with manifest_lock taken */
4097 gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux)
4099 GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
/* Remember the current input period so a change can be detected below. */
4100 GstAdaptiveDemuxPeriod *previous_period = demux->input_period;
4102 g_return_if_fail (klass->advance_period != NULL);
4104 GST_DEBUG_OBJECT (demux, "Advancing to next period");
4105 /* FIXME : no return value ? What if it fails ? */
4106 klass->advance_period (demux);
4108 if (previous_period == demux->input_period) {
4109 GST_ERROR_OBJECT (demux, "Advancing period failed");
4113 /* Stop the previous period stream tasks */
4114 gst_adaptive_demux_period_stop_tasks (previous_period);
4116 gst_adaptive_demux_update_collection (demux, demux->input_period);
4117 /* Figure out a pre-emptive selection based on the output period selection */
4118 gst_adaptive_demux_period_transfer_selection (demux, demux->input_period,
4119 demux->output_period);
4121 gst_adaptive_demux_prepare_streams (demux, FALSE);
4122 gst_adaptive_demux_start_tasks (demux);
/* Returns the current time of demux->realtime_clock as reported by
 * gst_adaptive_demux_clock_get_time(), or GST_CLOCK_TIME_NONE when @demux
 * is NULL (g_return_val_if_fail).
 *
 * NOTE(review): the documentation lines below name
 * gst_adaptive_demux_get_monotonic_time, while the function is
 * gst_adaptive_demux2_get_monotonic_time. */
4126 * gst_adaptive_demux_get_monotonic_time:
4127 * Returns: a monotonically increasing time, using the system realtime clock
4130 gst_adaptive_demux2_get_monotonic_time (GstAdaptiveDemux * demux)
4132 g_return_val_if_fail (demux != NULL, GST_CLOCK_TIME_NONE);
4133 return gst_adaptive_demux_clock_get_time (demux->realtime_clock);
/* Returns the result of gst_adaptive_demux_clock_get_now_utc() for
 * demux->realtime_clock.
 *
 * NOTE(review): unlike gst_adaptive_demux2_get_monotonic_time(), no NULL
 * check on @demux is visible here. */
4137 * gst_adaptive_demux_get_client_now_utc:
4138 * @demux: #GstAdaptiveDemux
4139 * Returns: the client's estimate of UTC
4141 * Used to find the client's estimate of UTC, using the system realtime clock.
4144 gst_adaptive_demux2_get_client_now_utc (GstAdaptiveDemux * demux)
4146 return gst_adaptive_demux_clock_get_now_utc (demux->realtime_clock);
/* Returns the value of demux->running, read with g_atomic_int_get(). */
4150 * gst_adaptive_demux_is_running
4151 * @demux: #GstAdaptiveDemux
4152 * Returns: whether the demuxer is processing data
4154 * Returns FALSE if shutdown has started (transitioning down from
4155 * PAUSED), otherwise TRUE.
4158 gst_adaptive_demux2_is_running (GstAdaptiveDemux * demux)
4160 return g_atomic_int_get (&demux->running);
/* Reads demux->priv->qos_earliest_time while holding the object lock of
 * @demux and keeps it in `earliest`.
 * (The return statement is not visible in this extract.) */
4164 * gst_adaptive_demux_get_qos_earliest_time:
4166 * Returns: The QOS earliest time
4171 gst_adaptive_demux2_get_qos_earliest_time (GstAdaptiveDemux * demux)
4173 GstClockTime earliest;
4175 GST_OBJECT_LOCK (demux);
4176 earliest = demux->priv->qos_earliest_time;
4177 GST_OBJECT_UNLOCK (demux);
/* Attaches @stream to the demuxer's current input period.
 *
 * Preconditions (g_return_val_if_fail, returning FALSE): @demux and
 * @stream are non-NULL and @stream is not attached to a demuxer yet.
 *
 * Under TRACKS_LOCK: logs an error and releases the lock if the input
 * period is already marked prepared; otherwise sets stream->demux and
 * stream->period, appends the stream to the input period's stream list
 * and adds each of the stream's tracks to the period. A track that cannot
 * be added is logged as an error and the lock is released.
 *
 * NOTE(review): the return statements on the error paths and at the end
 * are not visible in this extract. */
4183 gst_adaptive_demux2_add_stream (GstAdaptiveDemux * demux,
4184 GstAdaptiveDemux2Stream * stream)
4186 g_return_val_if_fail (demux && stream, FALSE);
4188 /* FIXME : Migrate to parent */
4189 g_return_val_if_fail (stream->demux == NULL, FALSE);
4191 GST_DEBUG_OBJECT (demux, "Adding stream %s", GST_OBJECT_NAME (stream));
4193 TRACKS_LOCK (demux);
4194 if (demux->input_period->prepared) {
4195 GST_ERROR_OBJECT (demux,
4196 "Attempted to add streams but no new period was created");
4197 TRACKS_UNLOCK (demux);
4200 stream->demux = demux;
4201 stream->period = demux->input_period;
4202 demux->input_period->streams =
4203 g_list_append (demux->input_period->streams, stream);
/* Register every track the stream already carries with the input period. */
4205 if (stream->tracks) {
4207 for (iter = stream->tracks; iter; iter = iter->next)
4208 if (!gst_adaptive_demux_period_add_track (demux->input_period,
4209 (GstAdaptiveDemuxTrack *) iter->data)) {
4210 GST_ERROR_OBJECT (demux, "Failed to add track elements");
4211 TRACKS_UNLOCK (demux);
4215 TRACKS_UNLOCK (demux);
4219 /* Return the current playback rate including any instant rate multiplier */
4221 gst_adaptive_demux_play_rate (GstAdaptiveDemux * demux)
4224 GST_ADAPTIVE_DEMUX_SEGMENT_LOCK (demux);
4225 rate = demux->segment.rate * demux->instant_rate_multiplier;
4226 GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK (demux);